Commit 7ebf6bf3 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Logging improvements. Mostly fixes #27. Addresses #455, #27.

git-svn-id: file:///svn/tokudb@2471 c7de825b-a66e-492c-adef-691d508d4ae1
parent 0c74e904
...@@ -151,7 +151,7 @@ brt.o: $(BRT_INTERNAL_H_INCLUDES) key.h log_header.h ...@@ -151,7 +151,7 @@ brt.o: $(BRT_INTERNAL_H_INCLUDES) key.h log_header.h
fifo.o: fifo.h brttypes.h fifo.o: fifo.h brttypes.h
memory.o: memory.h memory.o: memory.h
primes.o: primes.h toku_assert.h primes.o: primes.h toku_assert.h
fifo-test: fifo.o memory.o toku_assert.o fifo-test: fifo.o memory.o toku_assert.o ybt.o
brt-serialize.o: $(BRT_INTERNAL_H_INCLUDES) key.h wbuf.h rbuf.h brt-serialize.o: $(BRT_INTERNAL_H_INCLUDES) key.h wbuf.h rbuf.h
brt-bigtest: memory.o ybt.o brt.o pma.o cachetable.o key.o fifo.o brt-serialize.o brt-bigtest: memory.o ybt.o brt.o pma.o cachetable.o key.o fifo.o brt-serialize.o
brt-bigtest.o: brt.h ../include/db.h brt-bigtest.o: brt.h ../include/db.h
......
...@@ -144,28 +144,6 @@ void toku_brtnode_free (BRTNODE *node); ...@@ -144,28 +144,6 @@ void toku_brtnode_free (BRTNODE *node);
#define DEADBEEF ((void*)0xDEADBEEFDEADBEEF) #define DEADBEEF ((void*)0xDEADBEEFDEADBEEF)
#endif #endif
/* tree command types */
enum brt_cmd_type {
BRT_NONE = 0,
BRT_INSERT = 1,
BRT_DELETE = 2,
BRT_DELETE_BOTH = 3,
};
/* tree commands */
struct brt_cmd {
enum brt_cmd_type type;
TXNID xid;
union {
/* insert or delete */
struct brt_cmd_insert_delete {
DBT *key;
DBT *val;
} id;
} u;
};
typedef struct brt_cmd BRT_CMD_S, *BRT_CMD;
struct brtenv { struct brtenv {
CACHETABLE ct; CACHETABLE ct;
TOKULOGGER logger; TOKULOGGER logger;
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
/* Buffered repository tree.
* Observation: The in-memory representation of a node doesn't have to be the same as the on-disk representation.
* Goal for the in-memory representation: fast
* Goal for on-disk: small
*
* So to get this running fast, I'll make a version that doesn't do range queries:
* use a hash table for in-memory
* simply write the strings on disk.
* Later I'll do a PMA or a skiplist for the in-memory version.
* Also, later I'll convert the format to network order fromn host order.
* Later, for on disk, I'll compress it (perhaps with gzip, perhaps with the bzip2 algorithm.)
*
* The collection of nodes forms a data structure like a B-tree. The complexities of keeping it balanced apply.
*
* We always write nodes to a new location on disk.
* The nodes themselves contain the information about the tree structure.
* Q: During recovery, how do we find the root node without looking at every block on disk?
* A: The root node is either the designated root near the front of the freelist.
* The freelist is updated infrequently. Before updating the stable copy of the freelist, we make sure that
* the root is up-to-date. We can make the freelist-and-root update be an arbitrarily small fraction of disk bandwidth.
*
*/
#include <arpa/inet.h>
#include <errno.h>
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "toku_assert.h"
#include "brt-internal.h"
#include "key.h"
#include "log_header.h"
extern long long n_items_malloced;
static int malloc_diskblock (DISKOFF *res, BRT brt, int size, TOKULOGGER);
static void verify_local_fingerprint_nonleaf (BRTNODE node);
/* Frees a node, including all the stuff in the hash table. */
void toku_brtnode_free (BRTNODE *nodep) {
BRTNODE node=*nodep;
int i;
//printf("%s:%d %p->mdict[0]=%p\n", __FILE__, __LINE__, node, node->mdicts[0]);
if (node->height>0) {
for (i=0; i<node->u.n.n_children-1; i++) {
toku_free((void*)node->u.n.childkeys[i]);
}
for (i=0; i<node->u.n.n_children; i++) {
if (BNC_BUFFER(node,i)) {
toku_fifo_free(&BNC_BUFFER(node,i));
}
}
} else {
if (node->u.l.buffer) // The buffer may have been freed already, in some cases.
toku_pma_free(&node->u.l.buffer);
}
toku_free(node);
*nodep=0;
}
static long brtnode_size(BRTNODE node) {
long size;
assert(node->tag == TYP_BRTNODE);
if (node->height > 0)
size = node->u.n.n_bytes_in_buffers;
else
size = node->u.l.n_bytes_in_buffer;
return size;
}
static void toku_update_brtnode_loggerlsn(BRTNODE node, TOKULOGGER logger) {
if (logger) {
node->log_lsn = toku_logger_last_lsn(logger);
}
}
static void fixup_child_fingerprint(BRTNODE node, int childnum_of_node, BRTNODE child, BRT brt, TOKULOGGER logger) {
u_int32_t old_fingerprint = BNC_SUBTREE_FINGERPRINT(node,childnum_of_node);
u_int32_t sum = child->local_fingerprint;
if (child->height>0) {
int i;
for (i=0; i<child->u.n.n_children; i++) {
sum += BNC_SUBTREE_FINGERPRINT(child,i);
}
}
// Don't try to get fancy about not modifying the fingerprint if it didn't change.
// We only call this function if we have reason to believe that the child's fingerprint did change.
BNC_SUBTREE_FINGERPRINT(node,childnum_of_node)=sum;
node->dirty=1;
toku_log_changechildfingerprint(logger, toku_cachefile_filenum(brt->cf), node->thisnodename, childnum_of_node, old_fingerprint, sum);
toku_update_brtnode_loggerlsn(node, logger);
}
// If you pass in data==0 then it only compares the key, not the data (even if is a DUPSORT database)
static int brt_compare_pivot(BRT brt, DBT *key, DBT *data, bytevec ck) {
int cmp;
DBT mydbt;
struct kv_pair *kv = (struct kv_pair *) ck;
if (brt->flags & TOKU_DB_DUPSORT) {
cmp = brt->compare_fun(brt->db, key, toku_fill_dbt(&mydbt, kv_pair_key(kv), kv_pair_keylen(kv)));
if (cmp == 0 && data != 0)
cmp = brt->dup_compare(brt->db, data, toku_fill_dbt(&mydbt, kv_pair_val(kv), kv_pair_vallen(kv)));
} else {
cmp = brt->compare_fun(brt->db, key, toku_fill_dbt(&mydbt, kv_pair_key(kv), kv_pair_keylen(kv)));
}
return cmp;
}
void toku_brtnode_flush_callback (CACHEFILE cachefile, DISKOFF nodename, void *brtnode_v, long size __attribute((unused)), BOOL write_me, BOOL keep_me, LSN modified_lsn __attribute__((__unused__)) , BOOL rename_p __attribute__((__unused__))) {
BRTNODE brtnode = brtnode_v;
if (0) {
printf("%s:%d toku_brtnode_flush_callback %p keep_me=%d height=%d", __FILE__, __LINE__, brtnode, keep_me, brtnode->height);
if (brtnode->height==0) printf(" pma=%p", brtnode->u.l.buffer);
printf("\n");
}
assert(brtnode->thisnodename==nodename);
if (write_me) {
toku_serialize_brtnode_to(toku_cachefile_fd(cachefile), brtnode->thisnodename, brtnode->nodesize, brtnode);
}
if (!keep_me) {
toku_brtnode_free(&brtnode);
}
}
int toku_brtnode_fetch_callback (CACHEFILE cachefile, DISKOFF nodename, void **brtnode_pv, long *sizep, void*extraargs, LSN *written_lsn) {
BRT t =(BRT)extraargs;
BRTNODE *result=(BRTNODE*)brtnode_pv;
int r = toku_deserialize_brtnode_from(toku_cachefile_fd(cachefile), nodename, result, t->flags, t->nodesize,
t->compare_fun, t->dup_compare, t->db, toku_cachefile_filenum(t->cf));
if (r == 0) {
*sizep = brtnode_size(*result);
*written_lsn = (*result)->disk_lsn;
}
return r;
}
void toku_brtheader_flush_callback (CACHEFILE cachefile, DISKOFF nodename, void *header_v, long size __attribute((unused)), BOOL write_me, BOOL keep_me, LSN lsn __attribute__((__unused__)), BOOL rename_p __attribute__((__unused__))) {
struct brt_header *h = header_v;
assert(nodename==0);
assert(!h->dirty); // shouldn't be dirty once it is unpinned.
if (write_me) {
toku_serialize_brt_header_to(toku_cachefile_fd(cachefile), h);
}
if (!keep_me) {
if (h->n_named_roots>0) {
int i;
for (i=0; i<h->n_named_roots; i++) {
toku_free(h->names[i]);
}
toku_free(h->names);
toku_free(h->roots);
}
toku_free(h);
}
}
int toku_brtheader_fetch_callback (CACHEFILE cachefile, DISKOFF nodename, void **headerp_v, long *sizep __attribute__((unused)), void*extraargs __attribute__((__unused__)), LSN *written_lsn) {
struct brt_header **h = (struct brt_header **)headerp_v;
assert(nodename==0);
int r = toku_deserialize_brtheader_from(toku_cachefile_fd(cachefile), nodename, h);
written_lsn->lsn = 0; // !!! WRONG. This should be stored or kept redundantly or something.
return r;
}
int toku_read_and_pin_brt_header (CACHEFILE cf, struct brt_header **header) {
void *header_p;
//fprintf(stderr, "%s:%d read_and_pin_brt_header(...)\n", __FILE__, __LINE__);
int r = toku_cachetable_get_and_pin(cf, 0, &header_p, NULL,
toku_brtheader_flush_callback, toku_brtheader_fetch_callback, 0);
if (r!=0) return r;
*header = header_p;
return 0;
}
int toku_unpin_brt_header (BRT brt) {
int r = toku_cachetable_unpin(brt->cf, 0, brt->h->dirty, 0);
brt->h->dirty=0;
brt->h=0;
return r;
}
static int unpin_brtnode (BRT brt, BRTNODE node) {
return toku_cachetable_unpin(brt->cf, node->thisnodename, node->dirty, brtnode_size(node));
}
typedef struct kvpair {
bytevec key;
unsigned int keylen;
bytevec val;
unsigned int vallen;
} *KVPAIR;
/* Forgot to handle the case where there is something in the freelist. */
static int malloc_diskblock_header_is_in_memory (DISKOFF *res, BRT brt, int size, TOKULOGGER logger) {
DISKOFF result = brt->h->unused_memory;
brt->h->unused_memory+=size;
brt->h->dirty = 1;
int r = toku_log_changeunusedmemory(logger, toku_cachefile_filenum(brt->cf), result, brt->h->unused_memory);
*res = result;
return r;
}
int malloc_diskblock (DISKOFF *res, BRT brt, int size, TOKULOGGER logger) {
#if 0
int r = read_and_pin_brt_header(brt->fd, &brt->h);
assert(r==0);
{
DISKOFF result = malloc_diskblock_header_is_in_memory(brt, size);
r = write_brt_header(brt->fd, &brt->h);
assert(r==0);
return result;
}
#else
return malloc_diskblock_header_is_in_memory(res, brt,size, logger);
#endif
}
static void initialize_brtnode (BRT t, BRTNODE n, DISKOFF nodename, int height) {
int i;
n->tag = TYP_BRTNODE;
n->nodesize = t->h->nodesize;
n->flags = t->h->flags;
n->thisnodename = nodename;
n->disk_lsn.lsn = 0; // a new one can always be 0.
n->log_lsn = n->disk_lsn;
n->layout_version = 2;
n->height = height;
n->rand4fingerprint = random();
n->local_fingerprint = 0;
n->dirty = 1;
assert(height>=0);
if (height>0) {
n->u.n.n_children = 0;
for (i=0; i<TREE_FANOUT; i++) {
// n->u.n.childkeys[i] = 0;
// n->u.n.childkeylens[i] = 0;
}
n->u.n.totalchildkeylens = 0;
for (i=0; i<TREE_FANOUT+1; i++) {
BNC_SUBTREE_FINGERPRINT(n, i) = 0;
// n->u.n.children[i] = 0;
// n->u.n.buffers[i] = 0;
BNC_NBYTESINBUF(n,i) = 0;
}
n->u.n.n_bytes_in_buffers = 0;
} else {
int r = toku_pma_create(&n->u.l.buffer, t->compare_fun, t->db, toku_cachefile_filenum(t->cf), n->nodesize);
assert(r==0);
toku_pma_set_dup_mode(n->u.l.buffer, t->flags & (TOKU_DB_DUP+TOKU_DB_DUPSORT));
toku_pma_set_dup_compare(n->u.l.buffer, t->dup_compare);
static int rcount=0;
//printf("%s:%d n PMA= %p (rcount=%d)\n", __FILE__, __LINE__, n->u.l.buffer, rcount);
rcount++;
n->u.l.n_bytes_in_buffer = 0;
}
}
static int create_new_brtnode (BRT t, BRTNODE *result, int height, TOKULOGGER logger) {
TAGMALLOC(BRTNODE, n);
int r;
DISKOFF name;
if ((r = malloc_diskblock(&name, t, t->h->nodesize, logger))) return r;
assert(n);
assert(t->h->nodesize>0);
//printf("%s:%d malloced %lld (and malloc again=%lld)\n", __FILE__, __LINE__, name, malloc_diskblock(t, t->nodesize));
initialize_brtnode(t, n, name, height);
*result = n;
assert(n->nodesize>0);
// n->brt = t;
//printf("%s:%d putting %p (%lld) parent=%p\n", __FILE__, __LINE__, n, n->thisnodename, parent_brtnode);
if ((r = toku_cachetable_put(t->cf, n->thisnodename, n, brtnode_size(n), toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t)))
return r;
if ((r = toku_log_newbrtnode(logger, toku_cachefile_filenum(t->cf), n->thisnodename, height, n->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, n->rand4fingerprint)))
return r;
toku_update_brtnode_loggerlsn(n, logger);
return 0;
}
static int insert_to_buffer_in_nonleaf (BRTNODE node, int childnum, BRT_CMD cmd) {
unsigned int n_bytes_added = BRT_CMD_OVERHEAD + KEY_VALUE_OVERHEAD + cmd->u.id.key->size + cmd->u.id.val->size;
int r = toku_fifo_enq_cmdstruct(BNC_BUFFER(node,childnum), cmd);
if (r!=0) return r;
node->local_fingerprint += node->rand4fingerprint*toku_calccrc32_cmdstruct(cmd);
BNC_NBYTESINBUF(node,childnum) += n_bytes_added;
node->u.n.n_bytes_in_buffers += n_bytes_added;
node->dirty = 1;
return 0;
}
// Split a leaf node, reusing it in new_nodes (as the last element)
static int split_leaf_node (BRT t, TOKULOGGER logger, BRTNODE node, int *n_new_nodes, BRTNODE **new_nodes, DBT **splitks) {
assert(node->height==0);
int r;
int n_children=1; // Initially we have the node itself.
BRTNODE *result_nodes=toku_malloc(sizeof(*result_nodes));
if (errno!=0) { r=errno; if (0) { died0: toku_free(result_nodes); } return r; }
DBT *result_splitks=toku_malloc(sizeof(*result_splitks));
if (errno!=0) { r=errno; if (0) { died1: toku_free(result_splitks); } goto died0; }
while (toku_serialize_brtnode_size(node)>node->nodesize) {
BRTNODE B;
DBT splitk;
if ((r = create_new_brtnode(t, &B, 0, logger))) return r;
// Split so that B is at least 1/2 full
// The stuff in B goes *before* node
if ((r = toku_pma_split(logger, toku_cachefile_filenum(t->cf),
node->thisnodename, node->u.l.buffer, &node->u.l.n_bytes_in_buffer, node->rand4fingerprint, &node->local_fingerprint, &node->log_lsn,
&splitk,
B->thisnodename, B->u.l.buffer, &B->u.l.n_bytes_in_buffer, B->rand4fingerprint, &B->local_fingerprint, &B->log_lsn)))
goto died1;
n_children++;
result_nodes = toku_realloc(result_nodes, n_children*sizeof(*result_nodes));
result_nodes[n_children-2] = B;
result_splitks = toku_realloc(result_nodes, (n_children-1)*sizeof(*result_splitks));
result_splitks[n_children-2] = splitk;
}
result_nodes[n_children-1]=node;
*n_new_nodes = n_children;
*new_nodes = result_nodes;
*splitks = result_splitks;
return 0;
}
/* Side effect: sets splitk->data pointer to a malloc'd value */
static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, TOKULOGGER logger) {
int old_n_children = node->u.n.n_children;
int n_children_in_a = old_n_children/2;
int n_children_in_b = old_n_children-n_children_in_a;
BRTNODE B;
FILENUM fnum = toku_cachefile_filenum(t->cf);
assert(node->height>0);
assert(node->u.n.n_children>=2); // Otherwise, how do we split? We need at least two children to split. */
assert(t->h->nodesize>=node->nodesize); /* otherwise we might be in trouble because the nodesize shrank. */
create_new_brtnode(t, &B, node->height, logger);
B->u.n.n_children =n_children_in_b;
//printf("%s:%d %p (%lld) becomes %p and %p\n", __FILE__, __LINE__, node, node->thisnodename, A, B);
//printf("%s:%d A is at %lld\n", __FILE__, __LINE__, A->thisnodename);
{
/* The first n_children_in_a go into node a.
* That means that the first n_children_in_a-1 keys go into node a.
* The splitter key is key number n_children_in_a */
int i;
for (i=0; i<n_children_in_b; i++) {
int r = toku_fifo_create(&BNC_BUFFER(B,i));
if (r!=0) return r;
}
for (i=n_children_in_a; i<old_n_children; i++) {
int targchild = i-n_children_in_a;
FIFO from_htab = BNC_BUFFER(node,i);
FIFO to_htab = BNC_BUFFER(B, targchild);
DISKOFF thischilddiskoff = BNC_DISKOFF(node, i);
BNC_DISKOFF(B, targchild) = thischilddiskoff;
int r = toku_log_addchild(logger, fnum, B->thisnodename, targchild, thischilddiskoff, BNC_SUBTREE_FINGERPRINT(node, i));
if (r!=0) return r;
while (1) {
bytevec key, data;
unsigned int keylen, datalen;
int type;
TXNID xid;
int fr = toku_fifo_peek(from_htab, &key, &keylen, &data, &datalen, &type, &xid);
if (fr!=0) break;
int n_bytes_moved = keylen+datalen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
BYTESTRING keybs = { .len = keylen, .data = (char*)key };
BYTESTRING databs = { .len = datalen, .data = (char*)data };
u_int32_t old_from_fingerprint = node->local_fingerprint;
u_int32_t old_to_fingerprint = B->local_fingerprint;
u_int32_t delta = toku_calccrc32_cmd(type, xid, key, keylen, data, datalen);
u_int32_t new_from_fingerprint = old_from_fingerprint - node->rand4fingerprint*delta;
u_int32_t new_to_fingerprint = old_to_fingerprint + B->rand4fingerprint *delta;
if (r!=0) return r;
r = toku_log_brtdeq(logger, fnum, node->thisnodename, n_children_in_a, xid, type, keybs, databs, old_from_fingerprint, new_from_fingerprint);
if (r!=0) return r;
r = toku_log_brtenq(logger, fnum, B->thisnodename, targchild, xid, type, keybs, databs, old_to_fingerprint, new_to_fingerprint);
r = toku_fifo_enq(to_htab, key, keylen, data, datalen, type, xid);
if (r!=0) return r;
toku_fifo_deq(from_htab);
// key and data will no longer be valid
node->local_fingerprint = new_from_fingerprint;
B->local_fingerprint = new_to_fingerprint;
B->u.n.n_bytes_in_buffers += n_bytes_moved;
BNC_NBYTESINBUF(B, targchild) += n_bytes_moved;
node->u.n.n_bytes_in_buffers -= n_bytes_moved;
BNC_NBYTESINBUF(node, i) -= n_bytes_moved;
// verify_local_fingerprint_nonleaf(B);
// verify_local_fingerprint_nonleaf(node);
}
// Delete a child, removing it's fingerprint, and also the preceeding pivot key. The child number must be > 0
{
BYTESTRING bs = { .len = kv_pair_keylen(node->u.n.childkeys[i-1]),
.data = kv_pair_key(node->u.n.childkeys[i-1]) };
assert(i>0);
r = toku_log_delchild(logger, fnum, node->thisnodename, n_children_in_a, thischilddiskoff, BNC_SUBTREE_FINGERPRINT(node, i), bs);
if (r!=0) return r;
if (i>n_children_in_a) {
r = toku_log_setpivot(logger, fnum, B->thisnodename, targchild-1, bs);
if (r!=0) return r;
B->u.n.childkeys[targchild-1] = node->u.n.childkeys[i-1];
B->u.n.totalchildkeylens += toku_brt_pivot_key_len(t, node->u.n.childkeys[i-1]);
node->u.n.totalchildkeylens -= toku_brt_pivot_key_len(t, node->u.n.childkeys[i-1]);
node->u.n.childkeys[i-1] = 0;
}
}
BNC_DISKOFF(node, i) = 0;
BNC_SUBTREE_FINGERPRINT(B, targchild) = BNC_SUBTREE_FINGERPRINT(node, i);
BNC_SUBTREE_FINGERPRINT(node, i) = 0;
assert(BNC_NBYTESINBUF(node, i) == 0);
}
// Drop the n_children now (not earlier) so that we can do the fingerprint verification at any time.
node->u.n.n_children=n_children_in_a;
for (i=n_children_in_a; i<old_n_children; i++) {
toku_fifo_free(&BNC_BUFFER(node,i));
}
splitk->data = (void*)(node->u.n.childkeys[n_children_in_a-1]);
splitk->size = toku_brt_pivot_key_len(t, node->u.n.childkeys[n_children_in_a-1]);
node->u.n.totalchildkeylens -= toku_brt_pivot_key_len(t, node->u.n.childkeys[n_children_in_a-1]);
node->u.n.childkeys[n_children_in_a-1]=0;
verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(B);
}
*nodea = node;
*nodeb = B;
assert(toku_serialize_brtnode_size(node)<node->nodesize);
assert(toku_serialize_brtnode_size(B)<B->nodesize);
return 0;
}
static void find_heaviest_child (BRTNODE node, int *childnum) {
int max_child = 0;
int max_weight = BNC_NBYTESINBUF(node, 0);
int i;
if (0) printf("%s:%d weights: %d", __FILE__, __LINE__, max_weight);
assert(node->u.n.n_children>0);
for (i=1; i<node->u.n.n_children; i++) {
int this_weight = BNC_NBYTESINBUF(node,i);
if (0) printf(" %d", this_weight);
if (max_weight < this_weight) {
max_child = i;
max_weight = this_weight;
}
}
*childnum = max_child;
if (0) printf("\n");
}
/* find the leftmost child that may contain the key */
static unsigned int brtnode_which_child (BRTNODE node , DBT *k, DBT *d, BRT t) {
int i;
assert(node->height>0);
for (i=0; i<node->u.n.n_children-1; i++) {
int cmp = brt_compare_pivot(t, k, d, node->u.n.childkeys[i]);
if (cmp > 0) continue;
if (cmp < 0) return i;
return i;
}
return node->u.n.n_children-1;
}
// We return the tree in a possibly overflowed state
static int brt_leaf_put_cmd_no_io (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger) {
assert(node->height==0);
FILENUM filenum = toku_cachefile_filenum(t->cf);
if (cmd->type == BRT_INSERT) {
DBT *k = cmd->u.id.key;
DBT *v = cmd->u.id.val;
int replaced_v_size;
enum pma_errors pma_status = toku_pma_insert_or_replace(node->u.l.buffer,
k, v, &replaced_v_size,
logger, cmd->xid,
filenum, node->thisnodename, node->rand4fingerprint, &node->local_fingerprint, &node->log_lsn);
assert(pma_status==BRT_OK);
if (replaced_v_size>=0) {
node->u.l.n_bytes_in_buffer += v->size - replaced_v_size;
} else {
node->u.l.n_bytes_in_buffer += k->size + v->size + KEY_VALUE_OVERHEAD + PMA_ITEM_OVERHEAD;
}
node->dirty = 1;
return 0;
} else if (cmd->type == BRT_DELETE) {
u_int32_t delta;
int r = toku_pma_delete(node->u.l.buffer, cmd->u.id.key, (DBT*)0,
logger, cmd->xid, node->thisnodename,
node->rand4fingerprint, &node->local_fingerprint, &delta, &node->log_lsn);
if (r == BRT_OK) {
node->u.l.n_bytes_in_buffer -= delta;
node->dirty = 1;
}
return BRT_OK;
} else if (cmd->type == BRT_DELETE_BOTH) {
u_int32_t delta;
int r = toku_pma_delete(node->u.l.buffer, cmd->u.id.key, cmd->u.id.val,
logger, cmd->xid, node->thisnodename,
node->rand4fingerprint, &node->local_fingerprint, &delta, &node->log_lsn);
if (r == BRT_OK) {
node->u.l.n_bytes_in_buffer -= delta;
node->dirty = 1;
}
return BRT_OK;
} else {
return EINVAL;
}
}
static int brtnode_put_cmd_no_io (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger);
// Put an command in a particular child's fifo without doing I/O. The node could end up overfull.
// If the child is in main memory and has space and the queue is empty, then we push into the child.
static int brt_nonleaf_put_cmd_to_child_no_io (BRT t, BRTNODE node, int childnum, BRT_CMD cmd, TOKULOGGER logger) {
DBT *k = cmd->u.id.key;
DBT *v = cmd->u.id.val;
if (toku_fifo_n_entries(BNC_BUFFER(node,childnum))==0) {
void *child_v;
int r = toku_cachetable_maybe_get_and_pin(t->cf, BNC_DISKOFF(node, childnum), &child_v);
if (r==0) {
BRTNODE child=child_v;
if (0) { died: unpin_brtnode(t, child); return r; }
int sizediff = k->size + v->size + KEY_VALUE_OVERHEAD + (child->height>0) ? BRT_CMD_OVERHEAD : PMA_ITEM_OVERHEAD;
if (sizediff+toku_serialize_brtnode_size(child) <= child->nodesize) {
// The fifo is empty, the child is in main memory, and it fits. So push it there.
if ((r = brtnode_put_cmd_no_io(t, child, cmd, logger))) goto died;
return unpin_brtnode(t, child);
}
if ((r = unpin_brtnode(t, child))) return r;
}
}
// Fall through to here if the fifo isn't empty or the child isn't in main memory or the stuff wouldn't fit into the child
int r=toku_fifo_enq_cmdstruct(BNC_BUFFER(node,childnum), cmd);
int diff = k->size + v->size + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
node->local_fingerprint += node->rand4fingerprint * toku_calccrc32_cmdstruct(cmd);
node->u.n.n_bytes_in_buffers += diff;
BNC_NBYTESINBUF(node, childnum) += diff;
node->dirty = 1;
return r;
}
// Put an "insert" command, without doing I/O. The node could end up overfull.
// The command could be pushed into a child if the child is in main memory and has enough space.
static int brt_nonleaf_insert_cmd_no_io (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger) {
return brt_nonleaf_put_cmd_to_child_no_io(t,
node,
brtnode_which_child(node, cmd->u.id.key, cmd->u.id.val, t),
cmd,
logger);
}
// Put the cmd into all the subtrees that it belong in. (Deletes can end up in several subtrees.)
// Don't do any I/O. The node could end up overfull.
// The commands could be pushed into children that are in main memory if such child has space
static int brt_nonleaf_delete_cmd_no_io (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger) {
int i;
int cmp=0;
for (i = 0; i < node->u.n.n_children-1; i++) {
cmp = brt_compare_pivot(t, cmd->u.id.key, 0, node->u.n.childkeys[i]);
if (cmp > 0) {
continue;
} else if (cmp < 0 || ((cmp==0) && t->flags & TOKU_DB_DUPSORT)) {
int r = brt_nonleaf_put_cmd_to_child_no_io(t, node, i, cmd, logger);
if (r!=0) return r;
} else {
return 0;
}
}
// If we fell off the bottom, the last key compare was <=
assert(cmp<=0);
assert(i<node->u.n.n_children); // i must be a valid child number. This works even if there is only one child.
if (t->flags & TOKU_DB_DUPSORT) {
return brt_nonleaf_put_cmd_to_child_no_io(t, node, i, cmd, logger);
}
return 0;
}
// Put the cmd into the node. Possibly results in the node being overfull.
// The command could get pushed into the appropriate child if the child is in main memory and has space to hold the command.
static int brt_nonleaf_put_cmd_no_io (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger) {
if (cmd->type == BRT_INSERT || cmd->type == BRT_DELETE_BOTH) {
return brt_nonleaf_insert_cmd_no_io(t, node, cmd, logger);
} else if (cmd->type == BRT_DELETE) {
return brt_nonleaf_delete_cmd_no_io(t, node, cmd, logger);
} else
return EINVAL;
}
// Pput the command into the node. For leaf nodes, that means execute the command.
// For internal nodes, just put it into the fifo, unless the appropriate child is in main memory and has a place to put the command without getting too big.
// The node could end up overfull (but the children cannot get too big)
// However, if you precalculate that the node is big enough, then the node will not get too big.
// (This implies that none of the children will overflow since we precalculate before calling this function on a child.)
static int brtnode_put_cmd_no_io (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger) {
if (node->height==0) {
return brt_leaf_put_cmd_no_io(t, node, cmd, logger);
} else {
return brt_nonleaf_put_cmd_no_io(t, node, cmd, logger);
}
}
static void verify_local_fingerprint_nonleaf (BRTNODE node) {
u_int32_t fp=0;
int i;
if (node->height==0) return;
for (i=0; i<node->u.n.n_children; i++)
FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, xid,
({
fp += node->rand4fingerprint * toku_calccrc32_cmd(type, xid, key, keylen, data, datalen);
}));
assert(fp==node->local_fingerprint);
}
static int setup_initial_brt_root_node (BRT t, DISKOFF offset, TOKULOGGER logger) {
int r;
TAGMALLOC(BRTNODE, node);
assert(node);
initialize_brtnode(t, node,
offset, /* the location is one nodesize offset from 0. */
0);
// node->brt = t;
if (0) {
printf("%s:%d for tree %p node %p mdict_create--> %p\n", __FILE__, __LINE__, t, node, node->u.l.buffer);
printf("%s:%d put root at %lld\n", __FILE__, __LINE__, offset);
}
r=toku_cachetable_put(t->cf, offset, node, brtnode_size(node),
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t);
if (r!=0) {
toku_free(node);
return r;
}
toku_verify_counts(node);
toku_log_newbrtnode(logger, toku_cachefile_filenum(t->cf), offset, 0, t->h->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, node->rand4fingerprint);
toku_update_brtnode_loggerlsn(node, logger);
r=unpin_brtnode(t, node);
if (r!=0) {
toku_free(node);
return r;
}
return 0;
}
int toku_brt_create(BRT *brt_ptr) {
BRT brt = toku_malloc(sizeof *brt);
if (brt == 0)
return ENOMEM;
memset(brt, 0, sizeof *brt);
list_init(&brt->cursors);
brt->flags = 0;
brt->nodesize = BRT_DEFAULT_NODE_SIZE;
brt->compare_fun = toku_default_compare_fun;
brt->dup_compare = toku_default_compare_fun;
*brt_ptr = brt;
return 0;
}
int toku_brt_set_flags(BRT brt, unsigned int flags) {
brt->flags = flags;
return 0;
}
int toku_brt_get_flags(BRT brt, unsigned int *flags) {
*flags = brt->flags;
return 0;
}
int toku_brt_set_nodesize(BRT brt, unsigned int nodesize) {
brt->nodesize = nodesize;
return 0;
}
int toku_brt_get_nodesize(BRT brt, unsigned int *nodesize) {
*nodesize = brt->nodesize;
return 0;
}
int toku_brt_set_bt_compare(BRT brt, int (*bt_compare)(DB *, const DBT*, const DBT*)) {
brt->compare_fun = bt_compare;
return 0;
}
int toku_brt_set_dup_compare(BRT brt, int (*dup_compare)(DB *, const DBT*, const DBT*)) {
brt->dup_compare = dup_compare;
return 0;
}
int toku_brt_get_fd(BRT brt, int *fdp) {
*fdp = toku_cachefile_fd(brt->cf);
return 0;
}
enum { UNDO_COUNTER_LIMIT=10 };
typedef void(*undo_fun)(void*);
struct undo_rec { undo_fun f; void *v; };
struct undo {
int undo_counter;
struct undo_rec undos[UNDO_COUNTER_LIMIT];
};
#define INITUNDO(u) struct undo u = (struct undo){.undo_counter=0}
void push_undo(struct undo *undos, undo_fun f, void *v) {
assert(undos->undo_counter<UNDO_COUNTER_LIMIT);
undos->undos[undos->undo_counter++]=(struct undo_rec){f,v};
}
void do_undos(struct undo *undos) {
while (undos->undo_counter>0) {
struct undo_rec *r = &undos->undos[--undos->undo_counter];
r->f(r->v);
}
}
void undo_free (void *v) {
void **ptr=v;
toku_free(*ptr);
*ptr=0;
}
// tbou means "toku_brt_open undo"
void tbou_close_cachefile (void *v) {
BRT t = v;
toku_cachefile_close(&t->cf);
}
struct maybe_unpin_info {
int is_pinned;
CACHEFILE cf;
CACHEKEY ckey;
};
void tbou_maybe_unpin (void *v) {
struct maybe_unpin_info *mui = v;
if (mui->is_pinned)
toku_cachetable_unpin(mui->cf, mui->ckey, 0, 0);
mui->is_pinned=0;
}
int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char *dbname, int is_create, int only_create, int load_flags, CACHETABLE cachetable, TOKUTXN txn, DB *db) {
/* If dbname is NULL then we setup to hold a single tree. Otherwise we setup an array. */
int r;
struct maybe_unpin_info mui = {.is_pinned=0};
INITUNDO(undos);
push_undo(&undos, tbou_maybe_unpin, &mui); // if we pin a cf, then we put it into the maybe_undo_info so it will get undone on error.
assert(is_create || !only_create);
assert(!load_flags || !only_create);
if (0) {
died:
do_undos(&undos);
return r;
}
{
if (dbname) {
char *malloced_name = toku_strdup(dbname);
if (malloced_name==0) { r = errno; goto died; }
push_undo(&undos, undo_free, &t->database_name);
t->database_name = malloced_name;
} else {
t->database_name = 0;
}
}
t->db = db;
{
int fd = open(fname, O_RDWR, 0777);
r = errno;
if (fd==-1) {
if (r==ENOENT) {
if (!is_create) { goto died; }
fd = open(fname, O_RDWR | O_CREAT, 0777);
if (fd==-1) { r=errno; goto died; }
r = toku_logger_log_fcreate(txn, fname_in_env, 0777);
if (r!=0) goto died;
} else
goto died;
}
if ((r = toku_cachetable_openfd(&t->cf, cachetable, fd, t))) goto died;
push_undo(&undos, tbou_close_cachefile, t);
}
if ((r = toku_logger_log_fopen(txn, fname_in_env, toku_cachefile_filenum(t->cf)))) goto died;
// no undo action for log_fopen
assert(t->nodesize>0);
if (is_create) {
r = toku_read_and_pin_brt_header(t->cf, &t->h);
if (r!=0 && r!=-1) goto died;
if (r==0) {
mui=(struct maybe_unpin_info){.is_pinned=1, .cf=t->cf, .ckey=0}; // remember to unpin it
int i;
assert(r==0);
assert(dbname);
if (t->h->unnamed_root!=-1) { r=EINVAL; goto died; } // Cannot create a subdb in a file that is not enabled for subdbs
assert(t->h->n_named_roots>=0);
for (i=0; i<t->h->n_named_roots; i++) {
if (strcmp(t->h->names[i], dbname)==0) {
if (only_create) {
r = EEXIST;
goto died;
}
else goto found_it;
}
}
if ((t->h->names = toku_realloc(t->h->names, (1+t->h->n_named_roots)*sizeof(*t->h->names))) == 0) { r=errno; goto died; }
if ((t->h->roots = toku_realloc(t->h->roots, (1+t->h->n_named_roots)*sizeof(*t->h->roots))) == 0) { r=errno; goto died; }
t->h->n_named_roots++;
if ((t->h->names[t->h->n_named_roots-1] = toku_strdup(dbname)) == 0) { r=errno; goto died; }
push_undo(&undos, undo_free, &t->h->names[t->h->n_named_roots-1]);
r = malloc_diskblock_header_is_in_memory(&t->h->roots[t->h->n_named_roots-1], t, t->h->nodesize, toku_txn_logger(txn));
if (r!=0) goto died;
t->h->dirty = 1;
if ((r=setup_initial_brt_root_node(t, t->h->roots[t->h->n_named_roots-1], toku_txn_logger(txn)))!=0) goto died;
} else {
assert(r==-1); // the pin failed because no data was present
/* construct a new header. */
if ((MALLOC(t->h))==0) { r = errno; goto died; }
t->h->dirty=1;
t->h->flags = t->flags;
t->h->nodesize=t->nodesize;
t->h->freelist=-1;
t->h->unused_memory=2*t->nodesize;
if (dbname) {
t->h->unnamed_root = -1;
t->h->n_named_roots = 1;
if ((MALLOC_N(1, t->h->names))==0) { r=errno; goto died; } push_undo(&undos, undo_free, &t->h->names);
if ((MALLOC_N(1, t->h->roots))==0) { r=errno; goto died; } push_undo(&undos, undo_free, &t->h->roots);
if ((t->h->names[0] = toku_strdup(dbname))==0) { r=errno; goto died; } push_undo(&undos, undo_free, &t->h->names[0]);
t->h->roots[0] = t->nodesize;
} else {
t->h->unnamed_root = t->nodesize;
t->h->n_named_roots = -1;
t->h->names=0;
t->h->roots=0;
}
if ((r=toku_logger_log_header(txn, toku_cachefile_filenum(t->cf), t->h))) goto died;
if ((r=setup_initial_brt_root_node(t, t->nodesize, toku_txn_logger(txn)))!=0) goto died;
if ((r=toku_cachetable_put(t->cf, 0, t->h, 0, toku_brtheader_flush_callback, toku_brtheader_fetch_callback, 0))) goto died;
mui=(struct maybe_unpin_info){.is_pinned=1, .cf=t->cf, .ckey=0}; // remember to unpin it
}
} else {
if ((r = toku_read_and_pin_brt_header(t->cf, &t->h))!=0) goto died;
mui=(struct maybe_unpin_info){.is_pinned=1, .cf=t->cf, .ckey=0}; // remember to unpin it
if (!dbname) {
if (t->h->n_named_roots!=-1) { r = EINVAL; goto died; } // requires a subdb
} else {
int i;
if (t->h->n_named_roots==-1) { r=EINVAL; goto died; } // no suddbs in the db
// printf("%s:%d n_roots=%d\n", __FILE__, __LINE__, t->h->n_named_roots);
for (i=0; i<t->h->n_named_roots; i++) {
if (strcmp(t->h->names[i], dbname)==0) {
goto found_it;
}
}
r=ENOENT; /* the database doesn't exist */
goto died;
}
found_it:
t->nodesize = t->h->nodesize; /* inherit the pagesize from the file */
if (t->flags != t->h->flags) { /* flags must match */
if (load_flags) t->flags = t->h->flags;
else { r = EINVAL; goto died; }
}
}
assert(t->h);
if ((r = toku_unpin_brt_header(t)) !=0) goto died; // it's unpinned
mui.is_pinned=0;
assert(t->h==0);
return 0;
}
int toku_brt_remove_subdb(BRT brt, const char *dbname, u_int32_t flags) {
int r;
int i;
int found = -1;
assert(flags == 0);
r = toku_read_and_pin_brt_header(brt->cf, &brt->h);
if (r!=0) return r;
assert(brt->h->unnamed_root==-1);
assert(brt->h->n_named_roots>=0);
for (i = 0; i < brt->h->n_named_roots; i++) {
if (strcmp(brt->h->names[i], dbname) == 0) {
found = i;
break;
}
}
if (found == -1) {
//Should not be possible.
r = ENOENT;
goto error;
}
//Free old db name
toku_free(brt->h->names[found]);
//TODO: Free Diskblocks including root
for (i = found + 1; i < brt->h->n_named_roots; i++) {
brt->h->names[i - 1] = brt->h->names[i];
brt->h->roots[i - 1] = brt->h->roots[i];
}
brt->h->n_named_roots--;
brt->h->dirty = 1;
//TODO: What if n_named_roots becomes 0? Should we handle it specially? Should we delete the file?
if ((brt->h->names = toku_realloc(brt->h->names, (brt->h->n_named_roots)*sizeof(*brt->h->names))) == 0) { r=errno; goto error; }
if ((brt->h->roots = toku_realloc(brt->h->roots, (brt->h->n_named_roots)*sizeof(*brt->h->roots))) == 0) { r=errno; goto error; }
r = toku_unpin_brt_header(brt);
return r;
error:
toku_unpin_brt_header(brt);
return r;
}
// This one has no env
int toku_open_brt (const char *fname, const char *dbname, int is_create, BRT *newbrt, int nodesize, CACHETABLE cachetable, TOKUTXN txn,
int (*compare_fun)(DB*,const DBT*,const DBT*), DB *db) {
BRT brt;
int r;
const int only_create = 0;
const int load_flags = 0;
r = toku_brt_create(&brt);
if (r != 0) return r;
toku_brt_set_nodesize(brt, nodesize);
toku_brt_set_bt_compare(brt, compare_fun);
r = toku_brt_open(brt, fname, fname, dbname, is_create, only_create, load_flags, cachetable, txn, db);
if (r != 0) {
toku_free(brt);
return r;
}
*newbrt = brt;
return 0;
}
int toku_close_brt (BRT brt) {
int r;
while (!list_empty(&brt->cursors)) {
BRT_CURSOR c = list_struct(list_pop(&brt->cursors), struct brt_cursor, cursors_link);
r=toku_brt_cursor_close(c);
if (r!=0) return r;
}
if (brt->cf) {
assert(0==toku_cachefile_count_pinned(brt->cf, 1)); // For the brt, the pinned count should be zero.
//printf("%s:%d closing cachetable\n", __FILE__, __LINE__);
if ((r = toku_cachefile_close(&brt->cf))!=0) return r;
}
if (brt->database_name) toku_free(brt->database_name);
if (brt->skey) { toku_free(brt->skey); }
if (brt->sval) { toku_free(brt->sval); }
toku_free(brt);
return 0;
}
CACHEKEY* toku_calculate_root_offset_pointer (BRT brt) {
if (brt->database_name==0) {
return &brt->h->unnamed_root;
} else {
int i;
for (i=0; i<brt->h->n_named_roots; i++) {
if (strcmp(brt->database_name, brt->h->names[i])==0) {
return &brt->h->roots[i];
}
}
}
abort();
}
static int brt_init_new_root(BRT brt, int n_new_nodes, BRTNODE *new_nodes, DBT *splitks, CACHEKEY *rootp, TOKULOGGER logger, BRTNODE *newrootp) {
assert(n_new_nodes>0);
TAGMALLOC(BRTNODE, newroot);
int r;
int new_height = new_nodes[0]->height+1;
int new_nodesize = brt->h->nodesize;
DISKOFF newroot_diskoff;
if ((r=malloc_diskblock(&newroot_diskoff, brt, new_nodesize, logger))) return r;
assert(newroot);
if (brt->database_name==0) {
toku_log_changeunnamedroot(logger, toku_cachefile_filenum(brt->cf), *rootp, newroot_diskoff);
} else {
BYTESTRING bs;
bs.len = 1+strlen(brt->database_name);
bs.data = brt->database_name;
toku_log_changenamedroot(logger, toku_cachefile_filenum(brt->cf), bs, *rootp, newroot_diskoff);
}
*rootp=newroot_diskoff;
brt->h->dirty=1;
initialize_brtnode (brt, newroot, newroot_diskoff, new_height);
newroot->u.n.n_children=n_new_nodes;
r=toku_log_newbrtnode(logger, toku_cachefile_filenum(brt->cf), newroot_diskoff, new_height, new_nodesize, (brt->flags&TOKU_DB_DUPSORT)!=0, newroot->rand4fingerprint);
if (r!=0) return r;
int i;
for (i=0; i<n_new_nodes; i++) {
BNC_DISKOFF(newroot, i)=new_nodes[i]->thisnodename;
r=toku_fifo_create(&BNC_BUFFER(newroot,i)); if (r!=0) return r;
r=toku_log_addchild(logger, toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, new_nodes[i]->thisnodename, 0);
if (r!=0) return r;
fixup_child_fingerprint(newroot, i, new_nodes[i], brt, logger);
}
toku_verify_counts(newroot);
int sum_splitk_sizes=0;
for (i=0; i+1<n_new_nodes; i++) {
sum_splitk_sizes += splitks[i].size;
newroot->u.n.childkeys[i] = splitks[i].data;
BYTESTRING bs = { .len = kv_pair_keylen(newroot->u.n.childkeys[0]),
.data = kv_pair_key(newroot->u.n.childkeys[0]) };
r=toku_log_setpivot(logger, toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, bs);
if (r!=0) return r;
toku_update_brtnode_loggerlsn(newroot, logger);
}
newroot->u.n.totalchildkeylens=sum_splitk_sizes;
for (i=0; i<n_new_nodes; i++) {
r=unpin_brtnode(brt, new_nodes[i]);
if (r!=0) return r;
}
toku_cachetable_put(brt->cf, newroot_diskoff, newroot, brtnode_size(newroot),
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
*newrootp = newroot;
return 0;
}
static int nonleaf_node_is_too_wide (BRT, BRTNODE);
static int split_nonleaf_node(BRT, int *n_new_nodes, BRTNODE **new_nodes, DBT **splitks);
static int leaf_node_is_too_full (BRT, BRTNODE);
// If CHILD is too wide, split it, and create a new node with the new children. Unpin CHILD or the new children (even if something goes wrong).
// If it does split, unpin the new root node also.
static int maybe_split_root(BRT brt, BRTNODE child, CACHEKEY *rootp, TOKULOGGER logger);
// if CHILD is too wide, split it, and fix up NODE. Either way, unpin the child or resulting children (even if it fails do the unpin)
static int maybe_split_nonroot (BRT brt, BRTNODE node, int childnum, BRTNODE child, TOKULOGGER logger);
// push things down into node's children (and into their children and so forth) but don't make any descendant too big.
static int push_down_without_overfilling (BRT brt, BRTNODE node, TOKULOGGER logger);
// If the buffers are too big, push stuff down. The subchild may need to be split, in which case our fanout may get too large.
// When are done, this node is has little enough stuff in its buffers (but the fanout may be too large), and all the descendant
// nodes are properly sized (the buffer sizes and fanouts are all small enough).
static int push_down_if_buffers_too_full(BRT brt, BRTNODE node, TOKULOGGER logger) {
if (node->height==0) return 0; // can't push down for leaf nodes
while (node->u.n.n_bytes_in_buffers > 0 && toku_serialize_brtnode_size(node)>node->nodesize) {
int childnum;
find_heaviest_child(node, &childnum);
void *child_v;
int r = toku_cachetable_get_and_pin(brt->cf, BNC_DISKOFF(node, childnum), &child_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
if (r!=0) return r;
BRTNODE child=child_v;
if (0) { died: unpin_brtnode(brt, child); return r; }
BRT_CMD_S cmd;
DBT key,val;
while (0==toku_fifo_peek_cmdstruct(BNC_BUFFER(node, childnum), &cmd, &key, &val)) {
r=toku_fifo_deq(BNC_BUFFER(node, childnum));
assert(r==0); // we just did a peek, so the buffer must be nonempty
r=brtnode_put_cmd_no_io(brt, child, &cmd, logger); if (r!=0) goto died;
if (toku_serialize_brtnode_size(child)>child->nodesize) {
// The child got too big, so do the fixup on the child
r = push_down_if_buffers_too_full(brt, child, logger); if (r!=0) goto died;
// After the split_nonroot call, the children are all unpinned...
r = maybe_split_nonroot(brt, node, childnum, child, logger);
if (r!=0) return r; // so on error just return r instead of going to died.
r =push_down_without_overfilling(brt, node, logger);
if (r!=0) return r;
// We hope that NODE is now not too full. One can imagine cases where it is too full, however, so we
// stop popping from this fifo, and go around the outer while loop to look at the node to see if it is too big again.
break;
}
}
}
return 0;
}
// Push data toward a child. If the child gets too big then the child will push down or split.
// If a split happens, then return immediately so that we can check to see if NODE needs to be split
static int flush_toward_child (BRT brt, BRTNODE node, int childnum, TOKULOGGER logger);
static int maybe_fixup_root (BRT brt, BRTNODE node, CACHEKEY *rootp, TOKULOGGER logger) {
int r;
if (node->height>0) {
// internal nodes can be too wide, but if too full, they did a push down
maybe_reshape_internal_node:
while (nonleaf_node_is_too_wide(brt, node)) {
int n_new_nodes; BRTNODE *new_nodes; DBT *splitks;
if ((r=split_nonleaf_node(brt, &n_new_nodes, &new_nodes, &splitks))) return r;
if ((r=brt_init_new_root(brt, n_new_nodes, new_nodes, splitks, rootp, logger, &node))) return r; // unpins all the new nodes, which are all small enough
// now node is still possibly too wide, hence the loop
}
} else {
// leaf nodes can be too full
if (leaf_node_is_too_full(brt, node)) {
int n_new_nodes; BRTNODE *new_nodes; DBT *splitks;
if ((r==split_leaf_node(brt, logger, node, &n_new_nodes, &new_nodes, &splitks))) return r;
if ((r==brt_init_new_root(brt, n_new_nodes, new_nodes, splitks, rootp, logger, &node))) return r; // unpins all the new nodes, which are all small enough
assert(node->height>0);
goto maybe_reshape_internal_node;
}
}
return 0;
}
static int brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger) {
void *node_v;
BRTNODE node;
CACHEKEY *rootp;
int r;
//assert(0==toku_cachetable_assert_all_unpinned(brt->cachetable));
if ((r = toku_read_and_pin_brt_header(brt->cf, &brt->h))) {
if (0) { died0: toku_unpin_brt_header(brt); }
return r;
}
rootp = toku_calculate_root_offset_pointer(brt);
if ((r=toku_cachetable_get_and_pin(brt->cf, *rootp, &node_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt))) {
if (0) { died1: unpin_brtnode(brt, node); goto died0; }
goto died0;
}
//printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v);
node=node_v;
if ((r = brtnode_put_cmd_no_io(brt, node, cmd, logger))) goto died1; // put stuff in, possibly causing the buffers to get too big
if ((r = push_down_if_buffers_too_full(brt, node, logger))) goto died1; // if the buffers are too big, push stuff down
if ((r = maybe_split_root(brt, node, rootp, logger))) goto died1; // now the node might have to split (leaf nodes can't push down, and internal nodes have too much fanout) This will change node.
// Now the node is OK,
brt->h->dirty=1;
return toku_unpin_brt_header(brt);
}
int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
int r;
BRT_CMD_S brtcmd = { BRT_INSERT, toku_txn_get_txnid(txn), .u.id={key,val}};
r = brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn));
return r;
}
int toku_brt_lookup (BRT brt, DBT *k, DBT *v) {
int r, rr;
BRT_CURSOR cursor;
rr = toku_brt_cursor(brt, &cursor);
if (rr != 0) return rr;
int op = brt->flags & TOKU_DB_DUPSORT ? DB_GET_BOTH : DB_SET;
r = toku_brt_cursor_get(cursor, k, v, op, 0);
rr = toku_brt_cursor_close(cursor); assert(rr == 0);
return r;
}
int toku_brt_delete(BRT brt, DBT *key, TOKUTXN txn) {
int r;
DBT val;
BRT_CMD_S brtcmd = { BRT_DELETE, toku_txn_get_txnid(txn), .u.id={key, toku_init_dbt(&val)}};
r = brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn));
return r;
}
int toku_brt_delete_both(BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
int r;
BRT_CMD_S brtcmd = { BRT_DELETE_BOTH, toku_txn_get_txnid(txn), .u.id={key,val}};
r = brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn));
return r;
}
int toku_verify_brtnode (BRT brt, DISKOFF off, bytevec lorange, ITEMLEN lolen, bytevec hirange, ITEMLEN hilen, int recurse, BRTNODE parent_brtnode);
int toku_dump_brtnode (BRT brt, DISKOFF off, int depth, bytevec lorange, ITEMLEN lolen, bytevec hirange, ITEMLEN hilen, BRTNODE parent_brtnode) {
int result=0;
BRTNODE node;
void *node_v;
int r = toku_cachetable_get_and_pin(brt->cf, off, &node_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
assert(r==0);
printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v);
node=node_v;
result=toku_verify_brtnode(brt, off, lorange, lolen, hirange, hilen, 0, parent_brtnode);
printf("%*sNode=%p\n", depth, "", node);
if (node->height>0) {
printf("%*sNode %lld nodesize=%d height=%d n_children=%d n_bytes_in_buffers=%d keyrange=%s %s\n",
depth, "", off, node->nodesize, node->height, node->u.n.n_children, node->u.n.n_bytes_in_buffers, (char*)lorange, (char*)hirange);
//printf("%s %s\n", lorange ? lorange : "NULL", hirange ? hirange : "NULL");
{
int i;
for (i=0; i< node->u.n.n_children; i++) {
printf("%*schild %d buffered (%d entries):\n", depth+1, "", i, toku_fifo_n_entries(BNC_BUFFER(node,i)));
FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, xid,
({
data=data; datalen=datalen; keylen=keylen;
printf("%*s xid=%"PRId64" %d (type=%d)\n", depth+2, "", xid, ntohl(*(int*)key), type);
//assert(strlen((char*)key)+1==keylen);
//assert(strlen((char*)data)+1==datalen);
}));
}
for (i=0; i<node->u.n.n_children; i++) {
printf("%*schild %d\n", depth, "", i);
if (i>0) {
printf("%*spivot %d len=%d %d\n", depth+1, "", i-1, node->u.n.childkeys[i-1]->keylen, ntohl(*(int*)&node->u.n.childkeys[i-1]->key));
}
toku_dump_brtnode(brt, BNC_DISKOFF(node, i), depth+4,
(i==0) ? lorange : node->u.n.childkeys[i-1],
(i==0) ? lolen : toku_brt_pivot_key_len(brt, node->u.n.childkeys[i-1]),
(i==node->u.n.n_children-1) ? hirange : node->u.n.childkeys[i],
(i==node->u.n.n_children-1) ? hilen : toku_brt_pivot_key_len(brt, node->u.n.childkeys[i]),
node
);
}
}
} else {
printf("%*sNode %lld nodesize=%d height=%d n_bytes_in_buffer=%d keyrange=%d %d\n",
depth, "", off, node->nodesize, node->height, node->u.l.n_bytes_in_buffer, lorange ? ntohl(*(int*)lorange) : 0, hirange ? ntohl(*(int*)hirange) : 0);
PMA_ITERATE(node->u.l.buffer, key, keylen, val __attribute__((__unused__)), vallen,
( keylen=keylen, vallen=vallen, printf(" (%d)%d ", keylen, ntohl(*(int*)key))));
printf("\n");
}
r = toku_cachetable_unpin(brt->cf, off, 0, 0);
assert(r==0);
return result;
}
int toku_dump_brt (BRT brt) {
int r;
CACHEKEY *rootp;
struct brt_header *prev_header = brt->h;
if ((r = toku_read_and_pin_brt_header(brt->cf, &brt->h))) {
if (0) { died0: toku_unpin_brt_header(brt); }
return r;
}
rootp = toku_calculate_root_offset_pointer(brt);
if ((r = toku_dump_brtnode(brt, *rootp, 0, 0, 0, 0, 0, null_brtnode))) goto died0;
if ((r = toku_unpin_brt_header(brt))!=0) return r;
brt->h = prev_header;
return 0;
}
static int show_brtnode_blocknumbers (BRT brt, DISKOFF off) {
BRTNODE node;
void *node_v;
int i,r;
assert(off%brt->h->nodesize==0);
if ((r = toku_cachetable_get_and_pin(brt->cf, off, &node_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt))) {
if (0) { died0: toku_cachetable_unpin(brt->cf, off, 0, 0); }
return r;
}
printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v);
node=node_v;
printf(" %lld", off/brt->h->nodesize);
if (node->height>0) {
for (i=0; i<node->u.n.n_children; i++) {
if ((r=show_brtnode_blocknumbers(brt, BNC_DISKOFF(node, i)))) goto died0;
}
}
r = toku_cachetable_unpin(brt->cf, off, 0, 0);
return r;
}
#if 0
int show_brt_blocknumbers (BRT brt) {
int r;
CACHEKEY *rootp;
if ((r = toku_read_and_pin_brt_header(brt->cf, &brt->h))) {
if (0) { died0: toku_unpin_brt_header(brt); }
return r;
}
rootp = toku_calculate_root_offset_pointer(brt);
printf("BRT %p has blocks:", brt);
if ((r=show_brtnode_blocknumbers (brt, *rootp, 0))) goto died0;
printf("\n");
if ((r = toku_unpin_brt_header(brt))!=0) return r;
return 0;
}
#endif
int toku_brt_dbt_set_key(BRT brt, DBT *ybt, bytevec val, ITEMLEN vallen) {
int r = toku_dbt_set_value(ybt, val, vallen, &brt->skey);
return r;
}
int toku_brt_dbt_set_value(BRT brt, DBT *ybt, bytevec val, ITEMLEN vallen) {
int r = toku_dbt_set_value(ybt, val, vallen, &brt->sval);
return r;
}
/* search in a node's child */
static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, DBT *newkey, DBT *newval, TOKULOGGER logger) {
int r, rr;
/* if the child's buffer is not empty then try to empty it */
if (BNC_NBYTESINBUF(node, childnum) > 0) {
rr = maybe_push_some_brt_cmds_down(brt, node, childnum, logger);
if (rr!=0) return rr;
/* push down may cause a child split, so childnum may not be appropriate, and the node itself may split, so retry */
return EAGAIN;
}
void *node_v;
rr = toku_cachetable_get_and_pin(brt->cf, BNC_DISKOFF(node,childnum), &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
assert(rr == 0);
for (;;) {
BRTNODE childnode = node_v;
BRT_SPLIT childsplit; brt_split_init(&childsplit);
r = brt_search_node(brt, childnode, search, newkey, newval, &childsplit, logger);
if (childsplit.did_split) {
rr = handle_split_of_child(brt, node, childnum, childsplit.nodea, childsplit.nodeb, &childsplit.splitk,
&split->did_split, &split->nodea, &split->nodeb, &split->splitk, logger);
assert(rr == 0);
break;
} else {
if (r == EAGAIN)
continue;
rr = toku_cachetable_unpin(brt->cf, childnode->thisnodename, childnode->dirty, brtnode_size(childnode));
assert(rr == 0);
break;
}
}
return r;
}
static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, TOKULOGGER logger) {
int c;
/* binary search is overkill for a small array */
int child[node->u.n.n_children];
/* scan left to right or right to left depending on the search direction */
for (c = 0; c < node->u.n.n_children; c++)
child[c] = search->direction & BRT_SEARCH_LEFT ? c : node->u.n.n_children - 1 - c;
for (c = 0; c < node->u.n.n_children-1; c++) {
int p = search->direction & BRT_SEARCH_LEFT ? child[c] : child[c] - 1;
struct kv_pair *pivot = node->u.n.childkeys[p];
DBT pivotkey, pivotval;
if (search->compare(search,
toku_fill_dbt(&pivotkey, kv_pair_key(pivot), kv_pair_keylen(pivot)),
brt->flags & TOKU_DB_DUPSORT ? toku_fill_dbt(&pivotval, kv_pair_val(pivot), kv_pair_vallen(pivot)): 0)) {
int r = brt_search_child(brt, node, child[c], search, newkey, newval, logger);
// searching the child can cause it to get bent out of shape
int rr = maybe_fixup_nonroot(brt, node, child[c], logger);
if (rr!=0) return rr;
if (r == 0) return r;
}
}
/* check the first (left) or last (right) node if nothing has been found */
if (r == DB_NOTFOUND && c == node->u.n.n_children-1)
r = brt_search_child(brt, node, child[c], search, newkey, newval, split, logger);
return r;
}
static int brt_search_leaf_node(BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval) {
PMA pma = node->u.l.buffer;
int r = toku_pma_search(pma, search, newkey, newval);
return r;
}
static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, TOKULOGGER logger) {
if (node->height > 0)
return brt_search_nonleaf_node(brt, node, search, newkey, newval, logger);
else
return brt_search_leaf_node(node, search, newkey, newval);
}
int toku_brt_search(BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, TOKULOGGER logger) {
int r, rr;
rr = toku_read_and_pin_brt_header(brt->cf, &brt->h);
if (rr!=0) {
if (0) { died0: toku_unpin_brt_header(brt); }
return rr;
}
CACHEKEY *rootp = toku_calculate_root_offset_pointer(brt);
void *node_v;
BRTNODE node;
rr = toku_cachetable_get_and_pin(brt->cf, *rootp, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
if (rr!=0) {
if (0) { died1: unpin_brtnode(brt, node); }
goto died0;
}
node = node_v;
r = brt_search_node(brt, node, search, newkey, newval, logger);
rr = maybe_fixup_root(brt, node, rootp, logger);
if (rr!=0) { goto died1; }
rr = unpin_brtnode(brt, node);
if (rr!=0) { goto died0; }
rr = toku_unpin_brt_header(brt);
if (rr!=0) return rr;
return r;
}
static inline void dbt_cleanup(DBT *dbt) {
if (dbt->data && (dbt->flags & DB_DBT_MALLOC)) {
toku_free_n(dbt->data, dbt->size); dbt->data = 0;
}
}
static inline void brt_cursor_cleanup(BRT_CURSOR cursor) {
dbt_cleanup(&cursor->key);
dbt_cleanup(&cursor->val);
}
static inline int brt_cursor_not_set(BRT_CURSOR cursor) {
return cursor->key.data == 0 || cursor->val.data == 0;
}
BOOL toku_brt_cursor_uninitialized(BRT_CURSOR c) {
return brt_cursor_not_set(c);
}
static inline void brt_cursor_set_key_val(BRT_CURSOR cursor, DBT *newkey, DBT *newval) {
brt_cursor_cleanup(cursor);
cursor->key = *newkey; memset(newkey, 0, sizeof *newkey);
cursor->val = *newval; memset(newval, 0, sizeof *newval);
}
int toku_brt_cursor(BRT brt, BRT_CURSOR *cursorptr) {
BRT_CURSOR cursor = toku_malloc(sizeof *cursor);
if (cursor == 0)
return ENOMEM;
cursor->brt = brt;
toku_init_dbt(&cursor->key);
toku_init_dbt(&cursor->val);
list_push(&brt->cursors, &cursor->cursors_link);
*cursorptr = cursor;
return 0;
}
int toku_brt_cursor_close(BRT_CURSOR cursor) {
brt_cursor_cleanup(cursor);
list_remove(&cursor->cursors_link);
toku_free_n(cursor, sizeof *cursor);
return 0;
}
static inline int compare_k_x(BRT brt, DBT *k, DBT *x) {
return brt->compare_fun(brt->db, k, x);
}
static inline int compare_v_y(BRT brt, DBT *v, DBT *y) {
return brt->dup_compare(brt->db, v, y);
}
static inline int compare_kv_xy(BRT brt, DBT *k, DBT *v, DBT *x, DBT *y) {
int cmp = brt->compare_fun(brt->db, k, x);
if (cmp == 0 && v && y)
cmp = brt->dup_compare(brt->db, v, y);
return cmp;
}
static inline int brt_cursor_copyout(BRT_CURSOR cursor, DBT *key, DBT *val) {
int r = 0;
if (key)
r = toku_dbt_set_value(key, cursor->key.data, cursor->key.size, &cursor->brt->skey);
if (r == 0 && val)
r = toku_dbt_set_value(val, cursor->val.data, cursor->val.size, &cursor->brt->sval);
return r;
}
static int brt_cursor_compare_set(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
return compare_kv_xy(brt, search->k, search->v, x, y) <= 0; /* return min xy: kv <= xy */
}
static int brt_cursor_current(BRT_CURSOR cursor, int op, DBT *outkey, DBT *outval, TOKULOGGER logger) {
if (brt_cursor_not_set(cursor))
return EINVAL;
if (op == DB_CURRENT) {
DBT newkey; toku_init_dbt(&newkey);
DBT newval; toku_init_dbt(&newval);
brt_search_t search; brt_search_init(&search, brt_cursor_compare_set, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
int r = toku_brt_search(cursor->brt, &search, &newkey, &newval, logger);
if (r != 0 || compare_kv_xy(cursor->brt, &cursor->key, &cursor->val, &newkey, &newval) != 0)
return DB_KEYEMPTY;
}
return brt_cursor_copyout(cursor, outkey, outval);
}
/* search for the first kv pair that matches the search object */
static int brt_cursor_search(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval, TOKULOGGER logger) {
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC;
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC;
int r = toku_brt_search(cursor->brt, search, &newkey, &newval, logger);
if (r == 0) {
brt_cursor_set_key_val(cursor, &newkey, &newval);
r = brt_cursor_copyout(cursor, outkey, outval);
}
dbt_cleanup(&newkey);
dbt_cleanup(&newval);
return r;
}
/* search for the kv pair that matches the search object and is equal to kv */
static int brt_cursor_search_eq_kv_xy(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval, TOKULOGGER logger) {
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC;
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC;
int r = toku_brt_search(cursor->brt, search, &newkey, &newval, logger);
if (r == 0) {
if (compare_kv_xy(cursor->brt, search->k, search->v, &newkey, &newval) == 0) {
brt_cursor_set_key_val(cursor, &newkey, &newval);
r = brt_cursor_copyout(cursor, outkey, outval);
} else
r = DB_NOTFOUND;
}
dbt_cleanup(&newkey);
dbt_cleanup(&newval);
return r;
}
/* search for the kv pair that matches the search object and is equal to k */
static int brt_cursor_search_eq_k_x(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval, TOKULOGGER logger) {
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC;
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC;
int r = toku_brt_search(cursor->brt, search, &newkey, &newval, logger);
if (r == 0) {
if (compare_k_x(cursor->brt, search->k, &newkey) == 0) {
brt_cursor_set_key_val(cursor, &newkey, &newval);
r = brt_cursor_copyout(cursor, outkey, outval);
} else
r = DB_NOTFOUND;
}
dbt_cleanup(&newkey);
dbt_cleanup(&newval);
return r;
}
static int brt_cursor_compare_one(brt_search_t *search, DBT *x, DBT *y) {
search = search; x = x; y = y;
return 1;
}
static int brt_cursor_first(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_one, BRT_SEARCH_LEFT, 0, 0, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_last(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_one, BRT_SEARCH_RIGHT, 0, 0, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_compare_next(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
return compare_kv_xy(brt, search->k, search->v, x, y) < 0; /* return min xy: kv < xy */
}
static int brt_cursor_next(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_next, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_compare_next_nodup(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context; y = y;
return compare_k_x(brt, search->k, x) < 0; /* return min x: k < x */
}
static int brt_cursor_next_nodup(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_next_nodup, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_compare_next_dup(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
int keycmp = compare_k_x(brt, search->k, x);
if (keycmp < 0)
return 1;
else
return keycmp == 0 && y && compare_v_y(brt, search->v, y) < 0; /* return min xy: k <= x && v < y */
}
static int brt_cursor_next_dup(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_next_dup, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search_eq_k_x(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_compare_get_both_range(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
int keycmp = compare_k_x(brt, search->k, x);
if (keycmp < 0)
return 1;
else
return keycmp == 0 && (y == 0 || compare_v_y(brt, search->v, y) <= 0); /* return min xy: k <= x && v <= y */
}
static int brt_cursor_get_both_range(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_get_both_range, BRT_SEARCH_LEFT, key, val, cursor->brt);
return brt_cursor_search_eq_k_x(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_compare_prev(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
return compare_kv_xy(brt, search->k, search->v, x, y) > 0; /* return max xy: kv > xy */
}
static int brt_cursor_prev(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_prev, BRT_SEARCH_RIGHT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_compare_prev_nodup(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context; y = y;
return compare_k_x(brt, search->k, x) > 0; /* return max x: k > x */
}
static int brt_cursor_prev_nodup(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_prev_nodup, BRT_SEARCH_RIGHT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
#ifdef DB_PREV_DUP
static int brt_cursor_compare_prev_dup(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
int keycmp = compare_k_x(brt, search->k, x);
if (keycmp > 0)
return 1;
else
return keycmp == 0 && y && compare_v_y(brt, search->v, y) > 0; /* return max xy: k >= x && v > y */
}
static int brt_cursor_prev_dup(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_prev_dup, BRT_SEARCH_RIGHT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search_eq_k_x(cursor, &search, outkey, outval, logger);
}
#endif
static int brt_cursor_compare_set_range(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
return compare_kv_xy(brt, search->k, search->v, x, y) <= 0; /* return kv <= xy */
}
static int brt_cursor_set(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_set_range, BRT_SEARCH_LEFT, key, val, cursor->brt);
return brt_cursor_search_eq_kv_xy(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_set_range(BRT_CURSOR cursor, DBT *key, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_set_range, BRT_SEARCH_LEFT, key, 0, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *key, DBT *val, int get_flags, TOKUTXN txn) {
int r;
int op = get_flags & DB_OPFLAGS_MASK;
TOKULOGGER logger = toku_txn_logger(txn);
if (get_flags & ~DB_OPFLAGS_MASK)
return EINVAL;
switch (op) {
case DB_CURRENT:
case DB_CURRENT_BINDING:
r = brt_cursor_current(cursor, op, key, val, logger);
break;
case DB_FIRST:
r = brt_cursor_first(cursor, key, val, logger);
break;
case DB_LAST:
r = brt_cursor_last(cursor, key, val, logger);
break;
case DB_NEXT:
if (brt_cursor_not_set(cursor))
r = brt_cursor_first(cursor, key, val, logger);
else
r = brt_cursor_next(cursor, key, val, logger);
break;
case DB_NEXT_DUP:
if (brt_cursor_not_set(cursor))
r = EINVAL;
else
r = brt_cursor_next_dup(cursor, key, val, logger);
break;
case DB_NEXT_NODUP:
if (brt_cursor_not_set(cursor))
r = brt_cursor_first(cursor, key, val, logger);
else
r = brt_cursor_next_nodup(cursor, key, val, logger);
break;
case DB_PREV:
if (brt_cursor_not_set(cursor))
r = brt_cursor_last(cursor, key, val, logger);
else
r = brt_cursor_prev(cursor, key, val, logger);
break;
#ifdef DB_PREV_DUP
case DB_PREV_DUP:
if (brt_cursor_not_set(cursor))
r = EINVAL;
else
r = brt_cursor_prev_dup(cursor, key, val, logger);
break;
#endif
case DB_PREV_NODUP:
if (brt_cursor_not_set(cursor))
r = brt_cursor_last(cursor, key, val, logger);
else
r = brt_cursor_prev_nodup(cursor, key, val, logger);
break;
case DB_SET:
r = brt_cursor_set(cursor, key, 0, 0, val, logger);
break;
case DB_SET_RANGE:
r = brt_cursor_set_range(cursor, key, key, val, logger);
break;
case DB_GET_BOTH:
r = brt_cursor_set(cursor, key, val, 0, 0, logger);
break;
case DB_GET_BOTH_RANGE:
r = brt_cursor_get_both_range(cursor, key, val, 0, val, logger);
break;
default:
r = EINVAL;
break;
}
return r;
}
int toku_brt_cursor_delete(BRT_CURSOR cursor, int flags, TOKUTXN txn) {
if ((flags & ~DB_DELETE_ANY) != 0)
return EINVAL;
if (brt_cursor_not_set(cursor))
return EINVAL;
int r = 0;
if (!(flags & DB_DELETE_ANY))
r = brt_cursor_current(cursor, DB_CURRENT, 0, 0, toku_txn_logger(txn));
if (r == 0)
r = toku_brt_delete_both(cursor->brt, &cursor->key, &cursor->val, txn);
return r;
}
int toku_brt_height_of_root(BRT brt, int *height) {
// for an open brt, return the current height.
int r;
if ((r = toku_read_and_pin_brt_header(brt->cf, &brt->h))) {
if (0) { died0: toku_unpin_brt_header(brt); }
return r;
}
CACHEKEY *rootp = toku_calculate_root_offset_pointer(brt);
void *node_v;
if ((r=toku_cachetable_get_and_pin(brt->cf, *rootp, &node_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt))) {
goto died0;
}
BRTNODE node = node_v;
*height = node->height;
r = unpin_brtnode(brt, node); assert(r==0);
r = toku_unpin_brt_header(brt); assert(r==0);
return 0;
}
int toku_testsetup_leaf(BRT brt, DISKOFF *diskoff) {
BRTNODE node;
int r = toku_read_and_pin_brt_header(brt->cf, &brt->h);
if (r!=0) return r;
create_new_brtnode(brt, &node, 0, (TOKULOGGER)0);
*diskoff = node->thisnodename;
r = unpin_brtnode(brt, node);
if (r!=0) return r;
r = toku_unpin_brt_header(brt);
if (r!=0) return r;
return 0;
}
// Don't bother to clean up carefully if something goes wrong. (E.g., it's OK to have malloced stuff that hasn't been freed.)
int toku_testsetup_nonleaf (BRT brt, int height, DISKOFF *diskoff, int n_children, DISKOFF *children, u_int32_t *subtree_fingerprints, char **keys, int *keylens) {
BRTNODE node;
assert(n_children<=BRT_FANOUT);
int r = toku_read_and_pin_brt_header(brt->cf, &brt->h);
if (r!=0) return r;
create_new_brtnode(brt, &node, height, (TOKULOGGER)0);
node->u.n.n_children=n_children;
node->u.n.totalchildkeylens=0;
node->u.n.n_bytes_in_buffers=0;
int i;
for (i=0; i<n_children; i++) {
node->u.n.childinfos[i] = (struct brtnode_nonleaf_childinfo){ .subtree_fingerprint = subtree_fingerprints[i],
.diskoff = children[i],
.n_bytes_in_buffer = 0 };
r = toku_fifo_create(&BNC_BUFFER(node,i)); if (r!=0) return r;
}
for (i=0; i+1<n_children; i++) {
node->u.n.childkeys[i] = kv_pair_malloc(keys[i], keylens[i], 0, 0);
node->u.n.totalchildkeylens += keylens[i];
}
*diskoff = node->thisnodename;
r = unpin_brtnode(brt, node);
if (r!=0) return r;
r = toku_unpin_brt_header(brt);
if (r!=0) return r;
return 0;
}
int toku_testsetup_root(BRT brt, DISKOFF diskoff) {
int r = toku_read_and_pin_brt_header(brt->cf, &brt->h);
if (r!=0) return r;
brt->h->unnamed_root = diskoff;
r = toku_unpin_brt_header(brt);
return r;
}
int toku_testsetup_get_sersize(BRT brt, DISKOFF diskoff) // Return the size on disk
{
void *node_v;
int r = toku_cachetable_get_and_pin(brt->cf, diskoff, &node_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
assert(r==0);
int size = toku_serialize_brtnode_size(node_v);
r = unpin_brtnode(brt, node_v);
assert(r==0);
return size;
}
int toku_testsetup_insert_to_leaf (BRT brt, DISKOFF diskoff, char *key, int keylen, char *val, int vallen, u_int32_t *subtree_fingerprint) {
void *node_v;
int r;
r = toku_cachetable_get_and_pin(brt->cf, diskoff, &node_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
if (r!=0) return r;
BRTNODE node=node_v;
assert(node->height==0);
DBT k,v;
int replaced_v_size;
enum pma_errors pma_status =
toku_pma_insert_or_replace(node->u.l.buffer,
toku_fill_dbt(&k, key, keylen),
toku_fill_dbt(&v, val, vallen),
&replaced_v_size,
(TOKULOGGER)0, (TXNID)0,
toku_cachefile_filenum(brt->cf),
node->thisnodename, node->rand4fingerprint,
&node->local_fingerprint,
&node->log_lsn);
assert(pma_status==BRT_OK);
if (replaced_v_size>=0) {
node->u.l.n_bytes_in_buffer += v.size - replaced_v_size;
} else {
node->u.l.n_bytes_in_buffer += k.size + v.size + KEY_VALUE_OVERHEAD + PMA_ITEM_OVERHEAD;
}
node->dirty=1;
*subtree_fingerprint = node->local_fingerprint;
r = unpin_brtnode(brt, node_v);
return r;
}
int toku_testsetup_insert_to_nonleaf (BRT brt, DISKOFF diskoff, enum brt_cmd_type cmdtype, char *key, int keylen, char *val, int vallen, u_int32_t *subtree_fingerprint) {
void *node_v;
int r;
r = toku_cachetable_get_and_pin(brt->cf, diskoff, &node_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
if (r!=0) return r;
BRTNODE node=node_v;
assert(node->height>0);
DBT k,v;
int childnum = brtnode_which_child(node,
toku_fill_dbt(&k, key, keylen),
toku_fill_dbt(&v, val, vallen),
brt);
r = toku_fifo_enq(BNC_BUFFER(node, childnum), key, keylen, val, vallen, cmdtype, (TXNID)0);
assert(r==0);
u_int32_t fdelta = node->rand4fingerprint * toku_calccrc32_cmd(cmdtype, (TXNID)0, key, keylen, val, vallen);
node->local_fingerprint += fdelta;
*subtree_fingerprint += fdelta;
int sizediff = keylen + vallen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
node->u.n.n_bytes_in_buffers += sizediff;
BNC_NBYTESINBUF(node, childnum) += sizediff;
node->dirty = 1;
r = unpin_brtnode(brt, node_v);
return r;
}
...@@ -9,6 +9,8 @@ ...@@ -9,6 +9,8 @@
#endif #endif
#define _FILE_OFFSET_BITS 64 #define _FILE_OFFSET_BITS 64
#include "../include/db.h"
typedef struct brt *BRT; typedef struct brt *BRT;
struct brt_header; struct brt_header;
struct wbuf; struct wbuf;
...@@ -68,4 +70,26 @@ typedef struct intpairarray { ...@@ -68,4 +70,26 @@ typedef struct intpairarray {
typedef struct cachetable *CACHETABLE; typedef struct cachetable *CACHETABLE;
typedef struct cachefile *CACHEFILE; typedef struct cachefile *CACHEFILE;
/* tree command types */
enum brt_cmd_type {
BRT_NONE = 0,
BRT_INSERT = 1,
BRT_DELETE = 2,
BRT_DELETE_BOTH = 3,
};
/* tree commands */
struct brt_cmd {
enum brt_cmd_type type;
TXNID xid;
union {
/* insert or delete */
struct brt_cmd_insert_delete {
DBT *key;
DBT *val;
} id;
} u;
};
typedef struct brt_cmd BRT_CMD_S, *BRT_CMD;
#endif #endif
...@@ -106,6 +106,7 @@ int toku_cachefile_of_filenum (CACHETABLE t, FILENUM filenum, CACHEFILE *cf, BRT ...@@ -106,6 +106,7 @@ int toku_cachefile_of_filenum (CACHETABLE t, FILENUM filenum, CACHEFILE *cf, BRT
return ENOENT; return ENOENT;
} }
// If something goes wrong, close the fd. After this, the caller shouldn't close the fd, but instead should close the cachefile.
int toku_cachetable_openfd (CACHEFILE *cf, CACHETABLE t, int fd, BRT brt) { int toku_cachetable_openfd (CACHEFILE *cf, CACHETABLE t, int fd, BRT brt) {
int r; int r;
CACHEFILE extant; CACHEFILE extant;
...@@ -114,7 +115,7 @@ int toku_cachetable_openfd (CACHEFILE *cf, CACHETABLE t, int fd, BRT brt) { ...@@ -114,7 +115,7 @@ int toku_cachetable_openfd (CACHEFILE *cf, CACHETABLE t, int fd, BRT brt) {
struct fileid fileid; struct fileid fileid;
memset(&fileid, 0, sizeof(fileid)); memset(&fileid, 0, sizeof(fileid));
r=fstat(fd, &statbuf); r=fstat(fd, &statbuf);
if (r != 0) return errno; if (r != 0) { r=errno; close(fd); }
fileid.st_dev = statbuf.st_dev; fileid.st_dev = statbuf.st_dev;
fileid.st_ino = statbuf.st_ino; fileid.st_ino = statbuf.st_ino;
for (extant = t->cachefiles; extant; extant=extant->next) { for (extant = t->cachefiles; extant; extant=extant->next) {
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
#include "toku_assert.h" #include "toku_assert.h"
#include "memory.h" #include "memory.h"
#include "fifo.h" #include "fifo.h"
#include "ybt.h"
static void fifo_init(struct fifo *fifo) { static void fifo_init(struct fifo *fifo) {
fifo->head = fifo->tail = 0; fifo->head = fifo->tail = 0;
...@@ -78,6 +79,10 @@ int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *d ...@@ -78,6 +79,10 @@ int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *d
return 0; return 0;
} }
int toku_fifo_enq_cmdstruct (FIFO fifo, const BRT_CMD cmd) {
return toku_fifo_enq(fifo, cmd->u.id.key->data, cmd->u.id.key->size, cmd->u.id.val->data, cmd->u.id.val->size, cmd->type, cmd->xid);
}
/* peek at the head (the oldest entry) of the fifo */ /* peek at the head (the oldest entry) of the fifo */
int toku_fifo_peek(FIFO fifo, bytevec *key, unsigned int *keylen, bytevec *data, unsigned int *datalen, int *type, TXNID *xid) { int toku_fifo_peek(FIFO fifo, bytevec *key, unsigned int *keylen, bytevec *data, unsigned int *datalen, int *type, TXNID *xid) {
struct fifo_entry *entry = fifo_peek(fifo); struct fifo_entry *entry = fifo_peek(fifo);
...@@ -91,6 +96,22 @@ int toku_fifo_peek(FIFO fifo, bytevec *key, unsigned int *keylen, bytevec *data, ...@@ -91,6 +96,22 @@ int toku_fifo_peek(FIFO fifo, bytevec *key, unsigned int *keylen, bytevec *data,
return 0; return 0;
} }
// fill in the BRT_CMD, using the two DBTs for the DBT part.
int toku_fifo_peek_cmdstruct (FIFO fifo, BRT_CMD cmd, DBT*key, DBT*data) {
int type;
bytevec keyb,datab;
unsigned int keylen,datalen;
int r = toku_fifo_peek(fifo, &keyb, &keylen, &datab, &datalen, &type, &cmd->xid);
if (r!=0) return r;
cmd->type=type;
toku_fill_dbt(key, keyb, keylen);
toku_fill_dbt(data, datab, datalen);
cmd->u.id.key=key;
cmd->u.id.val=data;
return 0;
}
int toku_fifo_deq(FIFO fifo) { int toku_fifo_deq(FIFO fifo) {
struct fifo_entry *entry = fifo_deq(fifo); struct fifo_entry *entry = fifo_deq(fifo);
if (entry == 0) return -1; // if entry is 0 then it was an empty fifo if (entry == 0) return -1; // if entry is 0 then it was an empty fifo
......
#ifndef FIFO_H
#define FIFO_H
#include "brttypes.h" #include "brttypes.h"
struct fifo_entry { struct fifo_entry {
...@@ -19,8 +21,10 @@ typedef struct fifo *FIFO; ...@@ -19,8 +21,10 @@ typedef struct fifo *FIFO;
int toku_fifo_create(FIFO *); int toku_fifo_create(FIFO *);
void toku_fifo_free(FIFO *); void toku_fifo_free(FIFO *);
int toku_fifo_n_entries(FIFO); int toku_fifo_n_entries(FIFO);
int toku_fifo_enq_cmdstruct (FIFO fifo, const BRT_CMD cmd);
int toku_fifo_enq (FIFO, const void *key, ITEMLEN keylen, const void *data, ITEMLEN datalen, int type, TXNID xid); int toku_fifo_enq (FIFO, const void *key, ITEMLEN keylen, const void *data, ITEMLEN datalen, int type, TXNID xid);
int toku_fifo_peek (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, int *type, TXNID *xid); int toku_fifo_peek (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, int *type, TXNID *xid);
int toku_fifo_peek_cmdstruct (FIFO, BRT_CMD, DBT*, DBT*); // fill in the BRT_CMD, using the two DBTs for the DBT part.
int toku_fifo_deq(FIFO); int toku_fifo_deq(FIFO);
int toku_fifo_peek_deq (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, int *type); int toku_fifo_peek_deq (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, int *type);
void toku_fifo_iterate (FIFO, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen,int type, TXNID xid, void*), void*); void toku_fifo_iterate (FIFO, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen,int type, TXNID xid, void*), void*);
...@@ -38,3 +42,4 @@ void toku_fifo_iterate (FIFO, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,I ...@@ -38,3 +42,4 @@ void toku_fifo_iterate (FIFO, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,I
} \ } \
}) })
#endif
...@@ -633,5 +633,7 @@ int toku_txnid2txn (TOKULOGGER logger, TXNID txnid, TOKUTXN *result) { ...@@ -633,5 +633,7 @@ int toku_txnid2txn (TOKULOGGER logger, TXNID txnid, TOKUTXN *result) {
return 0; return 0;
} }
} }
return -1; // If there is no txn, then we treat it as the null txn.
*result = 0;
return 0;
} }
...@@ -54,11 +54,13 @@ const struct logtype rollbacks[] = { ...@@ -54,11 +54,13 @@ const struct logtype rollbacks[] = {
const struct logtype logtypes[] = { const struct logtype logtypes[] = {
{"commit", 'C', FA{{"TXNID", "txnid", 0},NULLFIELD}}, {"commit", 'C', FA{{"TXNID", "txnid", 0},NULLFIELD}},
{"delete", 'D', FA{{"FILENUM", "filenum", 0}, #if 0
{"tl_delete", 'D', FA{{"FILENUM", "filenum", 0}, // tl logentries can be used, by themselves, to rebuild the whole DB from scratch.
{"DISKOFF", "diskoff", 0}, {"DISKOFF", "diskoff", 0},
{"BYTESTRING", "key", 0}, {"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0}, {"BYTESTRING", "data", 0},
NULLFIELD}}, NULLFIELD}},
#endif
{"fcreate", 'F', FA{{"TXNID", "txnid", 0}, {"fcreate", 'F', FA{{"TXNID", "txnid", 0},
{"BYTESTRING", "fname", 0}, {"BYTESTRING", "fname", 0},
{"u_int32_t", "mode", "0%o"}, {"u_int32_t", "mode", "0%o"},
......
...@@ -717,10 +717,12 @@ int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKULOGGER logger, TXNID xid, FILE ...@@ -717,10 +717,12 @@ int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKULOGGER logger, TXNID xid, FILE
*fingerprint += rand4fingerprint*toku_calccrc32_kvpair(k->data, k->size, v->data, v->size); *fingerprint += rand4fingerprint*toku_calccrc32_kvpair(k->data, k->size, v->data, v->size);
struct kv_pair *pair = pma->pairs[idx]; struct kv_pair *pair = pma->pairs[idx];
if (logger) {
{ {
TOKUTXN txn; TOKUTXN txn;
int r; int r;
if ((r=toku_txnid2txn(logger, xid, &txn))) return r; if ((r=toku_txnid2txn(logger, xid, &txn))) return r;
if (txn) {
const BYTESTRING key = { pair->keylen, toku_memdup(kv_pair_key_const(pair), pair->keylen) }; const BYTESTRING key = { pair->keylen, toku_memdup(kv_pair_key_const(pair), pair->keylen) };
const BYTESTRING data = { pair->vallen, toku_memdup(kv_pair_val_const(pair), pair->vallen) }; const BYTESTRING data = { pair->vallen, toku_memdup(kv_pair_val_const(pair), pair->vallen) };
if ((r = toku_logger_save_rollback_insertatleaf(txn, pma->filenum, key, data))) { if ((r = toku_logger_save_rollback_insertatleaf(txn, pma->filenum, key, data))) {
...@@ -728,12 +730,14 @@ int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKULOGGER logger, TXNID xid, FILE ...@@ -728,12 +730,14 @@ int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKULOGGER logger, TXNID xid, FILE
return r; return r;
} }
} }
}
{ {
const BYTESTRING key = { pair->keylen, kv_pair_key(pair) }; const BYTESTRING key = { pair->keylen, kv_pair_key(pair) };
const BYTESTRING data = { pair->vallen, kv_pair_val(pair) }; const BYTESTRING data = { pair->vallen, kv_pair_val(pair) };
int r = toku_log_insertinleaf (logger, xid, pma->filenum, diskoff, idx, key, data); int r = toku_log_insertinleaf (logger, xid, pma->filenum, diskoff, idx, key, data);
if (r!=0) return r; if (r!=0) return r;
if (logger && node_lsn) *node_lsn = toku_logger_last_lsn(logger); if (node_lsn) *node_lsn = toku_logger_last_lsn(logger);
}
} }
return 0; return 0;
...@@ -773,12 +777,15 @@ static int pma_log_delete (PMA pma, const char *key, int keylen, const char *val ...@@ -773,12 +777,15 @@ static int pma_log_delete (PMA pma, const char *key, int keylen, const char *val
} }
if (logger) { if (logger) {
TOKUTXN txn; TOKUTXN txn;
if (0!=toku_txnid2txn(logger, xid, &txn)) return -1; int r=toku_txnid2txn(logger, xid, &txn);
if (r!=0) return r;
if (txn) {
const BYTESTRING deletedkey = { keylen, toku_memdup(key, keylen) }; const BYTESTRING deletedkey = { keylen, toku_memdup(key, keylen) };
const BYTESTRING deleteddata = { vallen, toku_memdup(val, vallen) }; const BYTESTRING deleteddata = { vallen, toku_memdup(val, vallen) };
int r=toku_logger_save_rollback_deleteatleaf(txn, pma->filenum, deletedkey, deleteddata); r=toku_logger_save_rollback_deleteatleaf(txn, pma->filenum, deletedkey, deleteddata);
if (r!=0) { toku_free(deletedkey.data); toku_free(deleteddata.data); return r; if (r!=0) { toku_free(deletedkey.data); toku_free(deleteddata.data); return r;
} }
}
if (node_lsn) *node_lsn = toku_logger_last_lsn(logger); if (node_lsn) *node_lsn = toku_logger_last_lsn(logger);
} }
return 0; return 0;
...@@ -926,6 +933,7 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v, ...@@ -926,6 +933,7 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
{ {
TOKUTXN txn; TOKUTXN txn;
if ((r=toku_txnid2txn(logger, xid, &txn))) return r; if ((r=toku_txnid2txn(logger, xid, &txn))) return r;
if (txn) {
const BYTESTRING key = { k->size, toku_memdup(k->data, k->size) }; const BYTESTRING key = { k->size, toku_memdup(k->data, k->size) };
const BYTESTRING data = { v->size, toku_memdup(v->data, v->size) }; const BYTESTRING data = { v->size, toku_memdup(v->data, v->size) };
if ((r = toku_logger_save_rollback_insertatleaf(txn, pma->filenum, key, data))) { if ((r = toku_logger_save_rollback_insertatleaf(txn, pma->filenum, key, data))) {
...@@ -933,6 +941,7 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v, ...@@ -933,6 +941,7 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
return r; return r;
} }
} }
}
{ {
const BYTESTRING key = { k->size, k->data }; const BYTESTRING key = { k->size, k->data };
const BYTESTRING data = { v->size, v->data }; const BYTESTRING data = { v->size, v->data };
......
...@@ -96,8 +96,6 @@ void toku_recover_commit (LSN UU(lsn), TXNID UU(txnid)) { ...@@ -96,8 +96,6 @@ void toku_recover_commit (LSN UU(lsn), TXNID UU(txnid)) {
#define ABORTIT { le=le; txn=txn; fprintf(stderr, "%s:%d (%s) not ready to go\n", __FILE__, __LINE__, __func__); abort(); } #define ABORTIT { le=le; txn=txn; fprintf(stderr, "%s:%d (%s) not ready to go\n", __FILE__, __LINE__, __func__); abort(); }
void toku_recover_delete (LSN UU(lsn), FILENUM UU(filenum),DISKOFF UU(diskoff),BYTESTRING UU(key),BYTESTRING UU(data)) { assert(0); }
void toku_recover_fcreate (LSN UU(lsn), TXNID UU(txnid),BYTESTRING fname,u_int32_t mode) { void toku_recover_fcreate (LSN UU(lsn), TXNID UU(txnid),BYTESTRING fname,u_int32_t mode) {
char *fixed_fname = fixup_fname(&fname); char *fixed_fname = fixup_fname(&fname);
int fd = creat(fixed_fname, mode); int fd = creat(fixed_fname, mode);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment