Commit 34341f29 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

More surgery. Most of cursor stuff compiles now. Addresses #1195.

git-svn-id: file:///svn/toku/tokudb.1195@7326 c7de825b-a66e-492c-adef-691d508d4ae1
parent d3eb758a
......@@ -57,26 +57,6 @@ long long n_items_malloced;
static void verify_local_fingerprint_nonleaf (BRTNODE node);
static int toku_dump_brtnode (BRT brt, BLOCKNUM blocknum, int depth, bytevec lorange, ITEMLEN lolen, bytevec hirange, ITEMLEN hilen);
int toku_read_brt_header_and_store_in_cachefile (CACHEFILE cf, struct brt_header **header)
// If the cachefile already has the header, then just get it.
// If the cachefile has not been initialized, then don't modify anything.
{
{
struct brt_header *h;
if ((h=toku_cachefile_get_userdata(cf))!=0) {
*header = h;
return 0;
}
}
struct brt_header *h;
int r = toku_deserialize_brtheader_from(toku_cachefile_fd(cf), make_blocknum(0), &h);
if (r!=0) return r;
h->root_put_counter = global_root_put_counter++;
toku_cachefile_set_userdata(cf, (void*)h, toku_brtheader_close);
*header = h;
return 0;
}
typedef struct kvpair {
bytevec key;
unsigned int keylen;
......@@ -112,24 +92,6 @@ static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *node
//#define MAX_PATHLEN_TO_ROOT 40
static void find_heaviest_child (BRTNODE node, int *childnum) {
int max_child = 0;
int max_weight = BNC_NBYTESINBUF(node, 0);
int i;
if (0) printf("%s:%d weights: %d", __FILE__, __LINE__, max_weight);
assert(node->u.n.n_children>0);
for (i=1; i<node->u.n.n_children; i++) {
int this_weight = BNC_NBYTESINBUF(node,i);
if (0) printf(" %d", this_weight);
if (max_weight < this_weight) {
max_child = i;
max_weight = this_weight;
}
}
*childnum = max_child;
if (0) printf("\n");
}
static const char *unparse_cmd_type (enum brt_cmd_type typ) __attribute__((__unused__));
static const char *unparse_cmd_type (enum brt_cmd_type typ) {
......@@ -608,54 +570,8 @@ static int push_some_brt_cmds_down (BRT t, BRTNODE node, int childnum,
count++;
printf("%s:%d pushing %d count=%d\n", __FILE__, __LINE__, childnum, count);
}
{
bytevec key,val;
ITEMLEN keylen, vallen;
//printf("%s:%d Try random_pick, weight=%d \n", __FILE__, __LINE__, BNC_NBYTESINBUF(node, childnum));
assert(toku_fifo_n_entries(BNC_BUFFER(node,childnum))>0);
u_int32_t type;
TXNID xid;
while(0==toku_fifo_peek(BNC_BUFFER(node,childnum), &key, &keylen, &val, &vallen, &type, &xid)) {
int child_did_split=0; BRTNODE childa, childb;
DBT hk,hv;
DBT childsplitk;
BRT_CMD_S brtcmd = { (enum brt_cmd_type)type, xid, .u.id= {toku_fill_dbt(&hk, key, keylen),
toku_fill_dbt(&hv, val, vallen)} };
//printf("%s:%d random_picked\n", __FILE__, __LINE__);
toku_init_dbt(&childsplitk);
r = push_a_brt_cmd_down (t, node, child, childnum,
&brtcmd,
&child_did_split, &childa, &childb,
&childsplitk,
logger);
...
if (0){
unsigned int sum=0;
FIFO_ITERATE(BNC_BUFFER(node,childnum), subhk __attribute__((__unused__)), hkl, hd __attribute__((__unused__)), hdl, subtype __attribute__((__unused__)), subxid __attribute__((__unused__)),
sum+=hkl+hdl+KEY_VALUE_OVERHEAD+BRT_CMD_OVERHEAD);
printf("%s:%d sum=%u\n", __FILE__, __LINE__, sum);
assert(sum==BNC_NBYTESINBUF(node, childnum));
}
if (BNC_NBYTESINBUF(node, childnum)>0) assert(toku_fifo_n_entries(BNC_BUFFER(node,childnum))>0);
//printf("%s:%d %d=push_a_brt_cmd_down=(); child_did_split=%d (weight=%d)\n", __FILE__, __LINE__, r, child_did_split, BNC_NBYTESINBUF(node, childnum));
if (r!=0) return r;
if (child_did_split) {
// If the child splits, we don't push down any further.
r=handle_split_of_child (t, node, childnum,
childa, childb, &childsplitk,
did_split, nodea, nodeb, splitk,
logger);
//if (*did_split) {
// verify_local_fingerprint_nonleaf(*nodea);
// verify_local_fingerprint_nonleaf(*nodeb);
//}
return r; /* Don't do any more pushing if the child splits. */
}
}
if (0) printf("%s:%d done random picking\n", __FILE__, __LINE__);
}
assert(toku_serialize_brtnode_size(node)<=node->nodesize);
//verify_local_fingerprint_nonleaf(node);
r=toku_unpin_brtnode(t, child);
......@@ -1252,308 +1168,11 @@ static int brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
return r;
}
int toku_brt_create_cachetable(CACHETABLE *ct, long cachesize, LSN initial_lsn, TOKULOGGER logger) {
if (cachesize == 0)
cachesize = 128*1024*1024;
return toku_create_cachetable(ct, cachesize, initial_lsn, logger);
}
static int setup_initial_brt_root_node (BRT t, BLOCKNUM blocknum, TOKULOGGER logger) {
int r;
TAGMALLOC(BRTNODE, node);
assert(node);
node->ever_been_written = 0;
//printf("%s:%d\n", __FILE__, __LINE__);
initialize_brtnode(t, node, blocknum, 0);
// node->brt = t;
if (0) {
printf("%s:%d for tree %p node %p mdict_create--> %p\n", __FILE__, __LINE__, t, node, node->u.l.buffer);
printf("%s:%d put root at %" PRId64 "\n", __FILE__, __LINE__, blocknum.b);
}
//printf("%s:%d putting %p (%lld)\n", __FILE__, __LINE__, node, node->thisnodename);
u_int32_t fullhash = toku_cachetable_hash(t->cf, blocknum);
node->fullhash = fullhash;
r=toku_cachetable_put(t->cf, blocknum, fullhash,
node, brtnode_memory_size(node),
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t->h);
if (r!=0) {
toku_free(node);
return r;
}
// verify_local_fingerprint_nonleaf(node);
toku_log_newbrtnode(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), blocknum, 0, t->h->nodesize, (unsigned char)((t->flags&TOKU_DB_DUPSORT)!=0), node->rand4fingerprint);
r = toku_unpin_brtnode(t, node);
if (r!=0) {
toku_free(node);
return r;
}
return 0;
}
//#define BRT_TRACE
#ifdef BRT_TRACE
#define WHEN_BRTTRACE(x) x
#else
#define WHEN_BRTTRACE(x) ((void)0)
#endif
int toku_brt_set_flags(BRT brt, unsigned int flags) {
brt->did_set_flags = 1;
brt->flags = flags;
return 0;
}
int toku_brt_get_flags(BRT brt, unsigned int *flags) {
*flags = brt->flags;
return 0;
}
int toku_brt_set_nodesize(BRT brt, unsigned int nodesize) {
brt->nodesize = nodesize;
return 0;
}
int toku_brt_get_nodesize(BRT brt, unsigned int *nodesize) {
*nodesize = brt->nodesize;
return 0;
}
int toku_brt_set_bt_compare(BRT brt, int (*bt_compare)(DB *, const DBT*, const DBT*)) {
brt->compare_fun = bt_compare;
return 0;
}
int toku_brt_set_dup_compare(BRT brt, int (*dup_compare)(DB *, const DBT*, const DBT*)) {
brt->dup_compare = dup_compare;
return 0;
}
int toku_brt_get_fd(BRT brt, int *fdp) {
*fdp = toku_cachefile_fd(brt->cf);
return 0;
}
// open a file for use by the brt. if the file does not exist, create it.
static int brt_open_file(BRT brt, const char *fname, const char *fname_in_env, int is_create, TOKUTXN txn, int *fdp) {
brt = brt;
mode_t mode = 0777;
int r;
int fd = open(fname, O_RDWR, mode);
if (fd==-1) {
r = errno;
if (errno == ENOENT) {
if (!is_create) {
return r;
}
fd = open(fname, O_RDWR | O_CREAT, mode);
if (fd == -1) {
r = errno; return r;
}
r = toku_logger_log_fcreate(txn, fname_in_env, mode);
if (r != 0) {
close(fd); return r;
}
} else
return r;
}
*fdp = fd;
return 0;
}
// allocate and initialize a brt header.
// t->cf is not set to anything.
static int brt_alloc_init_header(BRT t, const char *dbname, TOKUTXN txn) {
int r;
BLOCKNUM root = make_blocknum(1);
assert(t->h == 0);
if ((MALLOC(t->h))==0) {
assert(errno==ENOMEM);
r = ENOMEM;
if (0) { died2: toku_free(t->h); }
t->h=0;
return r;
}
t->h->dirty=1;
if ((MALLOC_N(1, t->h->flags_array))==0) { r = errno; if (0) { died3: toku_free(t->h->flags_array); } goto died2; }
t->h->flags_array[0] = t->flags;
t->h->nodesize=t->nodesize;
t->h->free_blocks = make_blocknum(-1);
t->h->unused_blocks=make_blocknum(2);
t->h->translated_blocknum_limit = 0;
t->h->block_translation = 0;
t->h->block_translation_size_on_disk = 0;
t->h->block_translation_address_on_disk = 0;
// printf("%s:%d translated_blocknum_limit=%ld, block_translation_address_on_disk=%ld\n", __FILE__, __LINE__, t->h->translated_blocknum_limit, t->h->block_translation_address_on_disk);
create_block_allocator(&t->h->block_allocator, t->nodesize, BLOCK_ALLOCATOR_ALIGNMENT);
toku_fifo_create(&t->h->fifo);
t->h->root_put_counter = global_root_put_counter++;
if (dbname) {
t->h->n_named_roots = 1;
if ((MALLOC_N(1, t->h->names))==0) { assert(errno==ENOMEM); r=ENOMEM; if (0) { died4: if (dbname) toku_free(t->h->names); } goto died3; }
if ((MALLOC_N(1, t->h->roots))==0) { assert(errno==ENOMEM); r=ENOMEM; if (0) { died5: if (dbname) toku_free(t->h->roots); } goto died4; }
if ((MALLOC_N(1, t->h->root_hashes))==0) { assert(errno==ENOMEM); r=ENOMEM; if (0) { died6: if (dbname) toku_free(t->h->root_hashes); } goto died5; }
if ((t->h->names[0] = toku_strdup(dbname))==0) { assert(errno==ENOMEM); r=ENOMEM; if (0) { died7: if (dbname) toku_free(t->h->names[0]); } goto died6; }
t->h->roots[0] = root; // Block 0 is the header. Block 1 is the root.
compute_and_fill_remembered_hash(t, 0);
} else {
MALLOC_N(1, t->h->roots); assert(t->h->roots);
MALLOC_N(1, t->h->root_hashes); assert(t->h->root_hashes);
t->h->roots[0] = root;
compute_and_fill_remembered_hash(t, 0);
t->h->n_named_roots = -1;
t->h->names=0;
}
{
LOGGEDBRTHEADER lh = {.size= toku_serialize_brt_header_size(t->h),
.flags = t->flags,
.nodesize = t->h->nodesize,
.free_blocks = t->h->free_blocks,
.unused_blocks = t->h->unused_blocks,
.n_named_roots = t->h->n_named_roots };
if (t->h->n_named_roots>=0) {
lh.u.many.names = t->h->names;
lh.u.many.roots = t->h->roots;
} else {
lh.u.one.root = t->h->roots[0];
}
if ((r=toku_log_fheader(toku_txn_logger(txn), (LSN*)0, 0, toku_txn_get_txnid(txn), toku_cachefile_filenum(t->cf), lh))) { goto died7; }
}
if ((r=setup_initial_brt_root_node(t, root, toku_txn_logger(txn)))!=0) { goto died7; }
//printf("%s:%d putting %p (%d)\n", __FILE__, __LINE__, t->h, 0);
assert(t->h->free_blocks.b==-1);
toku_cachefile_set_userdata(t->cf, t->h, toku_brtheader_close);
return r;
}
int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char *dbname, int is_create, int only_create, CACHETABLE cachetable, TOKUTXN txn, DB *db) {
/* If dbname is NULL then we setup to hold a single tree. Otherwise we setup an array. */
int r;
char *malloced_name=0;
int db_index;
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, get_n_items_malloced()); toku_print_malloced_items();
WHEN_BRTTRACE(fprintf(stderr, "BRTTRACE: %s:%d toku_brt_open(%s, \"%s\", %d, %p, %d, %p)\n",
__FILE__, __LINE__, fname, dbname, is_create, newbrt, nodesize, cachetable));
if (0) { died0: assert(r); return r; }
assert(is_create || !only_create);
t->fname = toku_strdup(fname_in_env);
if (t->fname==0) {
r = errno;
if (0) { died00: if (t->fname) toku_free(t->fname); t->fname=0; }
goto died0;
}
if (dbname) {
malloced_name = toku_strdup(dbname);
if (malloced_name==0) {
r = ENOMEM;
if (0) { died0a: if(malloced_name) toku_free(malloced_name); }
goto died00;
}
}
t->database_name = malloced_name;
t->db = db;
t->txn_that_created = 0; // Uses 0 for no transaction.
{
int fd = -1;
r = brt_open_file(t, fname, fname_in_env, is_create, txn, &fd);
if (r != 0) {
t->database_name = 0; goto died0a;
}
r=toku_cachetable_openfd(&t->cf, cachetable, fd, fname_in_env);
if (r != 0) goto died0a;
toku_logger_log_fopen(txn, fname_in_env, toku_cachefile_filenum(t->cf));
}
if (r!=0) {
if (0) { died_after_open: toku_cachefile_close(&t->cf, toku_txn_logger(txn)); }
t->database_name = 0;
goto died0a;
}
assert(t->nodesize>0);
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, get_n_items_malloced()); toku_print_malloced_items();
if (0) {
died_after_read_and_pin:
goto died_after_open;
}
if (is_create) {
r = toku_read_brt_header_and_store_in_cachefile(t->cf, &t->h);
if (r==-1) {
r = brt_alloc_init_header(t, dbname, txn);
if (r != 0) goto died_after_read_and_pin;
}
else if (r!=0) {
goto died_after_read_and_pin;
}
else {
int i;
assert(r==0);
assert(dbname);
if (t->h->n_named_roots<0) { r=EINVAL; goto died_after_read_and_pin; } // Cannot create a subdb in a file that is not enabled for subdbs
assert(t->h->n_named_roots>=0);
for (i=0; i<t->h->n_named_roots; i++) {
if (strcmp(t->h->names[i], dbname)==0) {
if (only_create) {
r = EEXIST;
goto died_after_read_and_pin;
}
else {
db_index = i;
goto found_it;
}
}
}
if ((t->h->names = toku_realloc(t->h->names, (1+t->h->n_named_roots)*sizeof(*t->h->names))) == 0) { assert(errno==ENOMEM); r=ENOMEM; goto died_after_read_and_pin; }
if ((t->h->roots = toku_realloc(t->h->roots, (1+t->h->n_named_roots)*sizeof(*t->h->roots))) == 0) { assert(errno==ENOMEM); r=ENOMEM; goto died_after_read_and_pin; }
if ((t->h->root_hashes = toku_realloc(t->h->root_hashes, (1+t->h->n_named_roots)*sizeof(*t->h->root_hashes))) == 0) { assert(errno==ENOMEM); r=ENOMEM; goto died_after_read_and_pin; }
if ((t->h->flags_array = toku_realloc(t->h->flags_array, (1+t->h->n_named_roots)*sizeof(*t->h->flags_array))) == 0) { assert(errno==ENOMEM); r=ENOMEM; goto died_after_read_and_pin; }
t->h->flags_array[t->h->n_named_roots] = t->flags;
t->h->n_named_roots++;
if ((t->h->names[t->h->n_named_roots-1] = toku_strdup(dbname)) == 0) { assert(errno==ENOMEM); r=ENOMEM; goto died_after_read_and_pin; }
//printf("%s:%d t=%p\n", __FILE__, __LINE__, t);
r = allocate_diskblocknumber(&t->h->roots[t->h->n_named_roots-1], t, toku_txn_logger(txn));
if (r!=0) goto died_after_read_and_pin;
t->h->dirty = 1;
compute_and_fill_remembered_hash(t, t->h->n_named_roots-1);
if ((r=setup_initial_brt_root_node(t, t->h->roots[t->h->n_named_roots-1], toku_txn_logger(txn)))!=0) goto died_after_read_and_pin;
}
} else {
if ((r = toku_read_brt_header_and_store_in_cachefile(t->cf, &t->h))!=0) goto died_after_open;
if (!dbname) {
if (t->h->n_named_roots!=-1) { r = EINVAL; goto died_after_read_and_pin; } // requires a subdb
db_index=0;
} else {
int i;
if (t->h->n_named_roots==-1) { r = EINVAL; goto died_after_read_and_pin; } // no suddbs in the db
// printf("%s:%d n_roots=%d\n", __FILE__, __LINE__, t->h->n_named_roots);
for (i=0; i<t->h->n_named_roots; i++) {
if (strcmp(t->h->names[i], dbname)==0) {
db_index=i;
goto found_it;
}
}
r=ENOENT; /* the database doesn't exist */
goto died_after_read_and_pin;
}
found_it:
t->nodesize = t->h->nodesize; /* inherit the pagesize from the file */
if (!t->did_set_flags) {
t->flags = t->h->flags_array[db_index];
} else {
if (t->flags != t->h->flags_array[db_index]) { /* if flags have been set then flags must match */
r = EINVAL; goto died_after_read_and_pin;
}
}
}
assert(t->h);
WHEN_BRTTRACE(fprintf(stderr, "BRTTRACE -> %p\n", t));
return 0;
}
int toku_brt_reopen(BRT brt, const char *fname, const char *fname_in_env, TOKUTXN txn) {
int r;
......@@ -1613,29 +1232,6 @@ int toku_brt_remove_subdb(BRT brt, const char *dbname, u_int32_t flags) {
}
// This one has no env
int toku_open_brt (const char *fname, const char *dbname, int is_create, BRT *newbrt, int nodesize, CACHETABLE cachetable, TOKUTXN txn,
int (*compare_fun)(DB*,const DBT*,const DBT*), DB *db) {
BRT brt;
int r;
const int only_create = 0;
r = toku_brt_create(&brt);
if (r != 0)
return r;
toku_brt_set_nodesize(brt, nodesize);
toku_brt_set_bt_compare(brt, compare_fun);
r = toku_brt_open(brt, fname, fname, dbname, is_create, only_create, cachetable, txn, db);
if (r != 0) {
return r;
}
*newbrt = brt;
return r;
}
int toku_brt_flush (BRT brt) {
return toku_cachefile_flush(brt->cf);
}
......@@ -1754,37 +1350,6 @@ int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
return r;
}
int toku_brt_lookup (BRT brt, DBT *k, DBT *v) {
int r, rr;
BRT_CURSOR cursor;
rr = toku_brt_cursor(brt, &cursor, 1);
if (rr != 0) return rr;
int op = brt->flags & TOKU_DB_DUPSORT ? DB_GET_BOTH : DB_SET;
r = toku_brt_cursor_get(cursor, k, v, op, 0);
rr = toku_brt_cursor_close(cursor); assert(rr == 0);
return r;
}
int toku_brt_delete(BRT brt, DBT *key, TOKUTXN txn) {
int r;
if (txn && (brt->txn_that_created != toku_txn_get_txnid(txn))) {
BYTESTRING keybs = {key->size, toku_memdup_in_rollback(txn, key->data, key->size)};
toku_cachefile_refup(brt->cf);
r = toku_logger_save_rollback_cmddelete(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), keybs);
if (r!=0) return r;
r = toku_txn_note_brt(txn, brt);
if (r!=0) return r;
}
DBT val;
BRT_CMD_S brtcmd = { BRT_DELETE_ANY, toku_txn_get_txnid(txn), .u.id={key, toku_init_dbt(&val)}};
r = toku_brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn));
return r;
}
int toku_brt_delete_both(BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
//{ unsigned i; printf("del %p keylen=%d key={", brt->db, key->size); for(i=0; i<key->size; i++) printf("%d,", ((char*)key->data)[i]); printf("} datalen=%d data={", val->size); for(i=0; i<val->size; i++) printf("%d,", ((char*)val->data)[i]); printf("}\n"); }
int r;
......@@ -1802,79 +1367,6 @@ int toku_brt_delete_both(BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
return r;
}
static int
toku_dump_brtnode (BRT brt, BLOCKNUM blocknum, int depth, bytevec lorange, ITEMLEN lolen, bytevec hirange, ITEMLEN hilen) {
int result=0;
BRTNODE node;
void *node_v;
u_int32_t fullhash = toku_cachetable_hash(brt->cf, blocknum);
int r = toku_cachetable_get_and_pin(brt->cf, blocknum, fullhash,
&node_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt->h);
assert(r==0);
printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v);
node=node_v;
assert(node->fullhash==fullhash);
result=toku_verify_brtnode(brt, blocknum, lorange, lolen, hirange, hilen, 0);
printf("%*sNode=%p\n", depth, "", node);
if (node->height>0) {
printf("%*sNode %"PRId64" nodesize=%u height=%d n_children=%d n_bytes_in_buffers=%u keyrange=%s %s\n",
depth, "", blocknum.b, node->nodesize, node->height, node->u.n.n_children, node->u.n.n_bytes_in_buffers, (char*)lorange, (char*)hirange);
//printf("%s %s\n", lorange ? lorange : "NULL", hirange ? hirange : "NULL");
{
int i;
for (i=0; i< node->u.n.n_children; i++) {
printf("%*schild %d buffered (%d entries):\n", depth+1, "", i, toku_fifo_n_entries(BNC_BUFFER(node,i)));
FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, xid,
{
data=data; datalen=datalen; keylen=keylen;
printf("%*s xid=%"PRIu64" %u (type=%d)\n", depth+2, "", xid, ntohl(*(int*)key), type);
//assert(strlen((char*)key)+1==keylen);
//assert(strlen((char*)data)+1==datalen);
});
}
for (i=0; i<node->u.n.n_children; i++) {
printf("%*schild %d\n", depth, "", i);
if (i>0) {
printf("%*spivot %d len=%u %u\n", depth+1, "", i-1, node->u.n.childkeys[i-1]->keylen, ntohl(*(int*)&node->u.n.childkeys[i-1]->key));
}
toku_dump_brtnode(brt, BNC_BLOCKNUM(node, i), depth+4,
(i==0) ? lorange : node->u.n.childkeys[i-1],
(i==0) ? lolen : toku_brt_pivot_key_len(brt, node->u.n.childkeys[i-1]),
(i==node->u.n.n_children-1) ? hirange : node->u.n.childkeys[i],
(i==node->u.n.n_children-1) ? hilen : toku_brt_pivot_key_len(brt, node->u.n.childkeys[i])
);
}
}
} else {
printf("%*sNode %" PRId64 " nodesize=%u height=%d n_bytes_in_buffer=%u keyrange=%u %u\n",
depth, "", blocknum.b, node->nodesize, node->height, node->u.l.n_bytes_in_buffer, lorange ? ntohl(*(int*)lorange) : 0, hirange ? ntohl(*(int*)hirange) : 0);
int size = toku_omt_size(node->u.l.buffer);
int i;
for (i=0; i<size; i++) {
OMTVALUE v;
r = toku_omt_fetch(node->u.l.buffer, i, &v, 0);
assert(r==0);
printf(" [%d]=", i);
print_leafentry(stdout, v);
}
// printf(" (%d)%u ", len, *(int*)le_any_key(data)));
printf("\n");
}
r = toku_cachetable_unpin(brt->cf, blocknum, fullhash, 0, 0);
assert(r==0);
return result;
}
int toku_dump_brt (BRT brt) {
CACHEKEY *rootp;
assert(brt->h);
u_int32_t fullhash;
rootp = toku_calculate_root_offset_pointer(brt, &fullhash);
printf("split_count=%d\n", split_count);
return toku_dump_brtnode(brt, *rootp, 0, 0, 0, 0, 0);
}
#if 0
static int show_brtnode_blocknumbers (BRT brt, DISKOFF off) {
BRTNODE node;
......@@ -1929,332 +1421,11 @@ static inline void brt_split_init(BRT_SPLIT *split) {
static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKULOGGER logger, OMTCURSOR);
/* search in a node's child */
static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKULOGGER logger, OMTCURSOR omtcursor) {
int r, rr;
/* if the child's buffer is not empty then try to empty it */
if (BNC_NBYTESINBUF(node, childnum) > 0) {
rr = push_some_brt_cmds_down(brt, node, childnum, &split->did_split, &split->nodea, &split->nodeb, &split->splitk, logger);
assert(rr == 0);
/* push down may cause a child split, so childnum may not be appropriate, and the node itself may split, so retry */
return EAGAIN;
}
void *node_v;
BLOCKNUM childblocknum = BNC_BLOCKNUM(node,childnum);
u_int32_t fullhash = compute_child_fullhash(brt->cf, node, childnum);
rr = toku_cachetable_get_and_pin(brt->cf, childblocknum, fullhash, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt->h);
assert(rr == 0);
for (;;) {
BRTNODE childnode = node_v;
BRT_SPLIT childsplit; brt_split_init(&childsplit);
r = brt_search_node(brt, childnode, search, newkey, newval, &childsplit, logger, omtcursor);
if (childsplit.did_split) {
rr = handle_split_of_child(brt, node, childnum, childsplit.nodea, childsplit.nodeb, &childsplit.splitk,
&split->did_split, &split->nodea, &split->nodeb, &split->splitk, logger);
assert(rr == 0);
break;
} else {
if (r == EAGAIN)
continue;
rr = toku_cachetable_unpin(brt->cf, childnode->thisnodename, childnode->fullhash, childnode->dirty, brtnode_memory_size(childnode));
assert(rr == 0);
break;
}
}
return r;
}
static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKULOGGER logger, OMTCURSOR omtcursor) {
int r;
int c;
/* binary search is overkill for a small array */
int child[node->u.n.n_children];
/* scan left to right or right to left depending on the search direction */
for (c = 0; c < node->u.n.n_children; c++)
child[c] = search->direction & BRT_SEARCH_LEFT ? c : node->u.n.n_children - 1 - c;
for (c = 0; c < node->u.n.n_children-1; c++) {
int p = search->direction & BRT_SEARCH_LEFT ? child[c] : child[c] - 1;
struct kv_pair *pivot = node->u.n.childkeys[p];
DBT pivotkey, pivotval;
if (search->compare(search,
toku_fill_dbt(&pivotkey, kv_pair_key(pivot), kv_pair_keylen(pivot)),
brt->flags & TOKU_DB_DUPSORT ? toku_fill_dbt(&pivotval, kv_pair_val(pivot), kv_pair_vallen(pivot)): 0)) {
r = brt_search_child(brt, node, child[c], search, newkey, newval, split, logger, omtcursor);
if (r == 0 || r == EAGAIN) {
return r;
}
}
}
/* check the first (left) or last (right) node if nothing has been found */
return brt_search_child(brt, node, child[c], search, newkey, newval, split, logger, omtcursor);
}
static int
pair_leafval_bessel_le_committed (u_int32_t klen, void *kval,
u_int32_t dlen, void *dval,
brt_search_t *search) {
DBT x,y;
int cmp = search->compare(search,
search->k ? toku_fill_dbt(&x, kval, klen) : 0,
search->v ? toku_fill_dbt(&y, dval, dlen) : 0);
// The search->compare function returns only 0 or 1
switch (search->direction) {
case BRT_SEARCH_LEFT: return cmp==0 ? -1 : +1;
case BRT_SEARCH_RIGHT: return cmp==0 ? +1 : -1; // Because the comparison runs backwards for right searches.
}
assert(0);
return 0;
}
static int
pair_leafval_bessel_le_both (TXNID xid __attribute__((__unused__)),
u_int32_t klen, void *kval,
u_int32_t clen __attribute__((__unused__)), void *cval __attribute__((__unused__)),
u_int32_t plen, void *pval,
brt_search_t *search) {
return pair_leafval_bessel_le_committed(klen, kval, plen, pval, search);
}
static int
pair_leafval_bessel_le_provdel (TXNID xid __attribute__((__unused__)),
u_int32_t klen, void *kval,
u_int32_t clen, void *cval,
brt_search_t *be) {
return pair_leafval_bessel_le_committed(klen, kval, clen, cval, be);
}
static int
pair_leafval_bessel_le_provpair (TXNID xid __attribute__((__unused__)),
u_int32_t klen, void *kval,
u_int32_t plen, void *pval,
brt_search_t *be) {
return pair_leafval_bessel_le_committed(klen, kval, plen, pval, be);
}
static int bessel_from_search_t (OMTVALUE lev, void *extra) {
LEAFENTRY leafval=lev;
brt_search_t *search = extra;
LESWITCHCALL(leafval, pair_leafval_bessel, search);
abort(); return 0;
}
static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, TOKULOGGER logger, OMTCURSOR omtcursor) {
// Now we have to convert from brt_search_t to the bessel function with a direction. What a pain...
int direction;
switch (search->direction) {
case BRT_SEARCH_LEFT: direction = +1; goto ok;
case BRT_SEARCH_RIGHT: direction = -1; goto ok;
}
return EINVAL; // This return and the goto are a hack to get both compile-time and run-time checking on enum
ok: ;
OMTVALUE datav;
u_int32_t idx = 0;
int r = toku_omt_find(node->u.l.buffer,
bessel_from_search_t,
search,
direction,
&datav, &idx, omtcursor);
if (r!=0) return r;
LEAFENTRY le = datav;
if (le_is_provdel(le)) {
TXNID xid = le_any_xid(le);
TOKUTXN txn = 0;
toku_txn_find_by_xid(brt, xid, &txn);
// Provisionally deleted stuff is gone.
// So we need to scan in the direction to see if we can find something
while (1) {
// see if the transaction is alive
TXNID newxid = le_any_xid(le);
if (newxid != xid) {
xid = newxid;
txn = 0;
toku_txn_find_by_xid(brt, xid, &txn);
}
switch (search->direction) {
case BRT_SEARCH_LEFT:
if (txn) {
// printf("xid %llu -> %p\n", (unsigned long long) xid, txn);
idx++;
} else {
// apply a commit message for this leafentry to the node
// printf("apply commit_both %llu\n", (unsigned long long) xid);
DBT key, val;
BRT_CMD_S brtcmd = { BRT_COMMIT_BOTH, xid, .u.id= {toku_fill_dbt(&key, le_latest_key(le), le_latest_keylen(le)),
toku_fill_dbt(&val, le_latest_val(le), le_latest_vallen(le))} };
r = brt_leaf_apply_cmd_once(brt, node, &brtcmd, logger, idx, le);
assert(r == 0);
}
if (idx>=toku_omt_size(node->u.l.buffer)) return DB_NOTFOUND;
break;
case BRT_SEARCH_RIGHT:
if (idx==0) return DB_NOTFOUND;
idx--;
break;
}
if (idx>=toku_omt_size(node->u.l.buffer)) continue;
r = toku_omt_fetch(node->u.l.buffer, idx, &datav, omtcursor);
assert(r==0); // we just validated the index
le = datav;
if (!le_is_provdel(le)) goto got_a_good_value;
}
}
got_a_good_value:
if (newkey || newval) {
bytevec key = newkey ? le_latest_key(le) : NULL;
u_int32_t key_len = newkey ? le_latest_keylen(le) : 0;
bytevec val = newval ? le_latest_val(le) : NULL;
u_int32_t val_len = newval ? le_latest_vallen(le) : 0;
r = toku_dbt_set_two_values(newkey, &key, key_len, &brt->skey, FALSE,
newval, &val, val_len, &brt->sval, FALSE);
if (r!=0) return r;
}
return 0;
}
static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKULOGGER logger, OMTCURSOR omtcursor) {
if (node->height > 0)
return brt_search_nonleaf_node(brt, node, search, newkey, newval, split, logger, omtcursor);
else
return brt_search_leaf_node(brt, node, search, newkey, newval, logger, omtcursor);
}
static int
toku_brt_search (BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, TOKULOGGER logger, OMTCURSOR omtcursor, u_int64_t *root_put_counter)
// Effect: Perform a search. Associate cursor with a leaf if possible.
{
int r, rr;
assert(brt->h);
*root_put_counter = brt->h->root_put_counter;
u_int32_t fullhash;
CACHEKEY *rootp = toku_calculate_root_offset_pointer(brt, &fullhash);
void *node_v;
//assert(fullhash == toku_cachetable_hash(brt->cf, *rootp));
rr = toku_cachetable_get_and_pin(brt->cf, *rootp, fullhash,
&node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt->h);
assert(rr == 0);
BRTNODE node = node_v;
// push the fifo sutff
{
DBT okey,odata;
BRT_CMD_S ocmd;
while (0==toku_fifo_peek_cmdstruct(brt->h->fifo, &ocmd, &okey, &odata)) {
if ((r = push_something_simple(brt, &node, rootp, &ocmd, logger))) return r;
r = toku_fifo_deq(brt->h->fifo);
assert(r==0);
}
}
for (;;) {
BRT_SPLIT split; brt_split_init(&split);
r = brt_search_node(brt, node, search, newkey, newval, &split, logger, omtcursor);
if (split.did_split) {
rr = brt_init_new_root(brt, split.nodea, split.nodeb, split.splitk, rootp, 0, &node);
assert(rr == 0);
}
if (r != EAGAIN)
break;
}
rr = toku_unpin_brtnode(brt, node);
assert(rr == 0);
return r;
}
static BOOL brt_cursor_not_set(BRT_CURSOR cursor) {
return (BOOL)((cursor->key.data == 0)
||
(cursor->val.data == 0));
}
BOOL toku_brt_cursor_uninitialized(BRT_CURSOR c) {
return brt_cursor_not_set(c);
}
static inline void load_dbts_from_omt(BRT_CURSOR c, DBT *key, DBT *val) {
OMTVALUE le = 0;
int r = toku_omt_cursor_current(c->omtcursor, &le);
assert(r==0);
if (key) {
key->data = le_latest_key(le);
key->size = le_latest_keylen(le);
}
if (val) {
val->data = le_latest_val(le);
val->size = le_latest_vallen(le);
}
}
static void brt_cursor_invalidate_callback(OMTCURSOR UU(omt_c), void *extra) {
BRT_CURSOR cursor = extra;
if (cursor->current_in_omt) {
assert(cursor->key.flags==DB_DBT_REALLOC);
assert(cursor->val.flags==DB_DBT_REALLOC);
DBT key,val;
int r;
load_dbts_from_omt(cursor, toku_init_dbt(&key), toku_init_dbt(&val));
//Make certain not to try to free the omt's memory.
toku_init_dbt(&cursor->key)->flags = DB_DBT_REALLOC;
toku_init_dbt(&cursor->val)->flags = DB_DBT_REALLOC;
r = toku_dbt_set_two_values(&cursor->key, (bytevec*)&key.data, key.size, NULL, FALSE,
&cursor->val, (bytevec*)&val.data, val.size, NULL, FALSE);
//TODO: Find some way to deal with ENOMEM here.
assert(r==0);
cursor->current_in_omt = FALSE;
}
if (cursor->prev_in_omt) {
toku_init_dbt(&cursor->prevkey)->flags = DB_DBT_REALLOC;
toku_init_dbt(&cursor->prevval)->flags = DB_DBT_REALLOC;
cursor->prev_in_omt = FALSE;
}
}
int toku_brt_cursor (BRT brt, BRT_CURSOR *cursorptr, int is_temporary_cursor) {
BRT_CURSOR cursor = toku_malloc(sizeof *cursor);
if (cursor == 0)
return ENOMEM;
cursor->brt = brt;
toku_init_dbt(&cursor->key); cursor->key.flags = DB_DBT_REALLOC;
toku_init_dbt(&cursor->val); cursor->val.flags = DB_DBT_REALLOC;
toku_init_dbt(&cursor->prevkey); cursor->prevkey.flags = DB_DBT_REALLOC;
toku_init_dbt(&cursor->prevval); cursor->prevval.flags = DB_DBT_REALLOC;
cursor->current_in_omt = FALSE;
cursor->prev_in_omt = FALSE;
list_push(&brt->cursors, &cursor->cursors_link);
cursor->is_temporary_cursor=is_temporary_cursor;
cursor->skey = cursor->sval = 0;
int r = toku_omt_cursor_create(&cursor->omtcursor);
assert(r==0);
toku_omt_cursor_set_invalidate_callback(cursor->omtcursor,
brt_cursor_invalidate_callback, cursor);
cursor->root_put_counter=0;
*cursorptr = cursor;
return 0;
}
DBT *brt_cursor_peek_prev_key(BRT_CURSOR cursor)
// Effect: Return a pointer to a DBT for the previous key.
// Requires: The caller may not modify that DBT or the memory at which it points.
......@@ -2294,35 +1465,6 @@ DBT *brt_cursor_peek_current_val(BRT_CURSOR cursor)
return &cursor->val;
}
static inline int compare_k_x(BRT brt, DBT *k, DBT *x) {
return brt->compare_fun(brt->db, k, x);
}
static inline int compare_v_y(BRT brt, DBT *v, DBT *y) {
return brt->dup_compare(brt->db, v, y);
}
static inline int compare_kv_xy(BRT brt, DBT *k, DBT *v, DBT *x, DBT *y) {
int cmp = brt->compare_fun(brt->db, k, x);
if (cmp == 0 && v && y)
cmp = brt->dup_compare(brt->db, v, y);
return cmp;
}
static inline int brt_cursor_copyout(BRT_CURSOR cursor, DBT *key, DBT *val) {
//Passing in NULL for both key and val is used with light weight cursors.
//Retrieval of key and val will use the peek functions.
if (!key && !val) return 0;
int r = 0;
void** key_staticp = cursor->is_temporary_cursor ? &cursor->brt->skey : &cursor->skey;
void** val_staticp = cursor->is_temporary_cursor ? &cursor->brt->sval : &cursor->sval;
if (cursor->current_in_omt) load_dbts_from_omt(cursor, &cursor->key, &cursor->val);
r = toku_dbt_set_two_values(key, (bytevec*)&cursor->key.data, cursor->key.size, key_staticp, FALSE,
val, (bytevec*)&cursor->val.data, cursor->val.size, val_staticp, FALSE);
return r;
}
int toku_brt_dbt_set(DBT* key, DBT* key_source) {
int r = toku_dbt_set_value(key, (bytevec*)&key_source->data, key_source->size, NULL, FALSE);
return r;
......@@ -2353,166 +1495,11 @@ int toku_brt_cursor_dbts_set_with_dat(BRT_CURSOR cursor, BRT pdb,
return r;
}
static int brt_cursor_compare_set(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
return compare_kv_xy(brt, search->k, search->v, x, y) <= 0; /* return min xy: kv <= xy */
}
static int brt_cursor_current(BRT_CURSOR cursor, int op, DBT *outkey, DBT *outval, TOKULOGGER logger) {
if (brt_cursor_not_set(cursor))
return EINVAL;
if (op == DB_CURRENT) {
int r = ENOSYS;
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_REALLOC;
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_REALLOC;
brt_cursor_invalidate_callback(cursor->omtcursor, cursor);
brt_search_t search; brt_search_init(&search, brt_cursor_compare_set, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
r = toku_brt_search(cursor->brt, &search, &newkey, &newval, logger, cursor->omtcursor, &cursor->root_put_counter);
if (r != 0 || compare_kv_xy(cursor->brt, &cursor->key, &cursor->val, &newkey, &newval) != 0)
r = DB_KEYEMPTY;
dbt_cleanup(&newkey);
dbt_cleanup(&newval);
if (r!=0) return r;
}
return brt_cursor_copyout(cursor, outkey, outval);
}
static void swap_dbts (DBT *a, DBT *b) {
DBT tmp=*a;
*a=*b;
*b=tmp;
}
static void swap_cursor_dbts (BRT_CURSOR cursor) {
swap_dbts(&cursor->prevkey, &cursor->key);
swap_dbts(&cursor->prevval, &cursor->val);
}
void brt_cursor_restore_state_from_prev(BRT_CURSOR cursor) {
toku_omt_cursor_invalidate(cursor->omtcursor);
swap_cursor_dbts(cursor);
}
/* search for the first kv pair that matches the search object */
static int brt_cursor_search(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval, TOKULOGGER logger) {
assert(cursor->prevkey.flags == DB_DBT_REALLOC);
assert(cursor->prevval.flags == DB_DBT_REALLOC);
brt_cursor_invalidate_callback(cursor->omtcursor, cursor);
int r = toku_brt_search(cursor->brt, search, &cursor->prevkey, &cursor->prevval, logger, cursor->omtcursor, &cursor->root_put_counter);
if (r == 0) {
swap_cursor_dbts(cursor);
r = brt_cursor_copyout(cursor, outkey, outval);
}
return r;
}
/* search for the kv pair that matches the search object and is equal to kv */
static int brt_cursor_search_eq_kv_xy(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval, TOKULOGGER logger) {
assert(cursor->prevkey.flags == DB_DBT_REALLOC);
assert(cursor->prevval.flags == DB_DBT_REALLOC);
brt_cursor_invalidate_callback(cursor->omtcursor, cursor);
int r = toku_brt_search(cursor->brt, search, &cursor->prevkey, &cursor->prevval, logger, cursor->omtcursor, &cursor->root_put_counter);
if (r == 0) {
if (compare_kv_xy(cursor->brt, search->k, search->v, &cursor->prevkey, &cursor->prevval) == 0) {
swap_cursor_dbts(cursor);
r = brt_cursor_copyout(cursor, outkey, outval);
} else {
r = DB_NOTFOUND;
}
}
return r;
}
/* search for the kv pair that matches the search object and is equal to k */
static int brt_cursor_search_eq_k_x(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval, TOKULOGGER logger) {
assert(cursor->prevkey.flags == DB_DBT_REALLOC);
assert(cursor->prevval.flags == DB_DBT_REALLOC);
brt_cursor_invalidate_callback(cursor->omtcursor, cursor);
int r = toku_brt_search(cursor->brt, search, &cursor->prevkey, &cursor->prevval, logger, cursor->omtcursor, &cursor->root_put_counter);
if (r == 0) {
if (compare_k_x(cursor->brt, search->k, &cursor->prevkey) == 0) {
swap_cursor_dbts(cursor);
r = brt_cursor_copyout(cursor, outkey, outval);
} else
r = DB_NOTFOUND;
}
return r;
}
static int brt_cursor_compare_one(brt_search_t *search, DBT *x, DBT *y) {
search = search; x = x; y = y;
return 1;
}
static int brt_cursor_first(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_one, BRT_SEARCH_LEFT, 0, 0, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_last(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_one, BRT_SEARCH_RIGHT, 0, 0, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_compare_next(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
return compare_kv_xy(brt, search->k, search->v, x, y) < 0; /* return min xy: kv < xy */
}
static void save_omtcursor_current_in_prev(BRT_CURSOR cursor) {
if (!cursor->prev_in_omt) {
//Free the data.
if (cursor->prevkey.data) toku_free(cursor->prevkey.data);
if (cursor->prevval.data) toku_free(cursor->prevval.data);
cursor->prev_in_omt = TRUE;
}
load_dbts_from_omt(cursor, &cursor->prevkey, &cursor->prevval);
}
static int brt_cursor_next_shortcut (BRT_CURSOR cursor, DBT *outkey, DBT *outval)
// Effect: If possible, increment the cursor and return the key-value pair
// (i.e., the next one from what the cursor pointed to before.)
// That is, do DB_NEXT on DUP databases, and do DB_NEXT_NODUP on NODUP databases.
{
if (toku_omt_cursor_is_valid(cursor->omtcursor)) {
{
u_int64_t h_counter = cursor->brt->h->root_put_counter;
if (h_counter != cursor->root_put_counter) return -1;
}
OMTVALUE le;
//Save current value in prev.
save_omtcursor_current_in_prev(cursor);
u_int32_t starting_index;
u_int32_t index;
u_int32_t size = toku_omt_size(toku_omt_cursor_get_omt(cursor->omtcursor));
int r = toku_omt_cursor_current_index(cursor->omtcursor, &starting_index);
assert(r==0);
index = starting_index;
while (index+1 < size) {
r = toku_omt_cursor_next(cursor->omtcursor, &le);
assert(r==0);
index++;
if (le_is_provdel(le)) continue;
//Free old current if necessary.
if (!cursor->current_in_omt) {
if (cursor->key.data) toku_free(cursor->key.data);
if (cursor->val.data) toku_free(cursor->val.data);
cursor->current_in_omt = TRUE;
}
return brt_cursor_copyout(cursor, outkey, outval);
}
toku_omt_cursor_set_index(cursor->omtcursor, starting_index);
toku_omt_cursor_invalidate(cursor->omtcursor);
}
return -1;
}
int toku_brt_cursor_peek_prev(BRT_CURSOR cursor, DBT *outkey, DBT *outval) {
if (toku_omt_cursor_is_valid(cursor->omtcursor)) {
{
......@@ -2565,242 +1552,18 @@ get_next:;
return -1;
}
static int brt_cursor_next(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
if (0!=(cursor->brt->flags & TOKU_DB_DUP) &&
brt_cursor_next_shortcut(cursor, outkey, outval)==0)
return 0;
brt_search_t search; brt_search_init(&search, brt_cursor_compare_next, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
int toku_brt_cursor_after(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval, TOKUTXN txn) {
TOKULOGGER logger = toku_txn_logger(txn);
brt_search_t search; brt_search_init(&search, brt_cursor_compare_next, BRT_SEARCH_LEFT, key, val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_compare_next_nodup(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context; y = y;
return compare_k_x(brt, search->k, x) < 0; /* return min x: k < x */
}
static int brt_cursor_next_nodup(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
if (0==(cursor->brt->flags & TOKU_DB_DUP) &&
brt_cursor_next_shortcut(cursor, outkey, outval)==0)
return 0;
brt_search_t search; brt_search_init(&search, brt_cursor_compare_next_nodup, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_compare_next_dup(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
int keycmp = compare_k_x(brt, search->k, x);
if (keycmp < 0)
return 1;
else
return keycmp == 0 && y && compare_v_y(brt, search->v, y) < 0; /* return min xy: k <= x && v < y */
}
static int brt_cursor_next_dup(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_next_dup, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search_eq_k_x(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_compare_get_both_range(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
int keycmp = compare_k_x(brt, search->k, x);
if (keycmp < 0)
return 1;
else
return keycmp == 0 && (y == 0 || compare_v_y(brt, search->v, y) <= 0); /* return min xy: k <= x && v <= y */
}
static int brt_cursor_get_both_range(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_get_both_range, BRT_SEARCH_LEFT, key, val, cursor->brt);
return brt_cursor_search_eq_k_x(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_compare_prev(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
return compare_kv_xy(brt, search->k, search->v, x, y) > 0; /* return max xy: kv > xy */
}
static int brt_cursor_prev_shortcut (BRT_CURSOR cursor, DBT *outkey, DBT *outval)
// Effect: If possible, decrement the cursor and return the key-value pair
// (i.e., the previous one from what the cursor pointed to before.)
// That is, do DB_PREV on DUP databases, and do DB_PREV_NODUP on NODUP databases.
{
if (toku_omt_cursor_is_valid(cursor->omtcursor)) {
{
u_int64_t h_counter = cursor->brt->h->root_put_counter;
if (h_counter != cursor->root_put_counter) return -1;
}
OMTVALUE le;
//Save current value in prev.
save_omtcursor_current_in_prev(cursor);
u_int32_t starting_index = 0;
u_int32_t index;
int r = toku_omt_cursor_current_index(cursor->omtcursor, &starting_index);
assert(r==0);
index = starting_index;
while (index>0) {
r = toku_omt_cursor_prev(cursor->omtcursor, &le);
assert(r==0);
index--;
if (le_is_provdel(le)) continue;
//Free old current if necessary.
if (!cursor->current_in_omt) {
if (cursor->key.data) toku_free(cursor->key.data);
if (cursor->val.data) toku_free(cursor->val.data);
cursor->current_in_omt = TRUE;
}
return brt_cursor_copyout(cursor, outkey, outval);
}
toku_omt_cursor_set_index(cursor->omtcursor, starting_index);
toku_omt_cursor_invalidate(cursor->omtcursor);
}
return -1;
}
int toku_brt_cursor_before(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval, TOKUTXN txn) {
TOKULOGGER logger = toku_txn_logger(txn);
brt_search_t search; brt_search_init(&search, brt_cursor_compare_prev, BRT_SEARCH_RIGHT, key, val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_prev(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
if (0!=(cursor->brt->flags & TOKU_DB_DUP) &&
brt_cursor_prev_shortcut(cursor, outkey, outval)==0)
return 0;
brt_search_t search; brt_search_init(&search, brt_cursor_compare_prev, BRT_SEARCH_RIGHT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_compare_prev_nodup(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context; y = y;
return compare_k_x(brt, search->k, x) > 0; /* return max x: k > x */
}
static int brt_cursor_prev_nodup(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
if (0==(cursor->brt->flags & TOKU_DB_DUP) &&
brt_cursor_prev_shortcut(cursor, outkey, outval)==0)
return 0;
brt_search_t search; brt_search_init(&search, brt_cursor_compare_prev_nodup, BRT_SEARCH_RIGHT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
#ifdef DB_PREV_DUP
static int brt_cursor_compare_prev_dup(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
int keycmp = compare_k_x(brt, search->k, x);
if (keycmp > 0)
return 1;
else
return keycmp == 0 && y && compare_v_y(brt, search->v, y) > 0; /* return max xy: k >= x && v > y */
}
static int brt_cursor_prev_dup(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_prev_dup, BRT_SEARCH_RIGHT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search_eq_k_x(cursor, &search, outkey, outval, logger);
}
#endif
static int brt_cursor_compare_set_range(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
return compare_kv_xy(brt, search->k, search->v, x, y) <= 0; /* return kv <= xy */
}
static int brt_cursor_set(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_set_range, BRT_SEARCH_LEFT, key, val, cursor->brt);
return brt_cursor_search_eq_kv_xy(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_set_range(BRT_CURSOR cursor, DBT *key, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_set_range, BRT_SEARCH_LEFT, key, 0, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *key, DBT *val, int get_flags, TOKUTXN txn) {
int r;
int op = get_flags & DB_OPFLAGS_MASK;
TOKULOGGER logger = toku_txn_logger(txn);
if (get_flags & ~DB_OPFLAGS_MASK)
return EINVAL;
switch (op) {
case DB_CURRENT:
case DB_CURRENT_BINDING:
r = brt_cursor_current(cursor, op, key, val, logger);
break;
case DB_FIRST:
r = brt_cursor_first(cursor, key, val, logger);
break;
case DB_LAST:
r = brt_cursor_last(cursor, key, val, logger);
break;
case DB_NEXT:
if (brt_cursor_not_set(cursor))
r = brt_cursor_first(cursor, key, val, logger);
else
r = brt_cursor_next(cursor, key, val, logger);
break;
case DB_NEXT_DUP:
if (brt_cursor_not_set(cursor))
r = EINVAL;
else
r = brt_cursor_next_dup(cursor, key, val, logger);
break;
case DB_NEXT_NODUP:
if (brt_cursor_not_set(cursor))
r = brt_cursor_first(cursor, key, val, logger);
else
r = brt_cursor_next_nodup(cursor, key, val, logger);
break;
case DB_PREV:
if (brt_cursor_not_set(cursor))
r = brt_cursor_last(cursor, key, val, logger);
else
r = brt_cursor_prev(cursor, key, val, logger);
break;
#ifdef DB_PREV_DUP
case DB_PREV_DUP:
if (brt_cursor_not_set(cursor))
r = EINVAL;
else
r = brt_cursor_prev_dup(cursor, key, val, logger);
break;
#endif
case DB_PREV_NODUP:
if (brt_cursor_not_set(cursor))
r = brt_cursor_last(cursor, key, val, logger);
else
r = brt_cursor_prev_nodup(cursor, key, val, logger);
break;
case DB_SET:
r = brt_cursor_set(cursor, key, 0, 0, val, logger);
break;
case DB_SET_RANGE:
r = brt_cursor_set_range(cursor, key, key, val, logger);
break;
case DB_GET_BOTH:
r = brt_cursor_set(cursor, key, val, 0, 0, logger);
break;
case DB_GET_BOTH_RANGE:
r = brt_cursor_get_both_range(cursor, key, val, 0, val, logger);
break;
default:
r = EINVAL;
break;
}
return r;
}
static int brt_cursor_compare_heavi(brt_search_t *search, DBT *x, DBT *y) {
HEAVI_WRAPPER wrapper = search->context;
int r = wrapper->h(x, y, wrapper->extra_h);
......
......@@ -196,6 +196,20 @@ get_leaf_reactivity (BRTNODE node) {
return RE_STABLE;
}
static enum reactivity
get_nonleaf_reactivity (BRTNODE node) {
assert(node->height>0);
int n_children = node->u.n.n_children;
if (n_children > TREE_FANOUT) return RE_FISSIBLE;
if (n_children*4 < TREE_FANOUT) return RE_FUSIBLE;
return RE_STABLE;
}
static BOOL
nonleaf_node_is_gorged (BRTNODE node) {
return toku_serialize_brtnode_size(node) > node->nodesize;
}
static int
brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, enum reactivity *re, BOOL *did_io);
......@@ -208,6 +222,13 @@ int toku_brt_debug_mode = 0;
#define VERIFY_NODE(t,n) ((void)0)
#endif
//#define BRT_TRACE
#ifdef BRT_TRACE
#define WHEN_BRTTRACE(x) x
#else
#define WHEN_BRTTRACE(x) ((void)0)
#endif
static u_int32_t compute_child_fullhash (CACHEFILE cf, BRTNODE node, int childnum) {
switch (BNC_HAVE_FULLHASH(node, childnum)) {
case TRUE:
......@@ -1774,6 +1795,85 @@ brt_merge_child (BRT t, BRTNODE node, int childnum, BOOL *did_io)
return 0;
}
static void find_heaviest_child (BRTNODE node, int *childnum) {
int max_child = 0;
int max_weight = BNC_NBYTESINBUF(node, 0);
int i;
if (0) printf("%s:%d weights: %d", __FILE__, __LINE__, max_weight);
assert(node->u.n.n_children>0);
for (i=1; i<node->u.n.n_children; i++) {
int this_weight = BNC_NBYTESINBUF(node,i);
if (0) printf(" %d", this_weight);
if (max_weight < this_weight) {
max_child = i;
max_weight = this_weight;
}
}
*childnum = max_child;
if (0) printf("\n");
}
static int
flush_this_child (BRT t, BRTNODE node, int childnum, TOKULOGGER logger, enum reactivity *child_re, BOOL *did_io) {
assert(node->height>0);
BLOCKNUM targetchild = BNC_BLOCKNUM(node, childnum);
assert(targetchild.b>=0 && targetchild.b<t->h->unused_blocks.b); // This assertion could fail in a concurrent setting since another process might have bumped unused memory.
u_int32_t childfullhash = compute_child_fullhash(t->cf, node, childnum);
BRTNODE child;
{
void *childnode_v;
int r = toku_cachetable_get_and_pin(t->cf, targetchild, childfullhash, &childnode_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t->h);
if (r!=0) return r;
child = childnode_v;
}
assert(child->thisnodename.b!=0);
VERIFY_NODE(t, child);
int r = 0;
{
bytevec key,val;
ITEMLEN keylen, vallen;
//printf("%s:%d Try random_pick, weight=%d \n", __FILE__, __LINE__, BNC_NBYTESINBUF(node, childnum));
assert(toku_fifo_n_entries(BNC_BUFFER(node,childnum))>0);
u_int32_t type;
TXNID xid;
while(0==toku_fifo_peek(BNC_BUFFER(node,childnum), &key, &keylen, &val, &vallen, &type, &xid)) {
DBT hk,hv;
BRT_CMD_S brtcmd = { (enum brt_cmd_type)type, xid, .u.id= {toku_fill_dbt(&hk, key, keylen),
toku_fill_dbt(&hv, val, vallen)} };
//printf("%s:%d random_picked\n", __FILE__, __LINE__);
r = brtnode_put_cmd (t, child, &brtcmd, logger, child_re, did_io);
//printf("%s:%d %d=push_a_brt_cmd_down=(); child_did_split=%d (weight=%d)\n", __FILE__, __LINE__, r, child_did_split, BNC_NBYTESINBUF(node, childnum));
if (r!=0) goto return_r;
}
if (0) printf("%s:%d done random picking\n", __FILE__, __LINE__);
}
//verify_local_fingerprint_nonleaf(node);
return_r:
{
int rr=toku_unpin_brtnode(t, child);
if (rr!=0) return rr;
}
return r;
}
static int
flush_some_child (BRT t, BRTNODE node, TOKULOGGER logger, enum reactivity re_array[], BOOL *did_io)
{
assert(node->height>0);
int childnum;
find_heaviest_child(node, &childnum);
assert(toku_fifo_n_entries(BNC_BUFFER(node, childnum))>0);
return flush_this_child (t, node, childnum, logger, &re_array[childnum], did_io);
}
static int
brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, enum reactivity *re, BOOL *did_io)
// Effect: Push CMD into the subtree rooted at NODE, and indicate whether as a result NODE should split or should merge.
......@@ -1783,6 +1883,7 @@ brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, enum react
// actually putting it into the child.
// Set *re to the reactivity of the node. If node becomes reactive, we don't change its shape (but if a child becomes reactive, we fix it.)
// If we perform I/O then set *did_io to true.
// If a nonleaf node becomes overfull then we will flush some child.
{
if (node->height==0) {
return brt_leaf_put_cmd(t, node, cmd, logger, re);
......@@ -1790,6 +1891,14 @@ brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, enum react
enum reactivity *MALLOC_N(node->u.n.n_children, child_re);
int r = brt_nonleaf_put_cmd(t, node, cmd, logger, child_re, did_io);
if (r!=0) goto return_r;
// Now we may have overfilled node. So we'll flush the heaviest child until we are happy.
while (!*did_io // Don't flush if we've done I/O.
&& nonleaf_node_is_gorged(node) // Don't flush if the node is small enough.
&& (node->u.n.n_bytes_in_buffers > 0) // Don't try to flush if everything is flushed.
) {
r = flush_some_child(t, node, logger, child_re, did_io);
if (r!=0) goto return_r;
}
// Now all those children may need fixing.
int i;
for (i=0; i<node->u.n.n_children; i++) {
......@@ -1985,6 +2094,22 @@ int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn)
return r;
}
int toku_brt_delete(BRT brt, DBT *key, TOKUTXN txn) {
int r;
if (txn && (brt->txn_that_created != toku_txn_get_txnid(txn))) {
BYTESTRING keybs = {key->size, toku_memdup_in_rollback(txn, key->data, key->size)};
toku_cachefile_refup(brt->cf);
r = toku_logger_save_rollback_cmddelete(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), keybs);
if (r!=0) return r;
r = toku_txn_note_brt(txn, brt);
if (r!=0) return r;
}
DBT val;
BRT_CMD_S brtcmd = { BRT_DELETE_ANY, toku_txn_get_txnid(txn), .u.id={key, toku_init_dbt(&val)}};
r = toku_brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn));
return r;
}
struct omt_compressor_state {
struct mempool *new_kvspace;
OMT omt;
......@@ -2033,6 +2158,338 @@ void *mempool_malloc_from_omt(OMT omt, struct mempool *mp, size_t size) {
/* ******************** open,close and create ********************** */
// This one has no env
int toku_open_brt (const char *fname, const char *dbname, int is_create, BRT *newbrt, int nodesize, CACHETABLE cachetable, TOKUTXN txn,
int (*compare_fun)(DB*,const DBT*,const DBT*), DB *db) {
BRT brt;
int r;
const int only_create = 0;
r = toku_brt_create(&brt);
if (r != 0)
return r;
toku_brt_set_nodesize(brt, nodesize);
toku_brt_set_bt_compare(brt, compare_fun);
r = toku_brt_open(brt, fname, fname, dbname, is_create, only_create, cachetable, txn, db);
if (r != 0) {
return r;
}
*newbrt = brt;
return r;
}
static int setup_initial_brt_root_node (BRT t, BLOCKNUM blocknum, TOKULOGGER logger) {
int r;
TAGMALLOC(BRTNODE, node);
assert(node);
node->ever_been_written = 0;
//printf("%s:%d\n", __FILE__, __LINE__);
initialize_empty_brtnode(t, node, blocknum, 0);
// node->brt = t;
if (0) {
printf("%s:%d for tree %p node %p mdict_create--> %p\n", __FILE__, __LINE__, t, node, node->u.l.buffer);
printf("%s:%d put root at %" PRId64 "\n", __FILE__, __LINE__, blocknum.b);
}
//printf("%s:%d putting %p (%lld)\n", __FILE__, __LINE__, node, node->thisnodename);
u_int32_t fullhash = toku_cachetable_hash(t->cf, blocknum);
node->fullhash = fullhash;
r=toku_cachetable_put(t->cf, blocknum, fullhash,
node, brtnode_memory_size(node),
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t->h);
if (r!=0) {
toku_free(node);
return r;
}
// verify_local_fingerprint_nonleaf(node);
toku_log_newbrtnode(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), blocknum, 0, t->h->nodesize, (unsigned char)((t->flags&TOKU_DB_DUPSORT)!=0), node->rand4fingerprint);
r = toku_unpin_brtnode(t, node);
if (r!=0) {
toku_free(node);
return r;
}
return 0;
}
// open a file for use by the brt. if the file does not exist, create it.
static int brt_open_file(BRT brt, const char *fname, const char *fname_in_env, int is_create, TOKUTXN txn, int *fdp) {
brt = brt;
mode_t mode = 0777;
int r;
int fd = open(fname, O_RDWR, mode);
if (fd==-1) {
r = errno;
if (errno == ENOENT) {
if (!is_create) {
return r;
}
fd = open(fname, O_RDWR | O_CREAT, mode);
if (fd == -1) {
r = errno; return r;
}
r = toku_logger_log_fcreate(txn, fname_in_env, mode);
if (r != 0) {
close(fd); return r;
}
} else
return r;
}
*fdp = fd;
return 0;
}
// allocate and initialize a brt header.
// t->cf is not set to anything.
static int brt_alloc_init_header(BRT t, const char *dbname, TOKUTXN txn) {
int r;
BLOCKNUM root = make_blocknum(1);
assert(t->h == 0);
if ((MALLOC(t->h))==0) {
assert(errno==ENOMEM);
r = ENOMEM;
if (0) { died2: toku_free(t->h); }
t->h=0;
return r;
}
t->h->dirty=1;
if ((MALLOC_N(1, t->h->flags_array))==0) { r = errno; if (0) { died3: toku_free(t->h->flags_array); } goto died2; }
t->h->flags_array[0] = t->flags;
t->h->nodesize=t->nodesize;
t->h->free_blocks = make_blocknum(-1);
t->h->unused_blocks=make_blocknum(2);
t->h->translated_blocknum_limit = 0;
t->h->block_translation = 0;
t->h->block_translation_size_on_disk = 0;
t->h->block_translation_address_on_disk = 0;
// printf("%s:%d translated_blocknum_limit=%ld, block_translation_address_on_disk=%ld\n", __FILE__, __LINE__, t->h->translated_blocknum_limit, t->h->block_translation_address_on_disk);
create_block_allocator(&t->h->block_allocator, t->nodesize, BLOCK_ALLOCATOR_ALIGNMENT);
toku_fifo_create(&t->h->fifo);
t->h->root_put_counter = global_root_put_counter++;
if (dbname) {
t->h->n_named_roots = 1;
if ((MALLOC_N(1, t->h->names))==0) { assert(errno==ENOMEM); r=ENOMEM; if (0) { died4: if (dbname) toku_free(t->h->names); } goto died3; }
if ((MALLOC_N(1, t->h->roots))==0) { assert(errno==ENOMEM); r=ENOMEM; if (0) { died5: if (dbname) toku_free(t->h->roots); } goto died4; }
if ((MALLOC_N(1, t->h->root_hashes))==0) { assert(errno==ENOMEM); r=ENOMEM; if (0) { died6: if (dbname) toku_free(t->h->root_hashes); } goto died5; }
if ((t->h->names[0] = toku_strdup(dbname))==0) { assert(errno==ENOMEM); r=ENOMEM; if (0) { died7: if (dbname) toku_free(t->h->names[0]); } goto died6; }
t->h->roots[0] = root; // Block 0 is the header. Block 1 is the root.
compute_and_fill_remembered_hash(t, 0);
} else {
MALLOC_N(1, t->h->roots); assert(t->h->roots);
MALLOC_N(1, t->h->root_hashes); assert(t->h->root_hashes);
t->h->roots[0] = root;
compute_and_fill_remembered_hash(t, 0);
t->h->n_named_roots = -1;
t->h->names=0;
}
{
LOGGEDBRTHEADER lh = {.size= toku_serialize_brt_header_size(t->h),
.flags = t->flags,
.nodesize = t->h->nodesize,
.free_blocks = t->h->free_blocks,
.unused_blocks = t->h->unused_blocks,
.n_named_roots = t->h->n_named_roots };
if (t->h->n_named_roots>=0) {
lh.u.many.names = t->h->names;
lh.u.many.roots = t->h->roots;
} else {
lh.u.one.root = t->h->roots[0];
}
if ((r=toku_log_fheader(toku_txn_logger(txn), (LSN*)0, 0, toku_txn_get_txnid(txn), toku_cachefile_filenum(t->cf), lh))) { goto died7; }
}
if ((r=setup_initial_brt_root_node(t, root, toku_txn_logger(txn)))!=0) { goto died7; }
//printf("%s:%d putting %p (%d)\n", __FILE__, __LINE__, t->h, 0);
assert(t->h->free_blocks.b==-1);
toku_cachefile_set_userdata(t->cf, t->h, toku_brtheader_close);
return r;
}
int toku_read_brt_header_and_store_in_cachefile (CACHEFILE cf, struct brt_header **header)
// If the cachefile already has the header, then just get it.
// If the cachefile has not been initialized, then don't modify anything.
{
{
struct brt_header *h;
if ((h=toku_cachefile_get_userdata(cf))!=0) {
*header = h;
return 0;
}
}
struct brt_header *h;
int r = toku_deserialize_brtheader_from(toku_cachefile_fd(cf), make_blocknum(0), &h);
if (r!=0) return r;
h->root_put_counter = global_root_put_counter++;
toku_cachefile_set_userdata(cf, (void*)h, toku_brtheader_close);
*header = h;
return 0;
}
int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char *dbname, int is_create, int only_create, CACHETABLE cachetable, TOKUTXN txn, DB *db) {
/* If dbname is NULL then we setup to hold a single tree. Otherwise we setup an array. */
int r;
char *malloced_name=0;
int db_index;
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, get_n_items_malloced()); toku_print_malloced_items();
WHEN_BRTTRACE(fprintf(stderr, "BRTTRACE: %s:%d toku_brt_open(%s, \"%s\", %d, %p, %d, %p)\n",
__FILE__, __LINE__, fname, dbname, is_create, newbrt, nodesize, cachetable));
if (0) { died0: assert(r); return r; }
assert(is_create || !only_create);
t->fname = toku_strdup(fname_in_env);
if (t->fname==0) {
r = errno;
if (0) { died00: if (t->fname) toku_free(t->fname); t->fname=0; }
goto died0;
}
if (dbname) {
malloced_name = toku_strdup(dbname);
if (malloced_name==0) {
r = ENOMEM;
if (0) { died0a: if(malloced_name) toku_free(malloced_name); }
goto died00;
}
}
t->database_name = malloced_name;
t->db = db;
t->txn_that_created = 0; // Uses 0 for no transaction.
{
int fd = -1;
r = brt_open_file(t, fname, fname_in_env, is_create, txn, &fd);
if (r != 0) {
t->database_name = 0; goto died0a;
}
r=toku_cachetable_openfd(&t->cf, cachetable, fd, fname_in_env);
if (r != 0) goto died0a;
toku_logger_log_fopen(txn, fname_in_env, toku_cachefile_filenum(t->cf));
}
if (r!=0) {
if (0) { died_after_open: toku_cachefile_close(&t->cf, toku_txn_logger(txn)); }
t->database_name = 0;
goto died0a;
}
assert(t->nodesize>0);
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, get_n_items_malloced()); toku_print_malloced_items();
if (0) {
died_after_read_and_pin:
goto died_after_open;
}
if (is_create) {
r = toku_read_brt_header_and_store_in_cachefile(t->cf, &t->h);
if (r==-1) {
r = brt_alloc_init_header(t, dbname, txn);
if (r != 0) goto died_after_read_and_pin;
}
else if (r!=0) {
goto died_after_read_and_pin;
}
else {
int i;
assert(r==0);
assert(dbname);
if (t->h->n_named_roots<0) { r=EINVAL; goto died_after_read_and_pin; } // Cannot create a subdb in a file that is not enabled for subdbs
assert(t->h->n_named_roots>=0);
for (i=0; i<t->h->n_named_roots; i++) {
if (strcmp(t->h->names[i], dbname)==0) {
if (only_create) {
r = EEXIST;
goto died_after_read_and_pin;
}
else {
db_index = i;
goto found_it;
}
}
}
if ((t->h->names = toku_realloc(t->h->names, (1+t->h->n_named_roots)*sizeof(*t->h->names))) == 0) { assert(errno==ENOMEM); r=ENOMEM; goto died_after_read_and_pin; }
if ((t->h->roots = toku_realloc(t->h->roots, (1+t->h->n_named_roots)*sizeof(*t->h->roots))) == 0) { assert(errno==ENOMEM); r=ENOMEM; goto died_after_read_and_pin; }
if ((t->h->root_hashes = toku_realloc(t->h->root_hashes, (1+t->h->n_named_roots)*sizeof(*t->h->root_hashes))) == 0) { assert(errno==ENOMEM); r=ENOMEM; goto died_after_read_and_pin; }
if ((t->h->flags_array = toku_realloc(t->h->flags_array, (1+t->h->n_named_roots)*sizeof(*t->h->flags_array))) == 0) { assert(errno==ENOMEM); r=ENOMEM; goto died_after_read_and_pin; }
t->h->flags_array[t->h->n_named_roots] = t->flags;
t->h->n_named_roots++;
if ((t->h->names[t->h->n_named_roots-1] = toku_strdup(dbname)) == 0) { assert(errno==ENOMEM); r=ENOMEM; goto died_after_read_and_pin; }
//printf("%s:%d t=%p\n", __FILE__, __LINE__, t);
r = allocate_diskblocknumber(&t->h->roots[t->h->n_named_roots-1], t, toku_txn_logger(txn));
if (r!=0) goto died_after_read_and_pin;
t->h->dirty = 1;
compute_and_fill_remembered_hash(t, t->h->n_named_roots-1);
if ((r=setup_initial_brt_root_node(t, t->h->roots[t->h->n_named_roots-1], toku_txn_logger(txn)))!=0) goto died_after_read_and_pin;
}
} else {
if ((r = toku_read_brt_header_and_store_in_cachefile(t->cf, &t->h))!=0) goto died_after_open;
if (!dbname) {
if (t->h->n_named_roots!=-1) { r = EINVAL; goto died_after_read_and_pin; } // requires a subdb
db_index=0;
} else {
int i;
if (t->h->n_named_roots==-1) { r = EINVAL; goto died_after_read_and_pin; } // no suddbs in the db
// printf("%s:%d n_roots=%d\n", __FILE__, __LINE__, t->h->n_named_roots);
for (i=0; i<t->h->n_named_roots; i++) {
if (strcmp(t->h->names[i], dbname)==0) {
db_index=i;
goto found_it;
}
}
r=ENOENT; /* the database doesn't exist */
goto died_after_read_and_pin;
}
found_it:
t->nodesize = t->h->nodesize; /* inherit the pagesize from the file */
if (!t->did_set_flags) {
t->flags = t->h->flags_array[db_index];
} else {
if (t->flags != t->h->flags_array[db_index]) { /* if flags have been set then flags must match */
r = EINVAL; goto died_after_read_and_pin;
}
}
}
assert(t->h);
WHEN_BRTTRACE(fprintf(stderr, "BRTTRACE -> %p\n", t));
return 0;
}
int toku_brt_set_flags(BRT brt, unsigned int flags) {
brt->did_set_flags = 1;
brt->flags = flags;
return 0;
}
int toku_brt_get_flags(BRT brt, unsigned int *flags) {
*flags = brt->flags;
return 0;
}
int toku_brt_set_nodesize(BRT brt, unsigned int nodesize) {
brt->nodesize = nodesize;
return 0;
}
int toku_brt_get_nodesize(BRT brt, unsigned int *nodesize) {
*nodesize = brt->nodesize;
return 0;
}
int toku_brt_set_bt_compare(BRT brt, int (*bt_compare)(DB *, const DBT*, const DBT*)) {
brt->compare_fun = bt_compare;
return 0;
}
int toku_brt_set_dup_compare(BRT brt, int (*dup_compare)(DB *, const DBT*, const DBT*)) {
brt->dup_compare = dup_compare;
return 0;
}
int toku_brt_create_cachetable(CACHETABLE *ct, long cachesize, LSN initial_lsn, TOKULOGGER logger) {
if (cachesize == 0)
cachesize = 128*1024*1024;
return toku_create_cachetable(ct, cachesize, initial_lsn, logger);
}
int toku_brtheader_close (CACHEFILE cachefile, void *header_v) {
struct brt_header *h = header_v;
//printf("%s:%d allocated_limit=%lu writing queue to %lu\n", __FILE__, __LINE__,
......@@ -2106,6 +2563,79 @@ static inline void dbt_cleanup(DBT *dbt) {
}
}
static void swap_dbts (DBT *a, DBT *b) {
DBT tmp=*a;
*a=*b;
*b=tmp;
}
static void swap_cursor_dbts (BRT_CURSOR cursor) {
swap_dbts(&cursor->prevkey, &cursor->key);
swap_dbts(&cursor->prevval, &cursor->val);
}
static inline void load_dbts_from_omt(BRT_CURSOR c, DBT *key, DBT *val) {
OMTVALUE le = 0;
int r = toku_omt_cursor_current(c->omtcursor, &le);
assert(r==0);
if (key) {
key->data = le_latest_key(le);
key->size = le_latest_keylen(le);
}
if (val) {
val->data = le_latest_val(le);
val->size = le_latest_vallen(le);
}
}
static void brt_cursor_invalidate_callback(OMTCURSOR UU(omt_c), void *extra) {
BRT_CURSOR cursor = extra;
if (cursor->current_in_omt) {
assert(cursor->key.flags==DB_DBT_REALLOC);
assert(cursor->val.flags==DB_DBT_REALLOC);
DBT key,val;
int r;
load_dbts_from_omt(cursor, toku_init_dbt(&key), toku_init_dbt(&val));
//Make certain not to try to free the omt's memory.
toku_init_dbt(&cursor->key)->flags = DB_DBT_REALLOC;
toku_init_dbt(&cursor->val)->flags = DB_DBT_REALLOC;
r = toku_dbt_set_two_values(&cursor->key, (bytevec*)&key.data, key.size, NULL, FALSE,
&cursor->val, (bytevec*)&val.data, val.size, NULL, FALSE);
//TODO: Find some way to deal with ENOMEM here.
assert(r==0);
cursor->current_in_omt = FALSE;
}
if (cursor->prev_in_omt) {
toku_init_dbt(&cursor->prevkey)->flags = DB_DBT_REALLOC;
toku_init_dbt(&cursor->prevval)->flags = DB_DBT_REALLOC;
cursor->prev_in_omt = FALSE;
}
}
int toku_brt_cursor (BRT brt, BRT_CURSOR *cursorptr, int is_temporary_cursor) {
BRT_CURSOR cursor = toku_malloc(sizeof *cursor);
if (cursor == 0)
return ENOMEM;
cursor->brt = brt;
toku_init_dbt(&cursor->key); cursor->key.flags = DB_DBT_REALLOC;
toku_init_dbt(&cursor->val); cursor->val.flags = DB_DBT_REALLOC;
toku_init_dbt(&cursor->prevkey); cursor->prevkey.flags = DB_DBT_REALLOC;
toku_init_dbt(&cursor->prevval); cursor->prevval.flags = DB_DBT_REALLOC;
cursor->current_in_omt = FALSE;
cursor->prev_in_omt = FALSE;
list_push(&brt->cursors, &cursor->cursors_link);
cursor->is_temporary_cursor=is_temporary_cursor;
cursor->skey = cursor->sval = 0;
int r = toku_omt_cursor_create(&cursor->omtcursor);
assert(r==0);
toku_omt_cursor_set_invalidate_callback(cursor->omtcursor,
brt_cursor_invalidate_callback, cursor);
cursor->root_put_counter=0;
*cursorptr = cursor;
return 0;
}
int toku_brt_cursor_close(BRT_CURSOR cursor) {
if (!cursor->current_in_omt) {
dbt_cleanup(&cursor->key);
......@@ -2123,3 +2653,760 @@ int toku_brt_cursor_close(BRT_CURSOR cursor) {
toku_free_n(cursor, sizeof *cursor);
return 0;
}
static void save_omtcursor_current_in_prev(BRT_CURSOR cursor) {
if (!cursor->prev_in_omt) {
//Free the data.
if (cursor->prevkey.data) toku_free(cursor->prevkey.data);
if (cursor->prevval.data) toku_free(cursor->prevval.data);
cursor->prev_in_omt = TRUE;
}
load_dbts_from_omt(cursor, &cursor->prevkey, &cursor->prevval);
}
static BOOL brt_cursor_not_set(BRT_CURSOR cursor) {
return (BOOL)((cursor->key.data == 0)
||
(cursor->val.data == 0));
}
static inline int brt_cursor_copyout(BRT_CURSOR cursor, DBT *key, DBT *val) {
//Passing in NULL for both key and val is used with light weight cursors.
//Retrieval of key and val will use the peek functions.
if (!key && !val) return 0;
int r = 0;
void** key_staticp = cursor->is_temporary_cursor ? &cursor->brt->skey : &cursor->skey;
void** val_staticp = cursor->is_temporary_cursor ? &cursor->brt->sval : &cursor->sval;
if (cursor->current_in_omt) load_dbts_from_omt(cursor, &cursor->key, &cursor->val);
r = toku_dbt_set_two_values(key, (bytevec*)&cursor->key.data, cursor->key.size, key_staticp, FALSE,
val, (bytevec*)&cursor->val.data, cursor->val.size, val_staticp, FALSE);
return r;
}
static int
pair_leafval_heaviside_le_committed (u_int32_t klen, void *kval,
u_int32_t dlen, void *dval,
brt_search_t *search) {
DBT x,y;
int cmp = search->compare(search,
search->k ? toku_fill_dbt(&x, kval, klen) : 0,
search->v ? toku_fill_dbt(&y, dval, dlen) : 0);
// The search->compare function returns only 0 or 1
switch (search->direction) {
case BRT_SEARCH_LEFT: return cmp==0 ? -1 : +1;
case BRT_SEARCH_RIGHT: return cmp==0 ? +1 : -1; // Because the comparison runs backwards for right searches.
}
assert(0);
return 0;
}
static int
pair_leafval_heaviside_le_both (TXNID xid __attribute__((__unused__)),
u_int32_t klen, void *kval,
u_int32_t clen __attribute__((__unused__)), void *cval __attribute__((__unused__)),
u_int32_t plen, void *pval,
brt_search_t *search) {
return pair_leafval_heaviside_le_committed(klen, kval, plen, pval, search);
}
static int
pair_leafval_heaviside_le_provdel (TXNID xid __attribute__((__unused__)),
u_int32_t klen, void *kval,
u_int32_t clen, void *cval,
brt_search_t *be) {
return pair_leafval_heaviside_le_committed(klen, kval, clen, cval, be);
}
static int
pair_leafval_heaviside_le_provpair (TXNID xid __attribute__((__unused__)),
u_int32_t klen, void *kval,
u_int32_t plen, void *pval,
brt_search_t *be) {
return pair_leafval_heaviside_le_committed(klen, kval, plen, pval, be);
}
static int heaviside_from_search_t (OMTVALUE lev, void *extra) {
LEAFENTRY leafval=lev;
brt_search_t *search = extra;
LESWITCHCALL(leafval, pair_leafval_heaviside, search);
abort(); return 0;
}
static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, TOKULOGGER logger, OMTCURSOR omtcursor) {
// Now we have to convert from brt_search_t to the heaviside function with a direction. What a pain...
int direction;
switch (search->direction) {
case BRT_SEARCH_LEFT: direction = +1; goto ok;
case BRT_SEARCH_RIGHT: direction = -1; goto ok;
}
return EINVAL; // This return and the goto are a hack to get both compile-time and run-time checking on enum
ok: ;
OMTVALUE datav;
u_int32_t idx = 0;
int r = toku_omt_find(node->u.l.buffer,
heaviside_from_search_t,
search,
direction,
&datav, &idx, omtcursor);
if (r!=0) return r;
LEAFENTRY le = datav;
if (le_is_provdel(le)) {
TXNID xid = le_any_xid(le);
TOKUTXN txn = 0;
toku_txn_find_by_xid(brt, xid, &txn);
// Provisionally deleted stuff is gone.
// So we need to scan in the direction to see if we can find something
while (1) {
// see if the transaction is alive
TXNID newxid = le_any_xid(le);
if (newxid != xid) {
xid = newxid;
txn = 0;
toku_txn_find_by_xid(brt, xid, &txn);
}
switch (search->direction) {
case BRT_SEARCH_LEFT:
if (txn) {
// printf("xid %llu -> %p\n", (unsigned long long) xid, txn);
idx++;
} else {
// apply a commit message for this leafentry to the node
// printf("apply commit_both %llu\n", (unsigned long long) xid);
DBT key, val;
BRT_CMD_S brtcmd = { BRT_COMMIT_BOTH, xid, .u.id= {toku_fill_dbt(&key, le_latest_key(le), le_latest_keylen(le)),
toku_fill_dbt(&val, le_latest_val(le), le_latest_vallen(le))} };
r = brt_leaf_apply_cmd_once(brt, node, &brtcmd, logger, idx, le);
assert(r == 0);
}
if (idx>=toku_omt_size(node->u.l.buffer)) return DB_NOTFOUND;
break;
case BRT_SEARCH_RIGHT:
if (idx==0) return DB_NOTFOUND;
idx--;
break;
}
if (idx>=toku_omt_size(node->u.l.buffer)) continue;
r = toku_omt_fetch(node->u.l.buffer, idx, &datav, omtcursor);
assert(r==0); // we just validated the index
le = datav;
if (!le_is_provdel(le)) goto got_a_good_value;
}
}
got_a_good_value:
if (newkey || newval) {
bytevec key = newkey ? le_latest_key(le) : NULL;
u_int32_t key_len = newkey ? le_latest_keylen(le) : 0;
bytevec val = newval ? le_latest_val(le) : NULL;
u_int32_t val_len = newval ? le_latest_vallen(le) : 0;
r = toku_dbt_set_two_values(newkey, &key, key_len, &brt->skey, FALSE,
newval, &val, val_len, &brt->sval, FALSE);
if (r!=0) return r;
}
return 0;
}
static int
brt_search_node (BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, enum reactivity *re, TOKULOGGER logger, OMTCURSOR omtcursor);
/* search in a node's child */
static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, DBT *newkey, DBT *newval, enum reactivity *parent_re, TOKULOGGER logger, OMTCURSOR omtcursor) {
/* if the child's buffer is not empty then empty it */
if (BNC_NBYTESINBUF(node, childnum) > 0) {
BOOL did_io = FALSE;
enum reactivity child_re = RE_STABLE;
int rr = flush_this_child(brt, node, childnum, logger, &child_re, &did_io);
assert(rr == 0);
/* push down may cause the child to be overfull, but that's OK. We'll search the child anyway, and recompute the ractivity. */
}
void *node_v;
BLOCKNUM childblocknum = BNC_BLOCKNUM(node,childnum);
u_int32_t fullhash = compute_child_fullhash(brt->cf, node, childnum);
{
int rr = toku_cachetable_get_and_pin(brt->cf, childblocknum, fullhash, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt->h);
assert(rr == 0);
}
BRTNODE childnode = node_v;
enum reactivity child_re = RE_STABLE;
int r = brt_search_node(brt, childnode, search, newkey, newval, &child_re, logger, omtcursor);
if (r!=0) goto return_r;
switch (child_re) {
case RE_STABLE: goto return_r;
case RE_FISSIBLE:
r = brt_split_child(brt, node, childnum, logger);
goto return_r;
case RE_FUSIBLE:
{
BOOL did_io = FALSE;
r = brt_merge_child(brt, node, childnum, &did_io);
goto return_r;
}
}
assert(0); // enum is broken
return_r:
{
int rr = toku_cachetable_unpin(brt->cf, childnode->thisnodename, childnode->fullhash, childnode->dirty, brtnode_memory_size(childnode));
assert(rr == 0);
}
*parent_re = get_nonleaf_reactivity(node);
return r;
}
static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, enum reactivity *re, TOKULOGGER logger, OMTCURSOR omtcursor) {
int c;
/* binary search is overkill for a small array */
int child[node->u.n.n_children];
/* scan left to right or right to left depending on the search direction */
for (c = 0; c < node->u.n.n_children; c++)
child[c] = search->direction & BRT_SEARCH_LEFT ? c : node->u.n.n_children - 1 - c;
for (c = 0; c < node->u.n.n_children-1; c++) {
int p = search->direction & BRT_SEARCH_LEFT ? child[c] : child[c] - 1;
struct kv_pair *pivot = node->u.n.childkeys[p];
DBT pivotkey, pivotval;
if (search->compare(search,
toku_fill_dbt(&pivotkey, kv_pair_key(pivot), kv_pair_keylen(pivot)),
brt->flags & TOKU_DB_DUPSORT ? toku_fill_dbt(&pivotval, kv_pair_val(pivot), kv_pair_vallen(pivot)): 0)) {
int r = brt_search_child(brt, node, child[c], search, newkey, newval, re, logger, omtcursor);
assert(r != EAGAIN);
if (r == 0) return r;
}
}
/* check the first (left) or last (right) node if nothing has been found */
return brt_search_child(brt, node, child[c], search, newkey, newval, re, logger, omtcursor);
}
static int
brt_search_node (BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, enum reactivity *re, TOKULOGGER logger, OMTCURSOR omtcursor)
{
if (node->height > 0)
return brt_search_nonleaf_node(brt, node, search, newkey, newval, re, logger, omtcursor);
else
return brt_search_leaf_node(brt, node, search, newkey, newval, logger, omtcursor);
}
static int
toku_brt_search (BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, TOKULOGGER logger, OMTCURSOR omtcursor, u_int64_t *root_put_counter)
// Effect: Perform a search. Associate cursor with a leaf if possible.
// All searches are performed through this function.
{
int r, rr;
assert(brt->h);
*root_put_counter = brt->h->root_put_counter;
u_int32_t fullhash;
CACHEKEY *rootp = toku_calculate_root_offset_pointer(brt, &fullhash);
void *node_v;
//assert(fullhash == toku_cachetable_hash(brt->cf, *rootp));
rr = toku_cachetable_get_and_pin(brt->cf, *rootp, fullhash,
&node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt->h);
assert(rr == 0);
BRTNODE node = node_v;
// push the fifo stuff
{
DBT okey,odata;
BRT_CMD_S ocmd;
while (0==toku_fifo_peek_cmdstruct(brt->h->fifo, &ocmd, &okey, &odata)) {
if ((r = push_something_at_root(brt, &node, rootp, &ocmd, logger))) return r;
r = toku_fifo_deq(brt->h->fifo);
assert(r==0);
}
}
{
enum reactivity re;
r = brt_search_node(brt, node, search, newkey, newval, &re, logger, omtcursor);
if (r!=0) goto return_r;
switch (re) {
case RE_STABLE: goto return_r;
case RE_FUSIBLE:
// The root node should split, so make a new root.
{
BRTNODE nodea,nodeb;
DBT splitk;
if (node->height==0) {
r = brtleaf_split(logger, toku_cachefile_filenum(brt->cf), brt, node, &nodea, &nodeb, &splitk);
if (r!=0) goto return_r;
} else {
r = brt_nonleaf_split(brt, node, &nodea, &nodeb, &splitk, logger);
if (r!=0) goto return_r;
}
r = brt_init_new_root(brt, nodea, nodeb, splitk, rootp, logger, &node);
goto return_r;
}
case RE_FISSIBLE:
goto return_r; // Cannot merge anything at the root, so return happy.
}
assert(0); // cannot happen
}
return_r:
rr = toku_unpin_brtnode(brt, node);
assert(rr == 0);
return r;
}
/* search for the first kv pair that matches the search object */
static int brt_cursor_search(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval, TOKULOGGER logger) {
assert(cursor->prevkey.flags == DB_DBT_REALLOC);
assert(cursor->prevval.flags == DB_DBT_REALLOC);
brt_cursor_invalidate_callback(cursor->omtcursor, cursor);
int r = toku_brt_search(cursor->brt, search, &cursor->prevkey, &cursor->prevval, logger, cursor->omtcursor, &cursor->root_put_counter);
if (r == 0) {
swap_cursor_dbts(cursor);
r = brt_cursor_copyout(cursor, outkey, outval);
}
return r;
}
static inline int compare_kv_xy(BRT brt, DBT *k, DBT *v, DBT *x, DBT *y) {
int cmp = brt->compare_fun(brt->db, k, x);
if (cmp == 0 && v && y)
cmp = brt->dup_compare(brt->db, v, y);
return cmp;
}
static inline int compare_k_x(BRT brt, DBT *k, DBT *x) {
return brt->compare_fun(brt->db, k, x);
}
static inline int compare_v_y(BRT brt, DBT *v, DBT *y) {
return brt->dup_compare(brt->db, v, y);
}
static int brt_cursor_compare_one(brt_search_t *search, DBT *x, DBT *y) {
search = search; x = x; y = y;
return 1;
}
/* search for the kv pair that matches the search object and is equal to kv */
static int brt_cursor_search_eq_kv_xy(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval, TOKULOGGER logger) {
assert(cursor->prevkey.flags == DB_DBT_REALLOC);
assert(cursor->prevval.flags == DB_DBT_REALLOC);
brt_cursor_invalidate_callback(cursor->omtcursor, cursor);
int r = toku_brt_search(cursor->brt, search, &cursor->prevkey, &cursor->prevval, logger, cursor->omtcursor, &cursor->root_put_counter);
if (r == 0) {
if (compare_kv_xy(cursor->brt, search->k, search->v, &cursor->prevkey, &cursor->prevval) == 0) {
swap_cursor_dbts(cursor);
r = brt_cursor_copyout(cursor, outkey, outval);
} else {
r = DB_NOTFOUND;
}
}
return r;
}
static int brt_cursor_compare_set(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
return compare_kv_xy(brt, search->k, search->v, x, y) <= 0; /* return min xy: kv <= xy */
}
static int brt_cursor_current(BRT_CURSOR cursor, int op, DBT *outkey, DBT *outval, TOKULOGGER logger) {
if (brt_cursor_not_set(cursor))
return EINVAL;
if (op == DB_CURRENT) {
int r = ENOSYS;
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_REALLOC;
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_REALLOC;
brt_cursor_invalidate_callback(cursor->omtcursor, cursor);
brt_search_t search; brt_search_init(&search, brt_cursor_compare_set, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
r = toku_brt_search(cursor->brt, &search, &newkey, &newval, logger, cursor->omtcursor, &cursor->root_put_counter);
if (r != 0 || compare_kv_xy(cursor->brt, &cursor->key, &cursor->val, &newkey, &newval) != 0)
r = DB_KEYEMPTY;
dbt_cleanup(&newkey);
dbt_cleanup(&newval);
if (r!=0) return r;
}
return brt_cursor_copyout(cursor, outkey, outval);
}
static int brt_cursor_first(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_one, BRT_SEARCH_LEFT, 0, 0, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_last(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_one, BRT_SEARCH_RIGHT, 0, 0, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_compare_next(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
return compare_kv_xy(brt, search->k, search->v, x, y) < 0; /* return min xy: kv < xy */
}
static int brt_cursor_next_shortcut (BRT_CURSOR cursor, DBT *outkey, DBT *outval)
// Effect: If possible, increment the cursor and return the key-value pair
// (i.e., the next one from what the cursor pointed to before.)
// That is, do DB_NEXT on DUP databases, and do DB_NEXT_NODUP on NODUP databases.
{
if (toku_omt_cursor_is_valid(cursor->omtcursor)) {
{
u_int64_t h_counter = cursor->brt->h->root_put_counter;
if (h_counter != cursor->root_put_counter) return -1;
}
OMTVALUE le;
//Save current value in prev.
save_omtcursor_current_in_prev(cursor);
u_int32_t starting_index;
u_int32_t index;
u_int32_t size = toku_omt_size(toku_omt_cursor_get_omt(cursor->omtcursor));
int r = toku_omt_cursor_current_index(cursor->omtcursor, &starting_index);
assert(r==0);
index = starting_index;
while (index+1 < size) {
r = toku_omt_cursor_next(cursor->omtcursor, &le);
assert(r==0);
index++;
if (le_is_provdel(le)) continue;
//Free old current if necessary.
if (!cursor->current_in_omt) {
if (cursor->key.data) toku_free(cursor->key.data);
if (cursor->val.data) toku_free(cursor->val.data);
cursor->current_in_omt = TRUE;
}
return brt_cursor_copyout(cursor, outkey, outval);
}
toku_omt_cursor_set_index(cursor->omtcursor, starting_index);
toku_omt_cursor_invalidate(cursor->omtcursor);
}
return -1;
}
static int brt_cursor_next(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
if (0!=(cursor->brt->flags & TOKU_DB_DUP) &&
brt_cursor_next_shortcut(cursor, outkey, outval)==0)
return 0;
brt_search_t search; brt_search_init(&search, brt_cursor_compare_next, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_compare_next_nodup(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context; y = y;
return compare_k_x(brt, search->k, x) < 0; /* return min x: k < x */
}
static int brt_cursor_next_nodup(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
if (0==(cursor->brt->flags & TOKU_DB_DUP) &&
brt_cursor_next_shortcut(cursor, outkey, outval)==0)
return 0;
brt_search_t search; brt_search_init(&search, brt_cursor_compare_next_nodup, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_compare_next_dup(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
int keycmp = compare_k_x(brt, search->k, x);
if (keycmp < 0)
return 1;
else
return keycmp == 0 && y && compare_v_y(brt, search->v, y) < 0; /* return min xy: k <= x && v < y */
}
/* search for the kv pair that matches the search object and is equal to k */
static int brt_cursor_search_eq_k_x(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval, TOKULOGGER logger) {
assert(cursor->prevkey.flags == DB_DBT_REALLOC);
assert(cursor->prevval.flags == DB_DBT_REALLOC);
brt_cursor_invalidate_callback(cursor->omtcursor, cursor);
int r = toku_brt_search(cursor->brt, search, &cursor->prevkey, &cursor->prevval, logger, cursor->omtcursor, &cursor->root_put_counter);
if (r == 0) {
if (compare_k_x(cursor->brt, search->k, &cursor->prevkey) == 0) {
swap_cursor_dbts(cursor);
r = brt_cursor_copyout(cursor, outkey, outval);
} else
r = DB_NOTFOUND;
}
return r;
}
static int brt_cursor_next_dup(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_next_dup, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search_eq_k_x(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_compare_get_both_range(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
int keycmp = compare_k_x(brt, search->k, x);
if (keycmp < 0)
return 1;
else
return keycmp == 0 && (y == 0 || compare_v_y(brt, search->v, y) <= 0); /* return min xy: k <= x && v <= y */
}
static int brt_cursor_get_both_range(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_get_both_range, BRT_SEARCH_LEFT, key, val, cursor->brt);
return brt_cursor_search_eq_k_x(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_prev_shortcut (BRT_CURSOR cursor, DBT *outkey, DBT *outval)
// Effect: If possible, decrement the cursor and return the key-value pair
// (i.e., the previous one from what the cursor pointed to before.)
// That is, do DB_PREV on DUP databases, and do DB_PREV_NODUP on NODUP databases.
{
if (toku_omt_cursor_is_valid(cursor->omtcursor)) {
{
u_int64_t h_counter = cursor->brt->h->root_put_counter;
if (h_counter != cursor->root_put_counter) return -1;
}
OMTVALUE le;
//Save current value in prev.
save_omtcursor_current_in_prev(cursor);
u_int32_t starting_index = 0;
u_int32_t index;
int r = toku_omt_cursor_current_index(cursor->omtcursor, &starting_index);
assert(r==0);
index = starting_index;
while (index>0) {
r = toku_omt_cursor_prev(cursor->omtcursor, &le);
assert(r==0);
index--;
if (le_is_provdel(le)) continue;
//Free old current if necessary.
if (!cursor->current_in_omt) {
if (cursor->key.data) toku_free(cursor->key.data);
if (cursor->val.data) toku_free(cursor->val.data);
cursor->current_in_omt = TRUE;
}
return brt_cursor_copyout(cursor, outkey, outval);
}
toku_omt_cursor_set_index(cursor->omtcursor, starting_index);
toku_omt_cursor_invalidate(cursor->omtcursor);
}
return -1;
}
static int brt_cursor_compare_prev(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
return compare_kv_xy(brt, search->k, search->v, x, y) > 0; /* return max xy: kv > xy */
}
static int brt_cursor_prev(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
if (0!=(cursor->brt->flags & TOKU_DB_DUP) &&
brt_cursor_prev_shortcut(cursor, outkey, outval)==0)
return 0;
brt_search_t search; brt_search_init(&search, brt_cursor_compare_prev, BRT_SEARCH_RIGHT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_compare_prev_nodup(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context; y = y;
return compare_k_x(brt, search->k, x) > 0; /* return max x: k > x */
}
static int brt_cursor_prev_nodup(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
if (0==(cursor->brt->flags & TOKU_DB_DUP) &&
brt_cursor_prev_shortcut(cursor, outkey, outval)==0)
return 0;
brt_search_t search; brt_search_init(&search, brt_cursor_compare_prev_nodup, BRT_SEARCH_RIGHT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_compare_set_range(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
return compare_kv_xy(brt, search->k, search->v, x, y) <= 0; /* return kv <= xy */
}
static int brt_cursor_set(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_set_range, BRT_SEARCH_LEFT, key, val, cursor->brt);
return brt_cursor_search_eq_kv_xy(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_set_range(BRT_CURSOR cursor, DBT *key, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_set_range, BRT_SEARCH_LEFT, key, 0, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *key, DBT *val, int get_flags, TOKUTXN txn) {
int r;
int op = get_flags & DB_OPFLAGS_MASK;
TOKULOGGER logger = toku_txn_logger(txn);
if (get_flags & ~DB_OPFLAGS_MASK)
return EINVAL;
switch (op) {
case DB_CURRENT:
case DB_CURRENT_BINDING:
r = brt_cursor_current(cursor, op, key, val, logger);
break;
case DB_FIRST:
r = brt_cursor_first(cursor, key, val, logger);
break;
case DB_LAST:
r = brt_cursor_last(cursor, key, val, logger);
break;
case DB_NEXT:
if (brt_cursor_not_set(cursor))
r = brt_cursor_first(cursor, key, val, logger);
else
r = brt_cursor_next(cursor, key, val, logger);
break;
case DB_NEXT_DUP:
if (brt_cursor_not_set(cursor))
r = EINVAL;
else
r = brt_cursor_next_dup(cursor, key, val, logger);
break;
case DB_NEXT_NODUP:
if (brt_cursor_not_set(cursor))
r = brt_cursor_first(cursor, key, val, logger);
else
r = brt_cursor_next_nodup(cursor, key, val, logger);
break;
case DB_PREV:
if (brt_cursor_not_set(cursor))
r = brt_cursor_last(cursor, key, val, logger);
else
r = brt_cursor_prev(cursor, key, val, logger);
break;
#ifdef DB_PREV_DUP
case DB_PREV_DUP:
if (brt_cursor_not_set(cursor))
r = EINVAL;
else
r = brt_cursor_prev_dup(cursor, key, val, logger);
break;
#endif
case DB_PREV_NODUP:
if (brt_cursor_not_set(cursor))
r = brt_cursor_last(cursor, key, val, logger);
else
r = brt_cursor_prev_nodup(cursor, key, val, logger);
break;
case DB_SET:
r = brt_cursor_set(cursor, key, 0, 0, val, logger);
break;
case DB_SET_RANGE:
r = brt_cursor_set_range(cursor, key, key, val, logger);
break;
case DB_GET_BOTH:
r = brt_cursor_set(cursor, key, val, 0, 0, logger);
break;
case DB_GET_BOTH_RANGE:
r = brt_cursor_get_both_range(cursor, key, val, 0, val, logger);
break;
default:
r = EINVAL;
break;
}
return r;
}
/* ********************************* lookup **************************************/
int toku_brt_lookup (BRT brt, DBT *k, DBT *v) {
int r, rr;
BRT_CURSOR cursor;
rr = toku_brt_cursor(brt, &cursor, 1);
if (rr != 0) return rr;
int op = brt->flags & TOKU_DB_DUPSORT ? DB_GET_BOTH : DB_SET;
r = toku_brt_cursor_get(cursor, k, v, op, 0);
rr = toku_brt_cursor_close(cursor); assert(rr == 0);
return r;
}
/* ********************* debugging dump ************************ */
static int
toku_dump_brtnode (BRT brt, BLOCKNUM blocknum, int depth, bytevec lorange, ITEMLEN lolen, bytevec hirange, ITEMLEN hilen) {
int result=0;
BRTNODE node;
void *node_v;
u_int32_t fullhash = toku_cachetable_hash(brt->cf, blocknum);
int r = toku_cachetable_get_and_pin(brt->cf, blocknum, fullhash,
&node_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt->h);
assert(r==0);
printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v);
node=node_v;
assert(node->fullhash==fullhash);
result=toku_verify_brtnode(brt, blocknum, lorange, lolen, hirange, hilen, 0);
printf("%*sNode=%p\n", depth, "", node);
if (node->height>0) {
printf("%*sNode %"PRId64" nodesize=%u height=%d n_children=%d n_bytes_in_buffers=%u keyrange=%s %s\n",
depth, "", blocknum.b, node->nodesize, node->height, node->u.n.n_children, node->u.n.n_bytes_in_buffers, (char*)lorange, (char*)hirange);
//printf("%s %s\n", lorange ? lorange : "NULL", hirange ? hirange : "NULL");
{
int i;
for (i=0; i< node->u.n.n_children; i++) {
printf("%*schild %d buffered (%d entries):\n", depth+1, "", i, toku_fifo_n_entries(BNC_BUFFER(node,i)));
FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, xid,
{
data=data; datalen=datalen; keylen=keylen;
printf("%*s xid=%"PRIu64" %u (type=%d)\n", depth+2, "", xid, ntohl(*(int*)key), type);
//assert(strlen((char*)key)+1==keylen);
//assert(strlen((char*)data)+1==datalen);
});
}
for (i=0; i<node->u.n.n_children; i++) {
printf("%*schild %d\n", depth, "", i);
if (i>0) {
printf("%*spivot %d len=%u %u\n", depth+1, "", i-1, node->u.n.childkeys[i-1]->keylen, ntohl(*(int*)&node->u.n.childkeys[i-1]->key));
}
toku_dump_brtnode(brt, BNC_BLOCKNUM(node, i), depth+4,
(i==0) ? lorange : node->u.n.childkeys[i-1],
(i==0) ? lolen : toku_brt_pivot_key_len(brt, node->u.n.childkeys[i-1]),
(i==node->u.n.n_children-1) ? hirange : node->u.n.childkeys[i],
(i==node->u.n.n_children-1) ? hilen : toku_brt_pivot_key_len(brt, node->u.n.childkeys[i])
);
}
}
} else {
printf("%*sNode %" PRId64 " nodesize=%u height=%d n_bytes_in_buffer=%u keyrange=%u %u\n",
depth, "", blocknum.b, node->nodesize, node->height, node->u.l.n_bytes_in_buffer, lorange ? ntohl(*(int*)lorange) : 0, hirange ? ntohl(*(int*)hirange) : 0);
int size = toku_omt_size(node->u.l.buffer);
int i;
for (i=0; i<size; i++) {
OMTVALUE v;
r = toku_omt_fetch(node->u.l.buffer, i, &v, 0);
assert(r==0);
printf(" [%d]=", i);
print_leafentry(stdout, v);
}
// printf(" (%d)%u ", len, *(int*)le_any_key(data)));
printf("\n");
}
r = toku_cachetable_unpin(brt->cf, blocknum, fullhash, 0, 0);
assert(r==0);
return result;
}
int toku_dump_brt (BRT brt) {
CACHEKEY *rootp;
assert(brt->h);
u_int32_t fullhash;
rootp = toku_calculate_root_offset_pointer(brt, &fullhash);
return toku_dump_brtnode(brt, *rootp, 0, 0, 0, 0, 0);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment