Commit 2b4da5c0 authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel

[t:3315] merge indexed buffers work into mainline

git-svn-id: file:///svn/toku/tokudb@33979 c7de825b-a66e-492c-adef-691d508d4ae1
parent b246fcf4
......@@ -67,6 +67,7 @@ BRT_SOURCES = \
recover \
roll \
rollback \
sort \
sub_block \
ule \
threadpool \
......
......@@ -107,7 +107,8 @@ struct brtnode_fetch_extra {
// used in the case where type == brtnode_fetch_subset
// parameters needed to find out which child needs to be decompressed (so it can be read)
brt_search_t* search;
BRT brt;
DB *cmp_extra;
brt_compare_func cmp;
DBT *range_lock_left_key, *range_lock_right_key;
BOOL left_is_neg_infty, right_is_pos_infty;
// this value will be set during the fetch_callback call by toku_brtnode_fetch_callback or toku_brtnode_pf_req_callback
......@@ -121,11 +122,12 @@ struct brtnode_fetch_extra {
// necessary. Used in cases where the entire node
// is required, such as for flushes.
//
static inline void fill_bfe_for_full_read(struct brtnode_fetch_extra *bfe, struct brt_header *h) {
static inline void fill_bfe_for_full_read(struct brtnode_fetch_extra *bfe, struct brt_header *h, DB *cmp_extra, brt_compare_func cmp) {
bfe->type = brtnode_fetch_all;
bfe->h = h;
bfe->search = NULL;
bfe->brt = NULL;
bfe->cmp_extra = cmp_extra;
bfe->cmp = cmp;
bfe->range_lock_left_key = NULL;
bfe->range_lock_right_key = NULL;
bfe->left_is_neg_infty = FALSE;
......@@ -133,7 +135,7 @@ static inline void fill_bfe_for_full_read(struct brtnode_fetch_extra *bfe, struc
bfe->child_to_read = -1;
}
static inline void fill_bfe_for_prefetch(struct brtnode_fetch_extra *bfe, struct brt_header *h, BRT brt, BRT_CURSOR c);
static inline void fill_bfe_for_prefetch(struct brtnode_fetch_extra *bfe, struct brt_header *h, DB *cmp_extra, brt_compare_func cmp, BRT_CURSOR c);
//
// Helper function to fill a brtnode_fetch_extra with data
......@@ -144,7 +146,8 @@ static inline void fill_bfe_for_prefetch(struct brtnode_fetch_extra *bfe, struct
static inline void fill_bfe_for_subset_read(
struct brtnode_fetch_extra *bfe,
struct brt_header *h,
BRT brt,
DB *cmp_extra,
brt_compare_func cmp,
brt_search_t* search,
DBT *left,
DBT *right,
......@@ -155,7 +158,8 @@ static inline void fill_bfe_for_subset_read(
bfe->type = brtnode_fetch_subset;
bfe->h = h;
bfe->search = search;
bfe->brt = brt;
bfe->cmp_extra = cmp_extra;
bfe->cmp = cmp;
bfe->range_lock_left_key = (left->data ? left : NULL);
bfe->range_lock_right_key = (right->data ? right : NULL);
bfe->left_is_neg_infty = left_is_neg_infty;
......@@ -169,11 +173,12 @@ static inline void fill_bfe_for_subset_read(
// necessary, only the pivots and/or subtree estimates.
// Currently used for stat64.
//
static inline void fill_bfe_for_min_read(struct brtnode_fetch_extra *bfe, struct brt_header *h) {
static inline void fill_bfe_for_min_read(struct brtnode_fetch_extra *bfe, struct brt_header *h, DB *cmp_extra, brt_compare_func cmp) {
bfe->type = brtnode_fetch_none;
bfe->h = h;
bfe->search = NULL;
bfe->brt = NULL;
bfe->cmp_extra = cmp_extra;
bfe->cmp = cmp;
bfe->range_lock_left_key = NULL;
bfe->range_lock_right_key = NULL;
bfe->left_is_neg_infty = FALSE;
......@@ -197,9 +202,35 @@ static inline void destroy_bfe_for_prefetch(struct brtnode_fetch_extra *bfe) {
}
}
// Search context for toku_fifo_entry_key_msn_heaviside.
// NOTE(review): presumably this is what that function's `extrap` argument
// points to -- confirm against its definition.
struct toku_fifo_entry_key_msn_heaviside_extra {
// cmp_extra/cmp: key comparison function and the DB handed to it
// (NOTE(review): assumed from the cmp_extra/cmp pairing used by
// brtnode_fetch_extra in this header -- confirm).
DB *cmp_extra;
brt_compare_func cmp;
// presumably the FIFO that the OMT's values refer into -- TODO confirm
FIFO fifo;
// key/keylen: bytes and length of a key; together with msn this is
// presumably the (key, MSN) position being searched for -- TODO confirm
bytevec key;
ITEMLEN keylen;
MSN msn;
};
// comparison function for inserting messages into a
// brtnode_nonleaf_childinfo's message_tree
int
toku_fifo_entry_key_msn_heaviside(OMTVALUE v, void *extrap);
// Comparison context for toku_fifo_entry_key_msn_cmp.
// NOTE(review): presumably this is what that function's `extrap` argument
// points to -- confirm against its definition.
struct toku_fifo_entry_key_msn_cmp_extra {
// cmp_extra/cmp: key comparison function and the DB handed to it
// (NOTE(review): assumed from the cmp_extra/cmp pairing used by
// brtnode_fetch_extra in this header -- confirm).
DB *cmp_extra;
brt_compare_func cmp;
// presumably the FIFO that the compared elements refer into -- TODO confirm
FIFO fifo;
};
// same thing for qsort_r
int
toku_fifo_entry_key_msn_cmp(void *extrap, const void *ap, const void *bp);
// data of an available partition of a nonleaf brtnode
struct brtnode_nonleaf_childinfo {
// FIFO of messages buffered for this child (accessed via BNC_BUFFER).
FIFO buffer;
// NOTE(review): presumably tracks the buffered broadcast messages
// (accessed via BNC_BROADCAST_BUFFER) -- confirm against the code that fills it.
OMT broadcast_buffer;
// Ordered tree of buffered messages (accessed via BNC_MESSAGE_TREE);
// toku_fifo_entry_key_msn_heaviside is the comparison used when inserting into it.
OMT message_tree;
unsigned int n_bytes_in_buffer; /* How many bytes are in each buffer (including overheads for the disk-representation) */
};
......@@ -210,7 +241,6 @@ struct brtnode_leaf_basement_node {
unsigned int n_bytes_in_buffer; /* How many bytes to represent the OMT (including the per-key overheads, but not including the overheads for the node. */
unsigned int seqinsert; /* number of sequential inserts to this leaf */
MSN max_msn_applied; // max message sequence number applied
DSN max_dsn_applied; // max deserialization sequence number applied
};
#define PT_INVALID 0
......@@ -277,7 +307,6 @@ struct __attribute__((__packed__)) brtnode_partition {
struct brtnode {
MSN max_msn_applied_to_node_on_disk; // max_msn_applied that will be written to disk
DSN dsn; // deserialization sequence number
unsigned int nodesize;
unsigned int flags;
BLOCKNUM thisnodename; // Which block number is this node?
......@@ -374,6 +403,8 @@ static inline void set_BSB(BRTNODE node, int i, SUB_BLOCK sb) {
// macros for brtnode_nonleaf_childinfo
#define BNC_BUFFER(node,i) (BNC(node,i)->buffer)
#define BNC_BROADCAST_BUFFER(node,i) (BNC(node,i)->broadcast_buffer)
#define BNC_MESSAGE_TREE(node, i) (BNC(node,i)->message_tree)
#define BNC_NBYTESINBUF(node,i) (BNC(node,i)->n_bytes_in_buffer)
// brtnode leaf basementnode macros,
......@@ -443,8 +474,6 @@ struct brt_header {
struct toku_list live_brts;
struct toku_list zombie_brts;
struct toku_list checkpoint_before_commit_link;
DSN curr_dsn;
};
struct brt {
......@@ -488,7 +517,7 @@ int toku_serialize_rollback_log_to (int fd, BLOCKNUM blocknum, ROLLBACK_LOG_NODE
BOOL for_checkpoint);
int toku_deserialize_rollback_log_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash, ROLLBACK_LOG_NODE *logp, struct brt_header *h);
void toku_deserialize_bp_from_disk(BRTNODE node, int childnum, int fd, struct brtnode_fetch_extra* bfe);
void toku_deserialize_bp_from_compressed(BRTNODE node, int childnum);
void toku_deserialize_bp_from_compressed(BRTNODE node, int childnum, DB *cmp_extra, brt_compare_func cmp);
int toku_deserialize_brtnode_from (int fd, BLOCKNUM off, u_int32_t /*fullhash*/, BRTNODE *brtnode, struct brtnode_fetch_extra* bfe);
unsigned int toku_serialize_brtnode_size(BRTNODE node); /* How much space will it take? */
int toku_keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2len);
......@@ -514,7 +543,7 @@ void toku_assert_entire_node_in_memory(BRTNODE node);
void toku_brt_nonleaf_append_child(BRTNODE node, BRTNODE child, struct kv_pair *pivotkey, size_t pivotkeysize);
// append a cmd to a nonleaf node child buffer
void toku_brt_append_to_child_buffer(BRTNODE node, int childnum, int type, MSN msn, XIDS xids, const DBT *key, const DBT *val);
void toku_brt_append_to_child_buffer(BRT brt, BRTNODE node, int childnum, int type, MSN msn, XIDS xids, const DBT *key, const DBT *val);
#if 1
#define DEADBEEF ((void*)0xDEADBEEF)
......@@ -568,11 +597,16 @@ struct brt_cursor {
};
// this is in a strange place because it needs the cursor struct to be defined
static inline void fill_bfe_for_prefetch(struct brtnode_fetch_extra *bfe, struct brt_header *h, BRT brt, BRT_CURSOR c) {
static inline void fill_bfe_for_prefetch(struct brtnode_fetch_extra *bfe,
struct brt_header *h,
DB *cmp_extra,
brt_compare_func cmp,
BRT_CURSOR c) {
bfe->type = brtnode_fetch_prefetch;
bfe->h = h;
bfe->search = NULL;
bfe->brt = brt;
bfe->cmp_extra = cmp_extra;
bfe->cmp = cmp;
{
const DBT *left = &c->range_lock_left_key;
const DBT *right = &c->range_lock_right_key;
......@@ -607,7 +641,8 @@ struct pivot_bounds {
int
toku_brt_search_which_child(
BRT brt,
DB *cmp_extra,
brt_compare_func cmp,
BRTNODE node,
brt_search_t *search
);
......@@ -645,7 +680,8 @@ void toku_pin_brtnode_holding_lock (BRT brt, BLOCKNUM blocknum, u_int32_t fullha
struct brtnode_fetch_extra *bfe,
BRTNODE *node_p);
void toku_unpin_brtnode (BRT brt, BRTNODE node);
unsigned int toku_brtnode_which_child (BRTNODE node , const DBT *k, BRT t)
unsigned int toku_brtnode_which_child(BRTNODE node, const DBT *k,
DB *cmp_extra, brt_compare_func cmp)
__attribute__((__warn_unused_result__));
/* Stuff for testing */
......
This diff is collapsed.
......@@ -78,7 +78,7 @@ int toku_testsetup_get_sersize(BRT brt, BLOCKNUM diskoff) // Return the size on
assert(testsetup_initialized);
void *node_v;
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, brt->h);
fill_bfe_for_full_read(&bfe, brt->h, brt->db, brt->compare_fun);
int r = toku_cachetable_get_and_pin(
brt->cf, diskoff,
toku_cachetable_hash(brt->cf, diskoff),
......@@ -105,7 +105,7 @@ int toku_testsetup_insert_to_leaf (BRT brt, BLOCKNUM blocknum, char *key, int ke
assert(testsetup_initialized);
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, brt->h);
fill_bfe_for_full_read(&bfe, brt->h, brt->db, brt->compare_fun);
r = toku_cachetable_get_and_pin(
brt->cf,
blocknum,
......@@ -176,7 +176,7 @@ int toku_testsetup_insert_to_nonleaf (BRT brt, BLOCKNUM blocknum, enum brt_msg_t
assert(testsetup_initialized);
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, brt->h);
fill_bfe_for_full_read(&bfe, brt->h, brt->db, brt->compare_fun);
r = toku_cachetable_get_and_pin(
brt->cf,
blocknum,
......@@ -198,11 +198,11 @@ int toku_testsetup_insert_to_nonleaf (BRT brt, BLOCKNUM blocknum, enum brt_msg_t
DBT k;
int childnum = toku_brtnode_which_child(node,
toku_fill_dbt(&k, key, keylen),
brt);
brt->db, brt->compare_fun);
XIDS xids_0 = xids_get_root_xids();
MSN msn = next_dummymsn();
r = toku_fifo_enq(BNC_BUFFER(node, childnum), key, keylen, val, vallen, cmdtype, msn, xids_0);
r = toku_fifo_enq(BNC_BUFFER(node, childnum), key, keylen, val, vallen, cmdtype, msn, xids_0, NULL);
assert(r==0);
// Hack to get the test working. The problem is that this test
// is directly queueing something in a FIFO instead of
......
......@@ -114,7 +114,7 @@ toku_verify_brtnode (BRT brt,
u_int32_t fullhash = toku_cachetable_hash(brt->cf, blocknum);
{
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, brt->h);
fill_bfe_for_full_read(&bfe, brt->h, brt->db, brt->compare_fun);
int r = toku_cachetable_get_and_pin(
brt->cf,
blocknum,
......
This diff is collapsed.
......@@ -120,7 +120,7 @@ static void
dump_node (int f, BLOCKNUM blocknum, struct brt_header *h) {
BRTNODE n;
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, h);
fill_bfe_for_full_read(&bfe, h, NULL, NULL);
int r = toku_deserialize_brtnode_from (f, blocknum, 0 /*pass zero for hash, it doesn't matter*/, &n, &bfe);
assert(r==0);
assert(n!=0);
......@@ -230,7 +230,7 @@ fragmentation_helper(BLOCKNUM b, int64_t size, int64_t UU(address), void *extra)
frag_help_extra *info = extra;
BRTNODE n;
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, info->h);
fill_bfe_for_full_read(&bfe, info->h, NULL, NULL);
int r = toku_deserialize_brtnode_from(info->f, b, 0 /*pass zero for hash, it doesn't matter*/, &n, &bfe);
if (r==0) {
info->blocksizes += size;
......
......@@ -11,6 +11,7 @@
#endif
#define _FILE_OFFSET_BITS 64
#include "toku_assert.h"
#include <db.h>
#include <inttypes.h>
......@@ -60,11 +61,6 @@ typedef struct __toku_msn { u_int64_t msn; } MSN;
#define MIN_MSN ((MSN){(u_int64_t)1000*1000*1000}) // first 1B values reserved for messages created before Dr. No (for upgrade)
#define MAX_MSN ((MSN){UINT64_MAX})
typedef struct __toku_dsn { int64_t dsn; } DSN; // DESERIALIZATION sequence number
#define INVALID_DSN ((DSN){-1})
#define MIN_DSN ((DSN){0})
#define MAX_DSN ((DSN){INT64_MAX})
/* At the brt layer, a FILENUM uniquely identifies an open file.
* At the ydb layer, a DICTIONARY_ID uniquely identifies an open dictionary.
* With the introduction of the loader (ticket 2216), it is possible for the file that holds
......@@ -123,6 +119,68 @@ enum brt_msg_type {
BRT_UPDATE_BROADCAST_ALL = 15
};
// Classify a message type: TRUE for the types listed in the first group
// below (inserts, delete/abort/commit "any", update), FALSE for the broadcast
// types, the optimize types and BRT_NONE.
//
// An unrecognized type trips the assert.  ret_val starts out FALSE so that
// the function still returns a defined value if that assert is compiled out
// or returns.
static inline BOOL
brt_msg_type_applies_once(enum brt_msg_type type)
{
    BOOL ret_val = FALSE;
    switch (type) {
    case BRT_INSERT_NO_OVERWRITE:
    case BRT_INSERT:
    case BRT_DELETE_ANY:
    case BRT_ABORT_ANY:
    case BRT_COMMIT_ANY:
    case BRT_UPDATE:
        ret_val = TRUE;
        break;
    case BRT_COMMIT_BROADCAST_ALL:
    case BRT_COMMIT_BROADCAST_TXN:
    case BRT_ABORT_BROADCAST_TXN:
    case BRT_OPTIMIZE:
    case BRT_OPTIMIZE_FOR_UPGRADE:
    case BRT_UPDATE_BROADCAST_ALL:
    case BRT_NONE:
        ret_val = FALSE;
        break;
    default:
        assert(FALSE);
        break;
    }
    return ret_val;
}
// Classify a message type: TRUE for the broadcast and optimize types listed
// in the second group below, FALSE for BRT_NONE and the single-key types
// (inserts, delete/abort/commit "any", update).
//
// An unrecognized type trips the assert.  ret_val starts out FALSE so that
// the function still returns a defined value if that assert is compiled out
// or returns.
static inline BOOL
brt_msg_type_applies_all(enum brt_msg_type type)
{
    BOOL ret_val = FALSE;
    switch (type) {
    case BRT_NONE:
    case BRT_INSERT_NO_OVERWRITE:
    case BRT_INSERT:
    case BRT_DELETE_ANY:
    case BRT_ABORT_ANY:
    case BRT_COMMIT_ANY:
    case BRT_UPDATE:
        ret_val = FALSE;
        break;
    case BRT_COMMIT_BROADCAST_ALL:
    case BRT_COMMIT_BROADCAST_TXN:
    case BRT_ABORT_BROADCAST_TXN:
    case BRT_OPTIMIZE:
    case BRT_OPTIMIZE_FOR_UPGRADE:
    case BRT_UPDATE_BROADCAST_ALL:
        ret_val = TRUE;
        break;
    default:
        assert(FALSE);
        break;
    }
    return ret_val;
}
// Report whether the message type is BRT_NONE.
static inline BOOL
brt_msg_type_does_nothing(enum brt_msg_type type)
{
    const BOOL is_none = (type == BRT_NONE);
    return is_none;
}
typedef struct xids_t *XIDS;
typedef struct fifo_msg_t *FIFO_MSG;
/* tree commands */
......
......@@ -42,7 +42,6 @@ static u_int64_t cachetable_puts; // how many times has a newly created
static u_int64_t cachetable_prefetches; // how many times has a block been prefetched into the cachetable?
static u_int64_t cachetable_maybe_get_and_pins; // how many times has maybe_get_and_pin(_clean) been called?
static u_int64_t cachetable_maybe_get_and_pin_hits; // how many times has get_and_pin(_clean) returned with a node?
static u_int64_t cachetable_get_and_pin_if_in_memorys; // how many times has get_and_pin_if_in_memorys been called?
static u_int64_t cachetable_wait_checkpoint; // number of times get_and_pin waits for a node to be written for a checkpoint
static u_int64_t cachetable_misstime; // time spent waiting for disk read
static u_int64_t cachetable_waittime; // time spent waiting for another thread to release lock (e.g. prefetch, writing)
......@@ -1733,42 +1732,6 @@ int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, u_int3
return r;
}
int toku_cachetable_get_and_pin_if_in_memory (CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, void**value)
// Effect: Lookup a key in the cachetable. If it is found then acquire a read lock on the pair, don't update the LRU list, and return success.
// Unlike toku_cachetable_maybe_get_and_pin, which gives up if there is any blocking (e.g., the node is waiting to be checkpointed), this
// version waits.
// Rationale: orthodox pushing needs to get the in-memory state right.
// Don't update the LRU list because we don't want this operation to cause something to stick in memory longer.
// Returns: 0 with the pair's value stored in *value when the key is found; -1 with *value untouched when it is not.
{
CACHETABLE ct = cachefile->cachetable;
PAIR p;
// count: number of hash-chain entries visited, reported to note_hash_count below.
int count = 0;
// r stays -1 unless a matching pair is found.
int r = -1;
cachetable_lock(ct);
cachetable_get_and_pin_if_in_memorys++;
// Walk the hash chain of the bucket selected by fullhash.
// NOTE(review): the mask assumes table_size is a power of two -- confirm.
for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
count++;
// A pair matches only if both the block number and the cachefile agree.
if (p->key.b==key.b && p->cachefile==cachefile) {
// It's the right block. Now we must wait.
if (p->checkpoint_pending) {
// NOTE(review): presumably writes the pair out for the pending checkpoint before it is pinned -- confirm.
write_pair_for_checkpoint(ct, p, FALSE);
}
// NOTE(review): presumably ct->mutex is passed so it can be released while waiting for the lock -- confirm.
rwlock_read_lock(&p->rwlock, ct->mutex);
if (p->state == CTPAIR_INVALID) {
assert(0); // This is the branch that returns ENODEV in the get_and_pin code in the 5.0 branch. Let's just crash now.
}
// do not increment PAIR's clock count.
*value = p->value;
cachetable_hit++;
r = 0;
break;
}
}
note_hash_count(count);
cachetable_unlock(ct);
return r;
}
//Used by shortcut query path.
//Same as toku_cachetable_maybe_get_and_pin except that we don't care if the node is clean or dirty (return the node regardless).
//All other conditions remain the same.
......@@ -2955,7 +2918,6 @@ void toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS s) {
s->prefetches = cachetable_prefetches;
s->maybe_get_and_pins = cachetable_maybe_get_and_pins;
s->maybe_get_and_pin_hits = cachetable_maybe_get_and_pin_hits;
s->get_and_pin_if_in_memorys = cachetable_get_and_pin_if_in_memorys;
s->size_current = ct->size_current;
s->size_limit = ct->size_limit;
s->size_max = ct->size_max;
......
......@@ -232,12 +232,6 @@ int toku_cachetable_maybe_get_and_pin (CACHEFILE, CACHEKEY, u_int32_t /*fullhash
// Returns: If the the item is already in memory, then return 0 and store it in the
// void**. If the item is not in memory, then return a nonzero error number.
int toku_cachetable_get_and_pin_if_in_memory (CACHEFILE /*cachefile*/, CACHEKEY /*key*/, u_int32_t /*fullhash*/, void**/*value*/);
// Effect: Get and pin an object if it is in memory, (even if doing so would require blocking, e.g., to wait on a checkpoint).
// This is similar to maybe_get_and_pin except that maybe_get_and_pin won't block waiting on a checkpoint.
// Returns: 0 iff the item is in memory (otherwise return a error)
// Modifies: *value (if returning 0, then the pointer to the value is stored in *value.
int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE, CACHEKEY, u_int32_t /*fullhash*/, void**);
// Effect: Like maybe get and pin, but may pin a clean pair.
......@@ -403,7 +397,6 @@ typedef struct cachetable_status {
u_int64_t prefetches; // how many times has a block been prefetched into the cachetable?
u_int64_t maybe_get_and_pins; // how many times has maybe_get_and_pin(_clean) been called?
u_int64_t maybe_get_and_pin_hits; // how many times has maybe_get_and_pin(_clean) returned with a node?
u_int64_t get_and_pin_if_in_memorys; // how many times has get_and_pin_if_in_memory been called?
int64_t size_current; // the sum of the sizes of the nodes represented in the cachetable
int64_t size_limit; // the limit to the sum of the node sizes
int64_t size_max; // high water mark of size_current (max value size_current ever had)
......
......@@ -69,7 +69,7 @@ void toku_fifo_size_hint(FIFO fifo, size_t size) {
}
}
int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *data, unsigned int datalen, int type, MSN msn, XIDS xids) {
int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *data, unsigned int datalen, int type, MSN msn, XIDS xids, long *dest) {
int need_space_here = sizeof(struct fifo_entry)
+ keylen + datalen
+ xids_get_size(xids)
......@@ -90,12 +90,14 @@ int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *d
if (newmem==0) return ENOMEM;
memcpy(newmem, oldmem+fifo->memory_start, fifo->memory_used);
fifo->memory_size = next_2;
assert(fifo->memory_start == 0);
fifo->memory_start = 0;
fifo->memory = newmem;
toku_free(oldmem);
} else {
// slide things over
memmove(fifo->memory, fifo->memory+fifo->memory_start, fifo->memory_used);
assert(fifo->memory_start == 0);
fifo->memory_start = 0;
}
}
......@@ -108,13 +110,17 @@ int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *d
memcpy(e_key, key, keylen);
entry->vallen = datalen;
memcpy(e_key + keylen, data, datalen);
if (dest) {
assert(fifo->memory_start == 0);
*dest = fifo->memory_used;
}
fifo->n_items_in_fifo++;
fifo->memory_used += need_space_here;
return 0;
}
int toku_fifo_enq_cmdstruct (FIFO fifo, const BRT_MSG cmd) {
return toku_fifo_enq(fifo, cmd->u.id.key->data, cmd->u.id.key->size, cmd->u.id.val->data, cmd->u.id.val->size, cmd->type, cmd->msn, cmd->xids);
int toku_fifo_enq_cmdstruct (FIFO fifo, const BRT_MSG cmd, long *dest) {
return toku_fifo_enq(fifo, cmd->u.id.key->data, cmd->u.id.key->size, cmd->u.id.val->data, cmd->u.id.val->size, cmd->type, cmd->msn, cmd->xids, dest);
}
/* peek at the head (the oldest entry) of the fifo */
......@@ -193,3 +199,10 @@ unsigned long toku_fifo_memory_size(FIFO fifo) {
return sizeof(*fifo)+fifo->memory_size;
}
// Point dbt at the key of a fifo entry and return dbt.
// The key is taken to start where the entry's xids array ends and to be
// entry->keylen bytes long; no bytes are copied here.
DBT *fill_dbt_for_fifo_entry(DBT *dbt, const struct fifo_entry *entry) {
    XIDS entry_xids = (XIDS) &entry->xids_s;
    const void *key_start = xids_get_end_of_array(entry_xids);
    return toku_fill_dbt(dbt, key_start, entry->keylen);
}
// Return a read-only view of the fifo entry at offset off.
// Thin wrapper over toku_fifo_iterate_internal_get_entry; note that the
// wrapped function is declared with an int offset, so off is narrowed
// from long to int at the call.
const struct fifo_entry *toku_fifo_get_entry(FIFO fifo, long off) {
    const struct fifo_entry *entry = toku_fifo_iterate_internal_get_entry(fifo, off);
    return entry;
}
......@@ -44,9 +44,9 @@ void toku_fifo_size_is_stabilized(FIFO);
int toku_fifo_n_entries(FIFO);
int toku_fifo_enq_cmdstruct (FIFO fifo, const BRT_MSG cmd);
int toku_fifo_enq_cmdstruct (FIFO fifo, const BRT_MSG cmd, long *dest);
int toku_fifo_enq (FIFO, const void *key, ITEMLEN keylen, const void *data, ITEMLEN datalen, int type, MSN msn, XIDS xids);
int toku_fifo_enq (FIFO, const void *key, ITEMLEN keylen, const void *data, ITEMLEN datalen, int type, MSN msn, XIDS xids, long *dest);
int toku_fifo_peek (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, u_int32_t *type, MSN *msn, XIDS *xids);
......@@ -81,6 +81,9 @@ int toku_fifo_iterate_internal_has_more(FIFO fifo, int off);
int toku_fifo_iterate_internal_next(FIFO fifo, int off);
struct fifo_entry * toku_fifo_iterate_internal_get_entry(FIFO fifo, int off);
DBT *fill_dbt_for_fifo_entry(DBT *dbt, const struct fifo_entry *entry);
const struct fifo_entry *toku_fifo_get_entry(FIFO fifo, long off);
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007-2011 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "includes.h"
#include "sort.h"
#if defined(HAVE_CILK)
#include <cilk/cilk.h>
#define cilk_worker_count (__cilkrts_get_nworkers())
#else
#define cilk_spawn
#define cilk_sync
#define cilk_for for
#define cilk_worker_count 1
#endif
// Sequentially merge two sorted runs into dest.
//
// vdest: output buffer with room for (an + bn) elements; it is filled with
//        memcpy, so it must not overlap either input run
// va/an: first sorted run and its element count
// vb/bn: second sorted run and its element count
// width: size of one element in bytes
// extra: opaque pointer passed to cmp as its first argument
// cmp:   three-way comparison (negative, zero, positive)
//
// Elements that compare equal are taken from a before b, so this merge
// keeps the relative order of equal elements (it is stable).
// Byte counts are computed in size_t so that count * width cannot overflow int.
// Always returns 0.
static int
merge_c(void *vdest, void *va, int an, void *vb, int bn, int width,
        void *extra, int (*cmp)(void *, const void *, const void *))
{
    const size_t w = (size_t) width;
    char *dest = vdest, *a = va, *b = vb;
    while (an > 0 && bn > 0) {
        if (cmp(extra, a, b) <= 0) {
            memcpy(dest, a, w);
            a += w; an--;
        } else {
            memcpy(dest, b, w);
            b += w; bn--;
        }
        dest += w;
    }
    // at most one of the runs still has elements; copy the tail in one go
    if (an > 0) {
        memcpy(dest, a, (size_t) an * w);
    }
    if (bn > 0) {
        memcpy(dest, b, (size_t) bn * w);
    }
    return 0;
}
// Binary-search a sorted array for the position of key.
//
// key:     element to look for
// va/n:    sorted array and its element count
// abefore: offset added to the returned index (callers pass 0 for a whole array)
// width:   size of one element in bytes
// extra:   opaque pointer passed to cmp as its first argument
// cmp:     three-way comparison, called as cmp(extra, key, element)
//
// Returns abefore plus the index at which key would be inserted; if a probed
// element compares equal to key, returns abefore plus that element's index.
// For n == 0 the result is abefore.
//
// Written as a loop rather than recursion, with element offsets computed in
// size_t so that index * width cannot overflow int.
static int
binsearch(void *key, void *va, int n, int abefore, int width,
          void *extra, int (*cmp)(void *, const void *, const void *))
{
    const size_t w = (size_t) width;
    char *base = va;
    int before = abefore;
    while (n > 0) {
        int mid = n / 2;
        int c = cmp(extra, key, base + (size_t) mid * w);
        if (c == 0) {
            // with unique elements (e.g. unique msns) this is not reached, but handle it anyway
            return before + mid;
        }
        if (n == 1) {
            // a single element is left: key goes just before or just after it
            return (c < 0) ? before : before + 1;
        }
        if (c < 0) {
            n = mid;
        } else {
            base += (size_t) mid * w;
            before += mid;
            n -= mid;
        }
    }
    return before;
}
// Merge two sorted runs into dest, splitting the work in two when it is large.
//
// Below the size threshold this simply calls merge_c.  Otherwise the longer
// run is split at its midpoint, binsearch finds the matching split point in
// the shorter run, and the two halves are merged independently (the first
// half under cilk_spawn) into disjoint parts of dest.
//
// Because the runs are swapped when the second one is longer, the order of
// elements that compare equal across the two runs is not guaranteed here.
// Byte offsets are computed in size_t so that count * width cannot overflow int.
//
// Returns the first nonzero result of the two sub-merges, or 0.
static int
merge(void *vdest, void *va, int an, void *vb, int bn, int width,
      void *extra, int (*cmp)(void *, const void *, const void *))
{
    enum { SEQUENTIAL_MERGE_LIMIT = 10000 };
    if (an + bn < SEQUENTIAL_MERGE_LIMIT) {
        return merge_c(vdest, va, an, vb, bn, width, extra, cmp);
    }
    const size_t w = (size_t) width;
    char *dest = vdest, *a = va, *b = vb;
    if (an < bn) {
        // make a the longer run so that splitting it halves the work
        char *tmp1 = a; a = b; b = tmp1;
        int tmp2 = an; an = bn; bn = tmp2;
    }
    int a2 = an/2;
    char *a_upper = a + (size_t) a2 * w;
    int b2 = binsearch(a_upper, b, bn, 0, width, extra, cmp);
    char *b_upper = b + (size_t) b2 * w;
    char *dest_upper = dest + ((size_t) a2 + (size_t) b2) * w;
    int ra, rb;
    ra = cilk_spawn merge(dest, a, a2, b, b2, width, extra, cmp);
    rb = merge(dest_upper, a_upper, an-a2, b_upper, bn-b2, width, extra, cmp);
    cilk_sync;
    if (ra != 0) return ra;
    return rb;
}
// Sort an array in place with a top-down merge sort.
//
// va:    array of n elements, each width bytes
// extra: opaque pointer passed to cmp as its first argument
// cmp:   three-way comparison (negative, zero, positive)
//
// Arrays of more than 10000 elements spawn the left half with cilk_spawn and
// use the splitting merge(); smaller ones recurse and merge sequentially.
// Each call with n > 1 allocates an n*width-byte scratch buffer with
// toku_xmalloc, merges into it, and copies the result back.
// Sizes and offsets are computed in size_t so that n * width cannot overflow int.
//
// Returns 0 on success, otherwise the first nonzero result from a sub-sort
// or from the merge.
int
mergesort_r(void *va, int n, int width,
            void *extra, int (*cmp)(void *, const void *, const void *))
{
    if (n <= 1) { return 0; }
    const BOOL use_cilk = (n > 10000);
    const size_t w = (size_t) width;
    unsigned char *a = va;
    int mid = n/2;
    unsigned char *upper = a + (size_t) mid * w;
    int r1, r2;
    if (use_cilk) {
        r1 = cilk_spawn mergesort_r(a, mid, width, extra, cmp);
    } else {
        r1 = mergesort_r(a, mid, width, extra, cmp);
    }
    r2 = mergesort_r(upper, n-mid, width, extra, cmp);
    cilk_sync;
    if (r1 != 0) return r1;
    if (r2 != 0) return r2;

    const size_t nbytes = (size_t) n * w;
    void *tmp = toku_xmalloc(nbytes);
    int r;
    if (use_cilk) {
        r = merge(tmp, a, mid, upper, n-mid, width, extra, cmp);
    } else {
        r = merge_c(tmp, a, mid, upper, n-mid, width, extra, cmp);
    }
    if (r == 0) {
        // only overwrite the input once the merge has succeeded
        memcpy(a, tmp, nbytes);
    }
    toku_free(tmp);
    return r;
}
/* -*- mode: C; c-basic-offset: 4 -*- */
#ifndef SORT_H
#define SORT_H
#ident "$Id$"
#ident "Copyright (c) 2007-2011 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
// apes qsort_r which is not available in centos 5's version of libc
// is parallelized with cilk, therefore probably faster than qsort_r on large arrays
// TODO: switch to qsort_r for small arrays (at the bottom of the recursion)
// this requires figuring out what to do about libc
//
// a: array of elements
// n: number of elements
// width: size of each element in bytes
// extra: extra data for comparison function (passed in as first parameter)
// cmp: comparison function; it receives extra as its first argument (the BSD qsort_r argument order, not glibc's, which passes the extra pointer last)
//
// Returns 0 on success.
int
mergesort_r(void *a, int n, int width,
void *extra, int (*cmp)(void *, const void *, const void *));
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
......@@ -84,18 +84,25 @@ enum brtnode_verify_type {
read_none
};
// Key comparison for tests: treats both keys' data as NUL-terminated
// strings and orders them with strcmp.  The DB argument is unused.
static int
string_key_cmp(DB *UU(e), const DBT *a, const DBT *b)
{
    const char *lhs = a->data;
    const char *rhs = b->data;
    return strcmp(lhs, rhs);
}
static void
setup_dn(enum brtnode_verify_type bft, int fd, struct brt_header *brt_h, BRTNODE *dn) {
int r;
if (bft == read_all) {
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, brt_h);
fill_bfe_for_full_read(&bfe, brt_h, NULL, string_key_cmp);
r = toku_deserialize_brtnode_from(fd, make_blocknum(20), 0/*pass zero for hash*/, dn, &bfe);
assert(r==0);
}
else if (bft == read_compressed || bft == read_none) {
struct brtnode_fetch_extra bfe;
fill_bfe_for_min_read(&bfe, brt_h);
fill_bfe_for_min_read(&bfe, brt_h, NULL, string_key_cmp);
r = toku_deserialize_brtnode_from(fd, make_blocknum(20), 0/*pass zero for hash*/, dn, &bfe);
assert(r==0);
// assert all bp's are compressed
......@@ -118,7 +125,7 @@ setup_dn(enum brtnode_verify_type bft, int fd, struct brt_header *brt_h, BRTNODE
}
}
// now decompress them
fill_bfe_for_full_read(&bfe, brt_h);
fill_bfe_for_full_read(&bfe, brt_h, NULL, string_key_cmp);
assert(toku_brtnode_pf_req_callback(*dn, &bfe));
long size;
r = toku_brtnode_pf_callback(*dn, &bfe, fd, &size);
......@@ -1067,9 +1074,9 @@ test_serialize_nonleaf(enum brtnode_verify_type bft) {
r = xids_create_child(xids_123, &xids_234, (TXNID)234);
CKERR(r);
r = toku_fifo_enq(BNC_BUFFER(&sn,0), "a", 2, "aval", 5, BRT_NONE, next_dummymsn(), xids_0); assert(r==0);
r = toku_fifo_enq(BNC_BUFFER(&sn,0), "b", 2, "bval", 5, BRT_NONE, next_dummymsn(), xids_123); assert(r==0);
r = toku_fifo_enq(BNC_BUFFER(&sn,1), "x", 2, "xval", 5, BRT_NONE, next_dummymsn(), xids_234); assert(r==0);
r = toku_fifo_enq(BNC_BUFFER(&sn,0), "a", 2, "aval", 5, BRT_NONE, next_dummymsn(), xids_0, NULL); assert(r==0);
r = toku_fifo_enq(BNC_BUFFER(&sn,0), "b", 2, "bval", 5, BRT_NONE, next_dummymsn(), xids_123, NULL); assert(r==0);
r = toku_fifo_enq(BNC_BUFFER(&sn,1), "x", 2, "xval", 5, BRT_NONE, next_dummymsn(), xids_234, NULL); assert(r==0);
BNC_NBYTESINBUF(&sn, 0) = 2*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5) + xids_get_serialize_size(xids_0) + xids_get_serialize_size(xids_123);
BNC_NBYTESINBUF(&sn, 1) = 1*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5) + xids_get_serialize_size(xids_234);
//Cleanup:
......
......@@ -154,7 +154,7 @@ again:
for (i=0; i<my_n_present; i++) {
void *v;
u_int32_t fullhash = toku_cachetable_hash(my_present_items[i].cf, my_present_items[i].key);
int r=toku_cachetable_get_and_pin_if_in_memory(my_present_items[i].cf,
int r=toku_cachetable_maybe_get_and_pin_clean(my_present_items[i].cf,
my_present_items[i].key,
toku_cachetable_hash(my_present_items[i].cf, my_present_items[i].key),
&v);
......
......@@ -57,7 +57,7 @@ test_fifo_enq (int n) {
MSN msn = next_dummymsn();
if (startmsn.msn == ZERO_MSN.msn)
startmsn = msn;
r = toku_fifo_enq(f, thekey, thekeylen, theval, thevallen, i, msn, xids); assert(r == 0);
r = toku_fifo_enq(f, thekey, thekeylen, theval, thevallen, i, msn, xids, NULL); assert(r == 0);
xids_destroy(&xids);
}
......
......@@ -58,13 +58,13 @@ populate_leaf(BRTNODE leafnode, int seq, int n, int *minkey, int *maxkey) {
}
static void
insert_into_child_buffer(BRTNODE node, int childnum, int minkey, int maxkey) {
insert_into_child_buffer(BRT brt, BRTNODE node, int childnum, int minkey, int maxkey) {
for (unsigned int val = htonl(minkey); val <= htonl(maxkey); val++) {
MSN msn = next_dummymsn();
unsigned int key = htonl(val);
DBT thekey; toku_fill_dbt(&thekey, &key, sizeof key);
DBT theval; toku_fill_dbt(&theval, &val, sizeof val);
toku_brt_append_to_child_buffer(node, childnum, BRT_INSERT, msn, xids_get_root_xids(), &thekey, &theval);
toku_brt_append_to_child_buffer(brt, node, childnum, BRT_INSERT, msn, xids_get_root_xids(), &thekey, &theval);
node->max_msn_applied_to_node_on_disk = msn;
}
}
......@@ -89,7 +89,7 @@ make_tree(BRT brt, int height, int fanout, int nperleaf, int *seq, int *minkey,
toku_brt_nonleaf_append_child(node, child, pivotkey, sizeof k);
}
toku_unpin_brtnode(brt, child);
insert_into_child_buffer(node, childnum, minkeys[childnum], maxkeys[childnum]);
insert_into_child_buffer(brt, node, childnum, minkeys[childnum], maxkeys[childnum]);
}
*minkey = minkeys[0];
*maxkey = maxkeys[0];
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007-2011 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "test.h"
#include <stdlib.h>
#include "sort.h"
const int MAX_NUM = 0x0fffffffL;
int MAGIC_EXTRA = 0xd3adb00f;
// Three-way int comparison used as the mergesort_r callback.
// Also checks that the extra pointer is non-NULL and points at a value equal
// to MAGIC_EXTRA, and that both operands are below MAX_NUM.
// Returns -1, 0 or 1.
static int
int_cmp(void *ve, const void *va, const void *vb)
{
    const int *magic = ve;
    assert(magic);
    assert(*magic == MAGIC_EXTRA);
    const int *lhs = va;
    const int *rhs = vb;
    assert(*lhs < MAX_NUM);
    assert(*rhs < MAX_NUM);
    if (*lhs < *rhs) {
        return -1;
    }
    if (*lhs > *rhs) {
        return 1;
    }
    return 0;
}
// Assert that a[0..nelts) is in nondecreasing order and that every element
// is below MAX_NUM.  An empty array passes trivially; a[0] is not read
// when nelts is zero or negative.
static void
check_int_array(int a[], int nelts)
{
    if (nelts <= 0) {
        return;
    }
    assert(a[0] < MAX_NUM);
    for (int i = 1; i < nelts; ++i) {
        assert(a[i] < MAX_NUM);
        assert(a[i-1] <= a[i]);
    }
}
// Sorting an empty array must succeed without touching the array or
// calling the comparison function (extra is NULL, which int_cmp would reject).
static void
zero_array_test(void)
{
    int r = mergesort_r(NULL, 0, sizeof(int), NULL, int_cmp);
    assert(r == 0);
}
// Sort an array that is already in ascending order (0, 1, ..., nelts-1)
// and verify that the sort reports success and the result is still ordered.
static void
already_sorted_test(int nelts)
{
    int *MALLOC_N(nelts, a);
    assert(a);
    for (int i = 0; i < nelts; ++i) {
        a[i] = i;
    }
    int r = mergesort_r(a, nelts, sizeof a[0], &MAGIC_EXTRA, int_cmp);
    assert(r == 0);
    check_int_array(a, nelts);
    toku_free(a);
}
// Sort an array of nelts values drawn from rand() (each reduced modulo
// MAX_NUM) and verify that the sort reports success and the result is ordered.
static void
random_array_test(int nelts)
{
    int *MALLOC_N(nelts, a);
    assert(a);
    for (int i = 0; i < nelts; ++i) {
        a[i] = rand() % MAX_NUM;
    }
    int r = mergesort_r(a, nelts, sizeof a[0], &MAGIC_EXTRA, int_cmp);
    assert(r == 0);
    check_int_array(a, nelts);
    toku_free(a);
}
// Test driver: the empty-array case first, then the already-sorted cases
// and the random cases at each size.  10001 sits just above the
// 10000-element threshold that mergesort_r uses to switch code paths.
int
test_main(int argc __attribute__((__unused__)), const char *argv[] __attribute__((__unused__)))
{
    static const int sizes[] = { 10, 1000, 10001, 10000000 };
    const int nsizes = (int) (sizeof sizes / sizeof sizes[0]);

    zero_array_test();
    for (int i = 0; i < nsizes; ++i) {
        already_sorted_test(sizes[i]);
    }
    for (int i = 0; i < nsizes; ++i) {
        random_array_test(sizes[i]);
    }
    return 0;
}
......@@ -44,7 +44,7 @@ static void test_3748 (void) {
if (startmsn.msn == ZERO_MSN.msn)
startmsn = msn;
r = toku_fifo_enq(f, thekey, thekeylen, theval, thevallen, i, msn, xids); assert(r == 0);
r = toku_fifo_enq(f, thekey, thekeylen, theval, thevallen, i, msn, xids, NULL); assert(r == 0);
xids_destroy(&xids);
}
for (int i=N/10; i<N; i++) {
......
......@@ -62,13 +62,13 @@ populate_leaf(BRTNODE leafnode, int seq, int n, int *minkey, int *maxkey) {
}
static void
insert_into_child_buffer(BRTNODE node, int childnum, int minkey, int maxkey) {
insert_into_child_buffer(BRT brt, BRTNODE node, int childnum, int minkey, int maxkey) {
for (unsigned int val = htonl(minkey); val <= htonl(maxkey); val++) {
MSN msn = next_dummymsn();
unsigned int key = htonl(val);
DBT thekey; toku_fill_dbt(&thekey, &key, sizeof key);
DBT theval; toku_fill_dbt(&theval, &val, sizeof val);
toku_brt_append_to_child_buffer(node, childnum, BRT_INSERT, msn, xids_get_root_xids(), &thekey, &theval);
toku_brt_append_to_child_buffer(brt, node, childnum, BRT_INSERT, msn, xids_get_root_xids(), &thekey, &theval);
// Create bad tree (don't do following):
// node->max_msn_applied_to_node = msn;
......@@ -95,7 +95,7 @@ make_tree(BRT brt, int height, int fanout, int nperleaf, int *seq, int *minkey,
toku_brt_nonleaf_append_child(node, child, pivotkey, sizeof k);
}
toku_unpin_brtnode(brt, child);
insert_into_child_buffer(node, childnum, minkeys[childnum], maxkeys[childnum]);
insert_into_child_buffer(brt, node, childnum, minkeys[childnum], maxkeys[childnum]);
}
*minkey = minkeys[0];
*maxkey = maxkeys[0];
......
......@@ -47,13 +47,13 @@ populate_leaf(BRTNODE leafnode, int seq, int n, int *minkey, int *maxkey) {
}
static UU() void
insert_into_child_buffer(BRTNODE node, int childnum, int minkey, int maxkey) {
insert_into_child_buffer(BRT brt, BRTNODE node, int childnum, int minkey, int maxkey) {
for (unsigned int val = htonl(minkey); val <= htonl(maxkey); val++) {
unsigned int key = htonl(val);
DBT thekey; toku_fill_dbt(&thekey, &key, sizeof key);
DBT theval; toku_fill_dbt(&theval, &val, sizeof val);
MSN msn = next_dummymsn();
toku_brt_append_to_child_buffer(node, childnum, BRT_INSERT, msn, xids_get_root_xids(), &thekey, &theval);
toku_brt_append_to_child_buffer(brt, node, childnum, BRT_INSERT, msn, xids_get_root_xids(), &thekey, &theval);
}
}
......
......@@ -47,13 +47,13 @@ populate_leaf(BRTNODE leafnode, int seq, int n, int *minkey, int *maxkey) {
}
static UU() void
insert_into_child_buffer(BRTNODE node, int childnum, int minkey, int maxkey) {
insert_into_child_buffer(BRT brt, BRTNODE node, int childnum, int minkey, int maxkey) {
for (unsigned int val = htonl(minkey); val <= htonl(maxkey); val++) {
unsigned int key = htonl(val);
DBT thekey; toku_fill_dbt(&thekey, &key, sizeof key);
DBT theval; toku_fill_dbt(&theval, &val, sizeof val);
MSN msn = next_dummymsn();
toku_brt_append_to_child_buffer(node, childnum, BRT_INSERT, msn, xids_get_root_xids(), &thekey, &theval);
toku_brt_append_to_child_buffer(brt, node, childnum, BRT_INSERT, msn, xids_get_root_xids(), &thekey, &theval);
}
}
......
......@@ -48,7 +48,7 @@ populate_leaf(BRTNODE leafnode, int seq, int n, int *minkey, int *maxkey) {
}
static void
insert_into_child_buffer(BRTNODE node, int childnum, int minkey, int maxkey) {
insert_into_child_buffer(BRT brt, BRTNODE node, int childnum, int minkey, int maxkey) {
int k = htonl(maxkey);
maxkey = htonl(k+1);
for (unsigned int val = htonl(minkey); val <= htonl(maxkey); val++) {
......@@ -56,7 +56,7 @@ insert_into_child_buffer(BRTNODE node, int childnum, int minkey, int maxkey) {
DBT thekey; toku_fill_dbt(&thekey, &key, sizeof key);
DBT theval; toku_fill_dbt(&theval, &val, sizeof val);
MSN msn = next_dummymsn();
toku_brt_append_to_child_buffer(node, childnum, BRT_INSERT, msn, xids_get_root_xids(), &thekey, &theval);
toku_brt_append_to_child_buffer(brt, node, childnum, BRT_INSERT, msn, xids_get_root_xids(), &thekey, &theval);
}
}
......@@ -80,7 +80,7 @@ make_tree(BRT brt, int height, int fanout, int nperleaf, int *seq, int *minkey,
toku_brt_nonleaf_append_child(node, child, pivotkey, sizeof k);
}
toku_unpin_brtnode(brt, child);
insert_into_child_buffer(node, childnum, minkeys[childnum], maxkeys[childnum]);
insert_into_child_buffer(brt, node, childnum, minkeys[childnum], maxkeys[childnum]);
}
*minkey = minkeys[0];
*maxkey = maxkeys[0];
......
......@@ -47,13 +47,13 @@ populate_leaf(BRTNODE leafnode, int seq, int n, int *minkey, int *maxkey) {
}
static UU() void
insert_into_child_buffer(BRTNODE node, int childnum, int minkey, int maxkey) {
insert_into_child_buffer(BRT brt, BRTNODE node, int childnum, int minkey, int maxkey) {
for (unsigned int val = htonl(minkey); val <= htonl(maxkey); val++) {
unsigned int key = htonl(val);
DBT thekey; toku_fill_dbt(&thekey, &key, sizeof key);
DBT theval; toku_fill_dbt(&theval, &val, sizeof val);
MSN msn = next_dummymsn();
toku_brt_append_to_child_buffer(node, childnum, BRT_INSERT, msn, xids_get_root_xids(), &thekey, &theval);
toku_brt_append_to_child_buffer(brt, node, childnum, BRT_INSERT, msn, xids_get_root_xids(), &thekey, &theval);
}
}
......
......@@ -265,6 +265,7 @@ garbage_collection(ULE ule, OMT snapshot_xids, OMT live_list_reverse) {
done:;
}
/////////////////////////////////////////////////////////////////////////////////
// This is the big enchilada. (Bring Tums.) Note that this level of abstraction
// has no knowledge of the inner structure of either leafentry or msg. It makes
......
......@@ -43,6 +43,13 @@ TXNID uxr_get_txnid(UXRHANDLE uxr);
//1 does much slower debugging
#define GARBAGE_COLLECTION_DEBUG 0
// Declaration only; the definition is not visible in this part of the file.
// NOTE(review): the three pointer parameters look like outputs (the new
// leafentry plus its in-memory and on-disk sizes) -- confirm against the
// definition before relying on this.
void fast_msg_to_leafentry(
BRT_MSG msg, // message to apply to leafentry
size_t *new_leafentry_memorysize,
size_t *new_leafentry_disksize,
LEAFENTRY *new_leafentry_p) ;
int apply_msg_to_leafentry(BRT_MSG msg,
LEAFENTRY old_leafentry, // NULL if there was no stored data.
size_t *new_leafentry_memorysize,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment