Commit 8c31d028 authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel

[t:4240] Refactoring of Flusher Threads and Cleaner threads to new files complete.

git-svn-id: file:///svn/toku/tokudb@37653 c7de825b-a66e-492c-adef-691d508d4ae1
parent 645458b2
......@@ -49,6 +49,8 @@ BRT_SOURCES = \
brt-serialize \
brt-verify \
brt \
brt-cachetable-wrappers \
brt-flusher \
brt_msg \
brt-test-helpers \
cachetable \
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007-2011 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <brt-cachetable-wrappers.h>
#include <brttypes.h>
#include <brt-internal.h>
#include <cachetable.h>
static void
brtnode_get_key_and_fullhash(
BLOCKNUM* cachekey,
u_int32_t* fullhash,
void* extra)
{
struct brt_header* h = extra;
BLOCKNUM name;
toku_allocate_blocknum(h->blocktable, &name, h);
*cachekey = name;
*fullhash = toku_cachetable_hash(h->cf, name);
}
void
cachetable_put_empty_node_with_dep_nodes(
struct brt_header* h,
u_int32_t num_dependent_nodes,
BRTNODE* dependent_nodes,
BLOCKNUM* name, //output
u_int32_t* fullhash, //output
BRTNODE* result)
{
BRTNODE XMALLOC(new_node);
CACHEFILE dependent_cf[num_dependent_nodes];
BLOCKNUM dependent_keys[num_dependent_nodes];
u_int32_t dependent_fullhash[num_dependent_nodes];
enum cachetable_dirty dependent_dirty_bits[num_dependent_nodes];
for (u_int32_t i = 0; i < num_dependent_nodes; i++) {
dependent_cf[i] = h->cf;
dependent_keys[i] = dependent_nodes[i]->thisnodename;
dependent_fullhash[i] = toku_cachetable_hash(h->cf, dependent_nodes[i]->thisnodename);
dependent_dirty_bits[i] = (enum cachetable_dirty) dependent_nodes[i]->dirty;
}
int r = toku_cachetable_put_with_dep_pairs(
h->cf,
brtnode_get_key_and_fullhash,
new_node,
make_pair_attr(sizeof(BRTNODE)),
toku_brtnode_flush_callback,
toku_brtnode_pe_est_callback,
toku_brtnode_pe_callback,
toku_brtnode_cleaner_callback,
h,
h,
num_dependent_nodes,
dependent_cf,
dependent_keys,
dependent_fullhash,
dependent_dirty_bits,
name,
fullhash);
assert_zero(r);
*result = new_node;
}
void
create_new_brtnode_with_dep_nodes(
struct brt_header* h,
BRTNODE *result,
int height,
int n_children,
u_int32_t num_dependent_nodes,
BRTNODE* dependent_nodes)
{
u_int32_t fullhash = 0;
BLOCKNUM name;
cachetable_put_empty_node_with_dep_nodes(
h,
num_dependent_nodes,
dependent_nodes,
&name,
&fullhash,
result);
assert(h->nodesize > 0);
assert(h->basementnodesize > 0);
if (height == 0) {
assert(n_children > 0);
}
toku_initialize_empty_brtnode(
*result,
name,
height,
n_children,
h->layout_version,
h->nodesize,
h->flags);
assert((*result)->nodesize > 0);
(*result)->fullhash = fullhash;
}
void
toku_create_new_brtnode (
BRT t,
BRTNODE *result,
int height,
int n_children)
{
return create_new_brtnode_with_dep_nodes(
t->h,
result,
height,
n_children,
0,
NULL);
}
//
// The intent of toku_pin_brtnode(_holding_lock) is to abstract the process of retrieving a node from
// the rest of brt.c, so that there is only one place where we need to worry applying ancestor
// messages to a leaf node. The idea is for all of brt.c (search, splits, merges, flushes, etc)
// to access a node via toku_pin_brtnode(_holding_lock)
//
int
toku_pin_brtnode(
BRT brt,
BLOCKNUM blocknum,
u_int32_t fullhash,
UNLOCKERS unlockers,
ANCESTORS ancestors,
const PIVOT_BOUNDS bounds,
BRTNODE_FETCH_EXTRA bfe,
BOOL apply_ancestor_messages, // this BOOL is probably temporary, for #3972, once we know how range query estimates work, will revisit this
BRTNODE *node_p)
{
void *node_v;
int r = toku_cachetable_get_and_pin_nonblocking(
brt->cf,
blocknum,
fullhash,
&node_v,
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_est_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
toku_brtnode_cleaner_callback,
bfe, //read_extraargs
brt->h, //write_extraargs
unlockers);
if (r==0) {
BRTNODE node = node_v;
if (apply_ancestor_messages) {
maybe_apply_ancestors_messages_to_node(brt, node, ancestors, bounds);
}
*node_p = node;
// printf("%*sPin %ld\n", 8-node->height, "", blocknum.b);
} else {
assert(r==TOKUDB_TRY_AGAIN); // Any other error and we should bomb out ASAP.
// printf("%*sPin %ld try again\n", 8, "", blocknum.b);
}
return r;
}
// see comments for toku_pin_brtnode
void
toku_pin_brtnode_holding_lock(
BRT brt,
BLOCKNUM blocknum,
u_int32_t fullhash,
ANCESTORS ancestors,
const PIVOT_BOUNDS bounds,
BRTNODE_FETCH_EXTRA bfe,
BOOL apply_ancestor_messages, // this BOOL is probably temporary, for #3972, once we know how range query estimates work, will revisit this
BRTNODE *node_p)
{
void *node_v;
int r = toku_cachetable_get_and_pin(
brt->cf,
blocknum,
fullhash,
&node_v,
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_est_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
toku_brtnode_cleaner_callback,
bfe,
brt->h
);
assert(r==0);
BRTNODE node = node_v;
if (apply_ancestor_messages) maybe_apply_ancestors_messages_to_node(brt, node, ancestors, bounds);
*node_p = node;
}
void
toku_pin_brtnode_off_client_thread(
struct brt_header* h,
BLOCKNUM blocknum,
u_int32_t fullhash,
BRTNODE_FETCH_EXTRA bfe,
u_int32_t num_dependent_nodes,
BRTNODE* dependent_nodes,
BRTNODE *node_p)
{
void *node_v;
CACHEFILE dependent_cf[num_dependent_nodes];
BLOCKNUM dependent_keys[num_dependent_nodes];
u_int32_t dependent_fullhash[num_dependent_nodes];
enum cachetable_dirty dependent_dirty_bits[num_dependent_nodes];
for (u_int32_t i = 0; i < num_dependent_nodes; i++) {
dependent_cf[i] = h->cf;
dependent_keys[i] = dependent_nodes[i]->thisnodename;
dependent_fullhash[i] = toku_cachetable_hash(h->cf, dependent_nodes[i]->thisnodename);
dependent_dirty_bits[i] = (enum cachetable_dirty) dependent_nodes[i]->dirty;
}
int r = toku_cachetable_get_and_pin_with_dep_pairs(
h->cf,
blocknum,
fullhash,
&node_v,
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_est_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
toku_brtnode_cleaner_callback,
bfe,
h,
num_dependent_nodes,
dependent_cf,
dependent_keys,
dependent_fullhash,
dependent_dirty_bits
);
assert(r==0);
BRTNODE node = node_v;
*node_p = node;
}
void
checkpoint_nodes(struct brt_header* h,
u_int32_t num_dependent_nodes,
BRTNODE* dependent_nodes)
{
CACHEFILE dependent_cf[num_dependent_nodes];
BLOCKNUM dependent_keys[num_dependent_nodes];
u_int32_t dependent_fullhash[num_dependent_nodes];
enum cachetable_dirty dependent_dirty_bits[num_dependent_nodes];
for (u_int32_t i = 0; i < num_dependent_nodes; i++) {
dependent_cf[i] = h->cf;
dependent_keys[i] = dependent_nodes[i]->thisnodename;
dependent_fullhash[i] = toku_cachetable_hash(h->cf, dependent_nodes[i]->thisnodename);
dependent_dirty_bits[i] = (enum cachetable_dirty) dependent_nodes[i]->dirty;
}
toku_checkpoint_pairs(
h->cf,
num_dependent_nodes,
dependent_cf,
dependent_keys,
dependent_fullhash,
dependent_dirty_bits
);
}
void
toku_unpin_brtnode_off_client_thread(struct brt_header* h, BRTNODE node)
// Effect: Unpin a brt node.
{
int r = toku_cachetable_unpin(
h->cf,
node->thisnodename,
node->fullhash,
(enum cachetable_dirty) node->dirty,
make_brtnode_pair_attr(node)
);
assert(r==0);
}
void
toku_unpin_brtnode(BRT brt, BRTNODE node)
// Effect: Unpin a brt node.
{
// printf("%*sUnpin %ld\n", 8-node->height, "", node->thisnodename.b);
VERIFY_NODE(brt,node);
toku_unpin_brtnode_off_client_thread(brt->h, node);
}
/* -*- mode: C; c-basic-offset: 4 -*- */
#ifndef BRT_CACHETABLE_WRAPPERS_H
#define BRT_CACHETABLE_WRAPPERS_H
#ident "$Id$"
#ident "Copyright (c) 2007-2011 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <c_dialects.h>
#include <brttypes.h>
C_BEGIN
void
cachetable_put_empty_node_with_dep_nodes(
struct brt_header* h,
u_int32_t num_dependent_nodes,
BRTNODE* dependent_nodes,
BLOCKNUM* name, //output
u_int32_t* fullhash, //output
BRTNODE* result
);
void
create_new_brtnode_with_dep_nodes(
struct brt_header* h,
BRTNODE *result,
int height,
int n_children,
u_int32_t num_dependent_nodes,
BRTNODE* dependent_nodes
);
void
toku_create_new_brtnode (
BRT t,
BRTNODE *result,
int height,
int n_children
);
void
toku_pin_brtnode_off_client_thread(
struct brt_header* h,
BLOCKNUM blocknum,
u_int32_t fullhash,
BRTNODE_FETCH_EXTRA bfe,
u_int32_t num_dependent_nodes,
BRTNODE* dependent_nodes,
BRTNODE *node_p
);
void
checkpoint_nodes(
struct brt_header* h,
u_int32_t num_dependent_nodes,
BRTNODE* dependent_nodes
);
int
toku_pin_brtnode(
BRT brt,
BLOCKNUM blocknum,
u_int32_t fullhash,
UNLOCKERS unlockers,
ANCESTORS ancestors,
const PIVOT_BOUNDS pbounds,
BRTNODE_FETCH_EXTRA bfe,
BOOL apply_ancestor_messages, // this BOOL is probably temporary, for #3972, once we know how range query estimates work, will revisit this
BRTNODE *node_p
) __attribute__((__warn_unused_result__));
void
toku_pin_brtnode_holding_lock(
BRT brt,
BLOCKNUM blocknum,
u_int32_t fullhash,
ANCESTORS ancestors,
const PIVOT_BOUNDS pbounds,
BRTNODE_FETCH_EXTRA bfe,
BOOL apply_ancestor_messages,
BRTNODE *node_p
);
void
toku_unpin_brtnode_off_client_thread(struct brt_header* h, BRTNODE node);
void
toku_unpin_brtnode(BRT brt, BRTNODE node);
C_END
#endif
This diff is collapsed.
/* -*- mode: C; c-basic-offset: 4 -*- */
#ifndef BRT_FLUSHER
#define BRT_FLUSHER
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
// This must be first to make the 64-bit file mode work right in Linux
#include <brttypes.h>
#include <c_dialects.h>
C_BEGIN
void
toku_flusher_thread_set_callback(
void (*callback_f)(int, void*),
void* extra
);
int
toku_brtnode_cleaner_callback_internal(
void *brtnode_pv,
BLOCKNUM blocknum,
u_int32_t fullhash,
void *extraargs,
BRT_STATUS brt_status
);
void
flush_node_on_background_thread(
BRT brt,
BRTNODE parent,
BRT_STATUS brt_status
);
void
brtleaf_split(
struct brt_header* h,
BRTNODE node,
BRTNODE *nodea,
BRTNODE *nodeb,
DBT *splitk,
BOOL create_new_node,
u_int32_t num_dependent_nodes,
BRTNODE* dependent_nodes
);
void
brt_nonleaf_split(
struct brt_header* h,
BRTNODE node,
BRTNODE *nodea,
BRTNODE *nodeb,
DBT *splitk,
u_int32_t num_dependent_nodes,
BRTNODE* dependent_nodes
);
C_END
#endif // End of header guardian.
......@@ -131,6 +131,12 @@ int toku_bnc_flush_to_child(
NONLEAF_CHILDINFO bnc,
BRTNODE child
);
bool
toku_brt_nonleaf_is_gorged(BRTNODE node);
enum reactivity get_nonleaf_reactivity (BRTNODE node);
enum reactivity get_node_reactivity (BRTNODE node);
// data of an available partition of a leaf brtnode
struct brtnode_leaf_basement_node {
......@@ -316,6 +322,8 @@ enum {
BRT_PIVOT_FRONT_COMPRESS = 8,
};
u_int32_t compute_child_fullhash (CACHEFILE cf, BRTNODE node, int childnum);
struct remembered_hash {
BOOL valid; // set to FALSE if the fullhash is invalid
FILENUM fnum;
......@@ -443,6 +451,7 @@ void destroy_nonleaf_childinfo (NONLEAF_CHILDINFO nl);
void toku_destroy_brtnode_internals(BRTNODE node);
void toku_brtnode_free (BRTNODE *node);
void toku_assert_entire_node_in_memory(BRTNODE node);
void bring_node_fully_into_memory(BRTNODE node, struct brt_header* h);
// append a child node to a parent node
void toku_brt_nonleaf_append_child(BRTNODE node, BRTNODE child, struct kv_pair *pivotkey, size_t pivotkeysize);
......@@ -456,6 +465,20 @@ void toku_brt_append_to_child_buffer(brt_compare_func compare_fun, DESCRIPTOR de
#define DEADBEEF ((void*)0xDEADBEEFDEADBEEF)
#endif
//#define SLOW
#ifdef SLOW
#define VERIFY_NODE(t,n) (toku_verify_or_set_counts(n), toku_verify_estimates(t,n))
#else
#define VERIFY_NODE(t,n) ((void)0)
#endif
//#define BRT_TRACE
#ifdef BRT_TRACE
#define WHEN_BRTTRACE(x) x
#else
#define WHEN_BRTTRACE(x) ((void)0)
#endif
struct brtenv {
CACHETABLE ct;
TOKULOGGER logger;
......@@ -607,7 +630,6 @@ static inline void fill_bfe_for_prefetch(struct brtnode_fetch_extra *bfe,
bfe->child_to_read = -1;
}
typedef struct ancestors *ANCESTORS;
struct ancestors {
BRTNODE node; // This is the root node if next is NULL.
int childnum; // which buffer holds messages destined to the node whose ancestors this list represents.
......@@ -645,23 +667,6 @@ void toku_create_new_brtnode (BRT t, BRTNODE *result, int height, int n_children
void toku_initialize_empty_brtnode (BRTNODE n, BLOCKNUM nodename, int height, int num_children,
int layout_version, unsigned int nodesize, unsigned int flags);
int toku_pin_brtnode_if_clean(
BRT brt, BLOCKNUM blocknum, u_int32_t fullhash,
ANCESTORS ancestors, struct pivot_bounds const * const bounds,
BRTNODE *node_p
);
int toku_pin_brtnode (BRT brt, BLOCKNUM blocknum, u_int32_t fullhash,
UNLOCKERS unlockers,
ANCESTORS ancestors, struct pivot_bounds const * const pbounds,
struct brtnode_fetch_extra *bfe,
BOOL apply_ancestor_messages, // this BOOL is probably temporary, for #3972, once we know how range query estimates work, will revisit this
BRTNODE *node_p)
__attribute__((__warn_unused_result__));
void toku_pin_brtnode_holding_lock (BRT brt, BLOCKNUM blocknum, u_int32_t fullhash,
ANCESTORS ancestors, struct pivot_bounds const * const pbounds,
struct brtnode_fetch_extra *bfe, BOOL apply_ancestor_messages,
BRTNODE *node_p);
void toku_unpin_brtnode (BRT brt, BRTNODE node);
unsigned int toku_brtnode_which_child(BRTNODE node, const DBT *k,
DESCRIPTOR desc, brt_compare_func cmp)
__attribute__((__warn_unused_result__));
......@@ -731,7 +736,7 @@ typedef struct le_status {
void toku_le_get_status(LE_STATUS);
typedef struct brt_status {
struct brt_status {
u_int64_t updates;
u_int64_t updates_broadcast;
u_int64_t descriptor_set;
......@@ -782,7 +787,7 @@ typedef struct brt_status {
uint64_t msg_bytes_max; // how many bytes of messages currently in trees (estimate)
uint64_t msg_num; // how many messages injected at root
uint64_t msg_num_broadcast; // how many broadcast messages injected at root
} BRT_STATUS_S, *BRT_STATUS;
};
void toku_brt_get_status(BRT_STATUS);
......
......@@ -4,6 +4,7 @@
#include "includes.h"
#include "ule.h"
#include <brt-cachetable-wrappers.h>
// dummymsn needed to simulate msn because messages are injected at a lower level than toku_brt_root_put_cmd()
......
This diff is collapsed.
......@@ -19,6 +19,7 @@
extern "C" {
#endif
typedef struct brt_status BRT_STATUS_S, *BRT_STATUS;
typedef struct brt *BRT;
typedef struct brtnode *BRTNODE;
typedef struct brtnode_leaf_basement_node *BASEMENTNODE;
......@@ -30,7 +31,6 @@ struct dbuf;
typedef unsigned int ITEMLEN;
typedef const void *bytevec;
//typedef const void *bytevec;
typedef int64_t DISKOFF; /* Offset in a disk. -1 is the NULL pointer. */
typedef u_int64_t TXNID;
......@@ -244,6 +244,16 @@ typedef enum __TXN_SNAPSHOT_TYPE {
TXN_SNAPSHOT_CHILD=2
} TXN_SNAPSHOT_TYPE;
typedef struct ancestors *ANCESTORS;
typedef struct pivot_bounds const * const PIVOT_BOUNDS;
typedef struct brtnode_fetch_extra *BRTNODE_FETCH_EXTRA;
typedef struct unlockers *UNLOCKERS;
enum reactivity {
RE_STABLE,
RE_FUSIBLE,
RE_FISSIBLE
};
#if defined(__cplusplus) || defined(__cilkplusplus)
};
......
......@@ -309,7 +309,6 @@ int toku_cachetable_get_and_pin (
void* write_extraargs
);
typedef struct unlockers *UNLOCKERS;
struct unlockers {
BOOL locked;
void (*f)(void*extra);
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id: brt-serialize-test.c 36808 2011-11-10 22:17:12Z bperlman $"
#ident "$Id$"
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
#include "test.h"
......
......@@ -13,6 +13,7 @@
#include "includes.h"
#include <brt-cachetable-wrappers.h>
#include "test.h"
static BRTNODE
......
......@@ -17,6 +17,7 @@
// - maybe get counter of messages ignored for old msn (once the counter is implemented in brt.c)
#include "brt-internal.h"
#include <brt-cachetable-wrappers.h>
#include "includes.h"
#include "test.h"
......
......@@ -9,6 +9,7 @@
#include "test.h"
#include "includes.h"
#include <brt-cachetable-wrappers.h>
// Some constants to be used in calculations below
static const int nodesize = 1024; // Target max node size
......
......@@ -16,6 +16,7 @@
#include "includes.h"
#include <brt-cachetable-wrappers.h>
#include "test.h"
static BRTNODE
......
......@@ -5,6 +5,7 @@
// generate a tree with bad pivots and check that brt->verify finds them
#include "includes.h"
#include <brt-cachetable-wrappers.h>
#include "test.h"
static BRTNODE
......
......@@ -6,6 +6,7 @@
// check that brt verify finds them
#include "includes.h"
#include <brt-cachetable-wrappers.h>
#include "test.h"
static BRTNODE
......
......@@ -5,6 +5,7 @@
// generate a tree with duplicate pivots and check that brt->verify finds them
#include "includes.h"
#include <brt-cachetable-wrappers.h>
#include "test.h"
static BRTNODE
......
......@@ -6,6 +6,7 @@
// check that brt verify finds them.
#include "includes.h"
#include <brt-cachetable-wrappers.h>
#include "test.h"
static BRTNODE
......
......@@ -6,6 +6,7 @@
// check that brt verify finds them
#include "includes.h"
#include <brt-cachetable-wrappers.h>
#include "test.h"
static BRTNODE
......
......@@ -5,6 +5,7 @@
// generate a tree with unsorted pivots and check that brt->verify finds them
#include "includes.h"
#include <brt-cachetable-wrappers.h>
#include "test.h"
static BRTNODE
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment