Commit f2c4fe13 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:4875], [t:4887], merge from tokudb.4875 to main

git-svn-id: file:///svn/toku/tokudb@43896 c7de825b-a66e-492c-adef-691d508d4ae1
parent 939721e7
......@@ -84,15 +84,15 @@ static inline void unlock_for_blocktable (BLOCK_TABLE bt);
static void
ft_set_dirty(FT h, BOOL for_checkpoint){
assert(toku_mutex_is_locked(&h->blocktable->mutex));
assert(h->type == FT_CURRENT);
ft_set_dirty(FT ft, BOOL for_checkpoint){
assert(toku_mutex_is_locked(&ft->blocktable->mutex));
assert(ft->h->type == FT_CURRENT);
if (for_checkpoint) {
assert(h->checkpoint_header->type == FT_CHECKPOINT_INPROGRESS);
h->checkpoint_header->dirty = 1;
assert(ft->checkpoint_header->type == FT_CHECKPOINT_INPROGRESS);
ft->checkpoint_header->dirty = 1;
}
else {
h->dirty = 1;
ft->h->dirty = 1;
}
}
......@@ -449,9 +449,9 @@ PRNTF("blokAllokator", 1L, size, offset, bt);
//Fills wbuf with bt
//A clean shutdown runs checkpoint start so that current and inprogress are copies.
void
toku_serialize_translation_to_wbuf_unlocked(BLOCK_TABLE bt, struct wbuf *w,
toku_serialize_translation_to_wbuf(BLOCK_TABLE bt, struct wbuf *w,
int64_t *address, int64_t *size) {
assert(toku_mutex_is_locked(&bt->mutex));
lock_for_blocktable(bt);
struct translation *t = &bt->inprogress;
BLOCKNUM b = make_blocknum(RESERVED_BLOCKNUM_TRANSLATION);
......@@ -478,6 +478,7 @@ toku_serialize_translation_to_wbuf_unlocked(BLOCK_TABLE bt, struct wbuf *w,
wbuf_int(w, checksum);
*address = t->block_translation[b.b].u.diskoff;
*size = t->block_translation[b.b].size;
unlock_for_blocktable(bt);
}
......
......@@ -52,7 +52,7 @@ void toku_blocknum_realloc_on_disk(BLOCK_TABLE bt, BLOCKNUM b, DISKOFF size, DIS
void toku_translate_blocknum_to_offset_size(BLOCK_TABLE bt, BLOCKNUM b, DISKOFF *offset, DISKOFF *size);
//Serialization
void toku_serialize_translation_to_wbuf_unlocked(BLOCK_TABLE bt, struct wbuf *w, int64_t *address, int64_t *size);
void toku_serialize_translation_to_wbuf(BLOCK_TABLE bt, struct wbuf *w, int64_t *address, int64_t *size);
void toku_block_table_swap_for_redirect(BLOCK_TABLE old_bt, BLOCK_TABLE new_bt);
......
......@@ -67,9 +67,9 @@
static CHECKPOINT_STATUS_S cp_status;
#define STATUS_INIT(k,t,l) { \
cp_status.status[k].keyname = #k; \
cp_status.status[k].type = t; \
cp_status.status[k].legend = "checkpoint: " l; \
cp_status.status[k].keyname = #k; \
cp_status.status[k].type = t; \
cp_status.status[k].legend = "checkpoint: " l; \
}
static void
......@@ -106,7 +106,7 @@ status_init(void) {
void
toku_checkpoint_get_status(CACHETABLE ct, CHECKPOINT_STATUS statp) {
if (!cp_status.initialized)
status_init();
status_init();
STATUS_VALUE(CP_PERIOD) = toku_get_checkpoint_period_unlocked(ct);
*statp = cp_status;
}
......@@ -193,7 +193,7 @@ checkpoint_safe_checkpoint_unlock(void) {
void
toku_multi_operation_client_lock(void) {
if (locked_mo)
(void) __sync_fetch_and_add(&STATUS_VALUE(CP_CLIENT_WAIT_ON_MO), 1);
(void) __sync_fetch_and_add(&STATUS_VALUE(CP_CLIENT_WAIT_ON_MO), 1);
toku_pthread_rwlock_rdlock(&multi_operation_lock);
}
......@@ -205,7 +205,7 @@ toku_multi_operation_client_unlock(void) {
void
toku_checkpoint_safe_client_lock(void) {
if (locked_cs)
(void) __sync_fetch_and_add(&STATUS_VALUE(CP_CLIENT_WAIT_ON_CS), 1);
(void) __sync_fetch_and_add(&STATUS_VALUE(CP_CLIENT_WAIT_ON_CS), 1);
toku_pthread_rwlock_rdlock(&checkpoint_safe_lock);
toku_multi_operation_client_lock();
}
......@@ -241,23 +241,23 @@ toku_checkpoint_destroy(void) {
// Take a checkpoint of all currently open dictionaries
int
toku_checkpoint(CACHETABLE ct, TOKULOGGER logger,
void (*callback_f)(void*), void * extra,
void (*callback2_f)(void*), void * extra2,
checkpoint_caller_t caller_id) {
void (*callback_f)(void*), void * extra,
void (*callback2_f)(void*), void * extra2,
checkpoint_caller_t caller_id) {
int r;
int footprint_offset = (int) caller_id * 1000;
assert(initialized);
if (locked_cs) {
if (caller_id == SCHEDULED_CHECKPOINT)
(void) __sync_fetch_and_add(&STATUS_VALUE(CP_WAIT_SCHED_CS), 1);
else if (caller_id == CLIENT_CHECKPOINT)
(void) __sync_fetch_and_add(&STATUS_VALUE(CP_WAIT_CLIENT_CS), 1);
else if (caller_id == TXN_COMMIT_CHECKPOINT)
(void) __sync_fetch_and_add(&STATUS_VALUE(CP_WAIT_TXN_CS), 1);
else
(void) __sync_fetch_and_add(&STATUS_VALUE(CP_WAIT_OTHER_CS), 1);
if (caller_id == SCHEDULED_CHECKPOINT)
(void) __sync_fetch_and_add(&STATUS_VALUE(CP_WAIT_SCHED_CS), 1);
else if (caller_id == CLIENT_CHECKPOINT)
(void) __sync_fetch_and_add(&STATUS_VALUE(CP_WAIT_CLIENT_CS), 1);
else if (caller_id == TXN_COMMIT_CHECKPOINT)
(void) __sync_fetch_and_add(&STATUS_VALUE(CP_WAIT_TXN_CS), 1);
else
(void) __sync_fetch_and_add(&STATUS_VALUE(CP_WAIT_OTHER_CS), 1);
}
(void) __sync_fetch_and_add(&STATUS_VALUE(CP_WAITERS_NOW), 1);
......@@ -265,27 +265,29 @@ toku_checkpoint(CACHETABLE ct, TOKULOGGER logger,
(void) __sync_fetch_and_sub(&STATUS_VALUE(CP_WAITERS_NOW), 1);
if (STATUS_VALUE(CP_WAITERS_NOW) > STATUS_VALUE(CP_WAITERS_MAX))
STATUS_VALUE(CP_WAITERS_MAX) = STATUS_VALUE(CP_WAITERS_NOW); // threadsafe, within checkpoint_safe lock
STATUS_VALUE(CP_WAITERS_MAX) = STATUS_VALUE(CP_WAITERS_NOW); // threadsafe, within checkpoint_safe lock
SET_CHECKPOINT_FOOTPRINT(10);
if (locked_mo) {
if (caller_id == SCHEDULED_CHECKPOINT)
STATUS_VALUE(CP_WAIT_SCHED_MO)++; // threadsafe, within checkpoint_safe lock
else if (caller_id == CLIENT_CHECKPOINT)
STATUS_VALUE(CP_WAIT_CLIENT_MO)++;
else if (caller_id == TXN_COMMIT_CHECKPOINT)
STATUS_VALUE(CP_WAIT_TXN_MO)++;
else
STATUS_VALUE(CP_WAIT_OTHER_MO)++;
if (caller_id == SCHEDULED_CHECKPOINT)
STATUS_VALUE(CP_WAIT_SCHED_MO)++; // threadsafe, within checkpoint_safe lock
else if (caller_id == CLIENT_CHECKPOINT)
STATUS_VALUE(CP_WAIT_CLIENT_MO)++;
else if (caller_id == TXN_COMMIT_CHECKPOINT)
STATUS_VALUE(CP_WAIT_TXN_MO)++;
else
STATUS_VALUE(CP_WAIT_OTHER_MO)++;
}
multi_operation_checkpoint_lock();
SET_CHECKPOINT_FOOTPRINT(20);
ydb_lock();
toku_ft_open_close_lock();
SET_CHECKPOINT_FOOTPRINT(30);
STATUS_VALUE(CP_TIME_LAST_CHECKPOINT_BEGIN) = time(NULL);
r = toku_cachetable_begin_checkpoint(ct, logger);
toku_ft_open_close_unlock();
multi_operation_checkpoint_unlock();
ydb_unlock();
......@@ -299,7 +301,7 @@ toku_checkpoint(CACHETABLE ct, TOKULOGGER logger,
if (r==0 && logger) {
last_completed_checkpoint_lsn = logger->last_completed_checkpoint_lsn;
r = toku_logger_maybe_trim_log(logger, last_completed_checkpoint_lsn);
STATUS_VALUE(CP_LAST_LSN) = last_completed_checkpoint_lsn.lsn;
STATUS_VALUE(CP_LAST_LSN) = last_completed_checkpoint_lsn.lsn;
}
SET_CHECKPOINT_FOOTPRINT(60);
......@@ -307,9 +309,9 @@ toku_checkpoint(CACHETABLE ct, TOKULOGGER logger,
STATUS_VALUE(CP_TIME_LAST_CHECKPOINT_BEGIN_COMPLETE) = STATUS_VALUE(CP_TIME_LAST_CHECKPOINT_BEGIN);
if (r == 0)
STATUS_VALUE(CP_CHECKPOINT_COUNT)++;
STATUS_VALUE(CP_CHECKPOINT_COUNT)++;
else
STATUS_VALUE(CP_CHECKPOINT_COUNT_FAIL)++;
STATUS_VALUE(CP_CHECKPOINT_COUNT_FAIL)++;
STATUS_VALUE(CP_FOOTPRINT) = 0;
checkpoint_safe_checkpoint_unlock();
......
......@@ -65,7 +65,7 @@ cachetable_put_empty_node_with_dep_nodes(
void
create_new_ftnode_with_dep_nodes(
FT h,
FT ft,
FTNODE *result,
int height,
int n_children,
......@@ -76,15 +76,15 @@ create_new_ftnode_with_dep_nodes(
BLOCKNUM name;
cachetable_put_empty_node_with_dep_nodes(
h,
ft,
num_dependent_nodes,
dependent_nodes,
&name,
&fullhash,
result);
assert(h->nodesize > 0);
assert(h->basementnodesize > 0);
assert(ft->h->nodesize > 0);
assert(ft->h->basementnodesize > 0);
if (height == 0) {
assert(n_children > 0);
}
......@@ -94,9 +94,9 @@ create_new_ftnode_with_dep_nodes(
name,
height,
n_children,
h->layout_version,
h->nodesize,
h->flags);
ft->h->layout_version,
ft->h->nodesize,
ft->h->flags);
assert((*result)->nodesize > 0);
(*result)->fullhash = fullhash;
......@@ -208,10 +208,10 @@ toku_pin_ftnode_off_client_thread(
}
void
toku_unpin_ftnode_off_client_thread(FT h, FTNODE node)
toku_unpin_ftnode_off_client_thread(FT ft, FTNODE node)
{
int r = toku_cachetable_unpin(
h->cf,
ft->cf,
node->thisnodename,
node->fullhash,
(enum cachetable_dirty) node->dirty,
......@@ -221,11 +221,11 @@ toku_unpin_ftnode_off_client_thread(FT h, FTNODE node)
}
void
toku_unpin_ftnode(FT h, FTNODE node)
toku_unpin_ftnode(FT ft, FTNODE node)
{
// printf("%*sUnpin %ld\n", 8-node->height, "", node->thisnodename.b);
//VERIFY_NODE(brt,node);
toku_unpin_ftnode_off_client_thread(h, node);
toku_unpin_ftnode_off_client_thread(ft, node);
}
void
......
......@@ -718,15 +718,15 @@ ftleaf_split(
invariant(node->height == 0);
STATUS_VALUE(FT_FLUSHER_SPLIT_LEAF)++;
if (node->n_children) {
// First move all the accumulated stat64info deltas into the first basement.
// After the split, either both nodes or neither node will be included in the next checkpoint.
// The accumulated stats in the dictionary will be correct in either case.
// By moving all the deltas into one (arbitrary) basement, we avoid the need to maintain
// correct information for a basement that is divided between two leafnodes (i.e. when split is
// not on a basement boundary).
STAT64INFO_S delta_for_leafnode = toku_get_and_clear_basement_stats(node);
BASEMENTNODE bn = BLB(node,0);
bn->stat64_delta = delta_for_leafnode;
// First move all the accumulated stat64info deltas into the first basement.
// After the split, either both nodes or neither node will be included in the next checkpoint.
// The accumulated stats in the dictionary will be correct in either case.
// By moving all the deltas into one (arbitrary) basement, we avoid the need to maintain
// correct information for a basement that is divided between two leafnodes (i.e. when split is
// not on a basement boundary).
STAT64INFO_S delta_for_leafnode = toku_get_and_clear_basement_stats(node);
BASEMENTNODE bn = BLB(node,0);
bn->stat64_delta = delta_for_leafnode;
}
......@@ -807,9 +807,9 @@ ftleaf_split(
name,
0,
num_children_in_b,
h->layout_version,
h->nodesize,
h->flags);
h->h->layout_version,
h->h->nodesize,
h->h->flags);
assert(B->nodesize > 0);
B->fullhash = fullhash;
}
......@@ -1002,7 +1002,7 @@ ft_split_child(
FTNODE nodea, nodeb;
DBT splitk;
// printf("%s:%d node %" PRIu64 "->u.n.n_children=%d height=%d\n", __FILE__, __LINE__, node->thisnodename.b, node->u.n.n_children, node->height);
assert(h->nodesize>=node->nodesize); /* otherwise we might be in trouble because the nodesize shrank. */
assert(h->h->nodesize>=node->nodesize); /* otherwise we might be in trouble because the nodesize shrank. */
// for test
call_flusher_thread_callback(flt_flush_before_split);
......
......@@ -335,67 +335,129 @@ u_int32_t compute_child_fullhash (CACHEFILE cf, FTNODE node, int childnum);
enum ft_type {FT_CURRENT=1, FT_CHECKPOINT_INPROGRESS};
struct ft_header {
enum ft_type type;
int dirty;
// Free-running counter incremented once per checkpoint (toggling LSB).
// LSB indicates which header location is used on disk so this
// counter is effectively a boolean which alternates with each checkpoint.
uint64_t checkpoint_count;
// LSN of creation of "checkpoint-begin" record in log.
LSN checkpoint_lsn;
// see brt_layout_version.h. maybe don't need this if we assume
// it's always the current version after deserializing
const int layout_version;
// different (<) from layout_version if upgraded from a previous
// version (useful for debugging)
const int layout_version_original;
// build_id (svn rev number) of software that wrote this node to
// disk. (read from disk, overwritten when written to disk, I
// think).
const uint32_t build_id;
// build_id of software that created this tree
const uint32_t build_id_original;
// time this tree was created
const uint64_t time_of_creation;
// and the root transaction id that created it
TXNID root_xid_that_created;
// last time this header was serialized to disk (read from disk,
// overwritten when written to disk)
uint64_t time_of_last_modification;
// last time that this tree was verified
uint64_t time_of_last_verification;
// this field is protected by tree_lock, see comment for tree_lock
BLOCKNUM root_blocknum;
const unsigned int flags;
const unsigned int nodesize;
const unsigned int basementnodesize;
const enum toku_compression_method compression_method;
// Current Minimum MSN to be used when upgrading pre-MSN BRT's.
// This is decremented from our currnt MIN_MSN so as not to clash
// with any existing 'normal' MSN's.
MSN highest_unused_msn_for_upgrade;
// last time that a hot optimize operation was begun
uint64_t time_of_last_optimize_begin;
// last time that a hot optimize operation was successfully completed
uint64_t time_of_last_optimize_end;
// the number of hot optimize operations currently in progress on this tree
uint32_t count_of_optimize_in_progress;
// the number of hot optimize operations in progress on this tree at the time of the last crash (this field is in-memory only)
uint32_t count_of_optimize_in_progress_read_from_disk;
// all messages before this msn have been applied to leaf nodes
MSN msn_at_start_of_last_completed_optimize;
STAT64INFO_S on_disk_stats;
};
// brt_header is always the current version.
struct ft {
enum ft_type type;
FT checkpoint_header;
FT_HEADER h;
FT_HEADER checkpoint_header;
// These are (mostly) read-only.
CACHEFILE cf;
// unique id for dictionary
DICTIONARY_ID dict_id;
ft_compare_func compare_fun;
ft_update_func update_fun;
// protected by locktree
DESCRIPTOR_S descriptor;
// protected by locktree and user. User
// makes sure this is only changed
// when no activity on tree
DESCRIPTOR_S cmp_descriptor;
// These are not read-only:
// lock used by a thread to pin the root node to start a descent into
// the tree. This lock protects the blocknum of the root node (root_blocknum). Any
// thread that wants to descend down the tree starting at the root
// must grab this lock before pinning the root.
toku_mutex_t tree_lock;
u_int64_t checkpoint_count; // Free-running counter incremented once per checkpoint (toggling LSB).
// LSB indicates which header location is used on disk so this
// counter is effectively a boolean which alternates with each checkpoint.
LSN checkpoint_lsn; // LSN of creation of "checkpoint-begin" record in log.
int dirty;
DICTIONARY_ID dict_id; // unique id for dictionary
int panic; // If nonzero there was a write error. Don't write any more, because it probably only gets worse. This is the error code.
char *panic_string; // A malloced string that can indicate what went wrong.
int layout_version;
int layout_version_original; // different (<) from layout_version if upgraded from a previous version (useful for debugging)
int layout_version_read_from_disk; // transient, not serialized to disk
uint32_t build_id; // build_id (svn rev number) of software that wrote this node to disk
uint32_t build_id_original; // build_id of software that created this tree (read from disk, overwritten when written to disk)
uint64_t time_of_creation; // time this tree was created
uint64_t time_of_last_modification; // last time this header was serialized to disk (read from disk, overwritten when written to disk)
uint64_t time_of_last_verification; // last time that this tree was verified
unsigned int nodesize;
unsigned int basementnodesize;
// this field is protected by tree_lock, see comment for tree_lock
BLOCKNUM root_blocknum; // roots of the dictionary
unsigned int flags;
DESCRIPTOR_S descriptor;
DESCRIPTOR_S cmp_descriptor;
toku_mutex_t tree_lock;
// protected by blocktable lock
BLOCK_TABLE blocktable;
// protected by atomic builtins
STAT64INFO_S in_memory_stats;
// transient, not serialized to disk. updated when we do write to
// disk. tells us whether we can do partial eviction (we can't if
// the on-disk layout version is from before basement nodes)
int layout_version_read_from_disk;
// If a transaction created this BRT, which one?
// If a transaction locked the BRT when it was empty, which transaction? (Only the latest one matters)
// 0 if no such transaction
// only one thread can write to these at once, this is enforced by
// the lock tree
TXNID txnid_that_created_or_locked_when_empty;
TXNID root_that_created_or_locked_when_empty;
TXNID txnid_that_suppressed_recovery_logs;
TXNID root_xid_that_created;
struct toku_list live_ft_handles;
OMT txns; // transactions that are using this header
bool pinned_by_checkpoint; //Keep this header around for checkpoint, like a transaction
ft_compare_func compare_fun;
ft_update_func update_fun;
STAT64INFO_S in_memory_stats;
STAT64INFO_S on_disk_stats;
STAT64INFO_S checkpoint_staging_stats;
uint64_t time_of_last_optimize_begin; // last time that a hot optimize operation was begun
uint64_t time_of_last_optimize_end; // last time that a hot optimize operation was successfully completed
uint32_t count_of_optimize_in_progress; // the number of hot optimize operations currently in progress on this tree
uint32_t count_of_optimize_in_progress_read_from_disk; // the number of hot optimize operations in progress on this tree at the time of the last crash (this field is in-memory only)
MSN msn_at_start_of_last_completed_optimize; // all messages before this msn have been applied to leaf nodes
enum toku_compression_method compression_method;
// Current Minimum MSN to be used when upgrading pre-MSN BRT's.
// This is decremented from our currnt MIN_MSN so as not to clash
// with any existing 'normal' MSN's.
MSN highest_unused_msn_for_upgrade;
// protects modifying live_ft_handles, txns, and pinned_by_checkpoint
toku_mutex_t ft_ref_lock;
struct toku_list live_ft_handles;
// transactions that are using this header. you should only be able
// to modify this if you have a valid handle in the list of live brts
OMT txns;
// Keep this header around for checkpoint, like a transaction
bool pinned_by_checkpoint;
// If nonzero there was a write error. Don't write any more, because it probably only gets worse. This is the error code.
int panic;
// A malloced string that can indicate what went wrong.
char *panic_string;
};
// Copy the descriptor into a temporary variable, and tell DRD that subsequent code happens after reading that pointer.
......@@ -464,9 +526,14 @@ int toku_keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2le
void toku_verify_or_set_counts(FTNODE);
int toku_serialize_ft_size (FT h);
int toku_serialize_ft_to (int fd, FT h);
int toku_serialize_ft_to_wbuf (struct wbuf *, FT h, int64_t address_translation, int64_t size_translation);
int toku_serialize_ft_size (FT_HEADER h);
int toku_serialize_ft_to (int fd, FT_HEADER h, BLOCK_TABLE blocktable, CACHEFILE cf);
int toku_serialize_ft_to_wbuf (
struct wbuf *wbuf,
FT_HEADER h,
DISKOFF translation_location_on_disk,
DISKOFF translation_size_on_disk
);
enum deserialize_error_code toku_deserialize_ft_from (int fd, LSN max_acceptable_lsn, FT *ft);
int toku_serialize_descriptor_contents_to_fd(int fd, const DESCRIPTOR desc, DISKOFF offset);
void toku_serialize_descriptor_contents_to_wbuf(struct wbuf *wb, const DESCRIPTOR desc);
......@@ -579,7 +646,6 @@ struct ft_cursor {
// is required, such as for flushes.
//
static inline void fill_bfe_for_full_read(struct ftnode_fetch_extra *bfe, FT h) {
invariant(h->type == FT_CURRENT);
bfe->type = ftnode_fetch_all;
bfe->h = h;
bfe->search = NULL;
......@@ -608,7 +674,7 @@ static inline void fill_bfe_for_subset_read(
BOOL disable_prefetching
)
{
invariant(h->type == FT_CURRENT);
invariant(h->h->type == FT_CURRENT);
bfe->type = ftnode_fetch_subset;
bfe->h = h;
bfe->search = search;
......@@ -627,7 +693,7 @@ static inline void fill_bfe_for_subset_read(
// Currently used for stat64.
//
static inline void fill_bfe_for_min_read(struct ftnode_fetch_extra *bfe, FT h) {
invariant(h->type == FT_CURRENT);
invariant(h->h->type == FT_CURRENT);
bfe->type = ftnode_fetch_none;
bfe->h = h;
bfe->search = NULL;
......@@ -659,7 +725,7 @@ static inline void destroy_bfe_for_prefetch(struct ftnode_fetch_extra *bfe) {
static inline void fill_bfe_for_prefetch(struct ftnode_fetch_extra *bfe,
FT h,
FT_CURSOR c) {
invariant(h->type == FT_CURRENT);
invariant(h->h->type == FT_CURRENT);
bfe->type = ftnode_fetch_prefetch;
bfe->h = h;
bfe->search = NULL;
......
......@@ -150,6 +150,8 @@ static volatile FT_STATUS_S ft_status;
ft_status.status[k].legend = "brt: " l; \
}
static toku_mutex_t ft_open_close_lock;
static void
status_init(void)
{
......@@ -307,8 +309,8 @@ toku_ft_nonleaf_is_gorged (FTNODE node) {
(!buffers_are_empty));
}
static void ft_verify_flags(FT h, FTNODE node) {
assert(h->flags == node->flags);
static void ft_verify_flags(FT ft, FTNODE node) {
assert(ft->h->flags == node->flags);
}
int toku_ft_debug_mode = 0;
......@@ -599,16 +601,25 @@ static void ft_status_update_flush_reason(FTNODE node, BOOL for_checkpoint) {
static void ftnode_update_disk_stats(
FTNODE ftnode,
FT h,
FT ft,
BOOL for_checkpoint
)
{
STAT64INFO_S deltas = ZEROSTATS;
// capture deltas before rebalancing basements for serialization
deltas = toku_get_and_clear_basement_stats(ftnode);
toku_ft_update_stats(&h->on_disk_stats, deltas);
// locking not necessary here with respect to checkpointing
// in Clayface (because of the pending lock and cachetable lock
// in toku_cachetable_begin_checkpoint)
// essentially, if we are dealing with a for_checkpoint
// parameter in a function that is called by the flush_callback,
// then the cachetable needs to ensure that this is called in a safe
// manner that does not interfere with the beginning
// of a checkpoint, which it does with the cachetable lock
// and pending lock
toku_ft_update_stats(&ft->h->on_disk_stats, deltas);
if (for_checkpoint) {
toku_ft_update_stats(&h->checkpoint_staging_stats, deltas);
toku_ft_update_stats(&ft->checkpoint_header->on_disk_stats, deltas);
}
}
......@@ -637,15 +648,15 @@ void toku_ftnode_clone_callback(
{
FTNODE node = value_data;
toku_assert_entire_node_in_memory(node);
FT h = write_extraargs;
FT ft = write_extraargs;
FTNODE XMALLOC(cloned_node);
//FTNODE cloned_node = (FTNODE)toku_xmalloc(sizeof(*FTNODE));
memset(cloned_node, 0, sizeof(*cloned_node));
if (node->height == 0) {
// set header stats, must be done before rebalancing
ftnode_update_disk_stats(node, h, for_checkpoint);
ftnode_update_disk_stats(node, ft, for_checkpoint);
// rebalance the leaf node
rebalance_ftnode_leaf(node, h->basementnodesize);
rebalance_ftnode_leaf(node, ft->h->basementnodesize);
}
cloned_node->max_msn_applied_to_node_on_disk = node->max_msn_applied_to_node_on_disk;
......@@ -870,7 +881,7 @@ void toku_evict_bn_from_memory(FTNODE node, int childnum, FT h) {
// callback for partially evicting a node
int toku_ftnode_pe_callback (void *ftnode_pv, PAIR_ATTR UU(old_attr), PAIR_ATTR* new_attr, void* extraargs) {
FTNODE node = (FTNODE)ftnode_pv;
FT h = extraargs;
FT ft = extraargs;
// Don't partially evict dirty nodes
if (node->dirty) {
goto exit;
......@@ -888,7 +899,7 @@ int toku_ftnode_pe_callback (void *ftnode_pv, PAIR_ATTR UU(old_attr), PAIR_ATTR*
if (BP_STATE(node,i) == PT_AVAIL) {
if (BP_SHOULD_EVICT(node,i)) {
STATUS_VALUE(FT_PARTIAL_EVICTIONS_NONLEAF)++;
cilk_spawn compress_internal_node_partition(node, i, h->compression_method);
cilk_spawn compress_internal_node_partition(node, i, ft->h->compression_method);
}
else {
BP_SWEEP_CLOCK(node,i);
......@@ -919,7 +930,7 @@ int toku_ftnode_pe_callback (void *ftnode_pv, PAIR_ATTR UU(old_attr), PAIR_ATTR*
else if (BP_STATE(node,i) == PT_AVAIL) {
if (BP_SHOULD_EVICT(node,i)) {
STATUS_VALUE(FT_PARTIAL_EVICTIONS_LEAF)++;
toku_evict_bn_from_memory(node, i, h);
toku_evict_bn_from_memory(node, i, ft);
}
else {
BP_SWEEP_CLOCK(node,i);
......@@ -1272,7 +1283,7 @@ toku_initialize_empty_ftnode (FTNODE n, BLOCKNUM nodename, int height, int num_c
}
static void
ft_init_new_root(FT h, FTNODE nodea, FTNODE nodeb, DBT splitk, CACHEKEY *rootp, FTNODE *newrootp)
ft_init_new_root(FT ft, FTNODE nodea, FTNODE nodeb, DBT splitk, CACHEKEY *rootp, FTNODE *newrootp)
// Effect: Create a new root node whose two children are NODEA and NODEB, and the pivotkey is SPLITK.
// Store the new root's identity in *ROOTP, and the node in *NEWROOTP.
// Unpin nodea and nodeb.
......@@ -1281,11 +1292,11 @@ ft_init_new_root(FT h, FTNODE nodea, FTNODE nodeb, DBT splitk, CACHEKEY *rootp,
FTNODE XMALLOC(newroot);
int new_height = nodea->height+1;
BLOCKNUM newroot_diskoff;
toku_allocate_blocknum(h->blocktable, &newroot_diskoff, h);
toku_allocate_blocknum(ft->blocktable, &newroot_diskoff, ft);
assert(newroot);
*rootp=newroot_diskoff;
assert(new_height > 0);
toku_initialize_empty_ftnode (newroot, newroot_diskoff, new_height, 2, h->layout_version, h->nodesize, h->flags);
toku_initialize_empty_ftnode (newroot, newroot_diskoff, new_height, 2, ft->h->layout_version, ft->h->nodesize, ft->h->flags);
//printf("new_root %lld %d %lld %lld\n", newroot_diskoff, newroot->height, nodea->thisnodename, nodeb->thisnodename);
//printf("%s:%d Splitkey=%p %s\n", __FILE__, __LINE__, splitkey, splitkey);
toku_copyref_dbt(&newroot->childkeys[0], splitk);
......@@ -1301,12 +1312,19 @@ ft_init_new_root(FT h, FTNODE nodea, FTNODE nodeb, DBT splitk, CACHEKEY *rootp,
BP_STATE(newroot,0) = PT_AVAIL;
BP_STATE(newroot,1) = PT_AVAIL;
newroot->dirty = 1;
toku_unpin_ftnode(h, nodea);
toku_unpin_ftnode(h, nodeb);
//printf("%s:%d put %lld\n", __FILE__, __LINE__, newroot_diskoff);
u_int32_t fullhash = toku_cachetable_hash(h->cf, newroot_diskoff);
u_int32_t fullhash = toku_cachetable_hash(ft->cf, newroot_diskoff);
newroot->fullhash = fullhash;
toku_cachetable_put(h->cf, newroot_diskoff, fullhash, newroot, make_ftnode_pair_attr(newroot), get_write_callbacks_for_node(h));
toku_cachetable_put(ft->cf, newroot_diskoff, fullhash, newroot, make_ftnode_pair_attr(newroot), get_write_callbacks_for_node(ft));
//at this point, newroot is associated with newroot_diskoff, nodea is associated with root_blocknum
// make newroot_diskoff point to nodea
// make root_blocknum point to newroot
// also modify the blocknum and fullhash of nodea and newroot
// before doing this, assert(nodea->blocknum == ft->root_blocknum)
toku_unpin_ftnode(ft, nodea);
toku_unpin_ftnode(ft, nodeb);
*newrootp = newroot;
}
......@@ -2048,7 +2066,7 @@ ft_nonleaf_put_cmd (ft_compare_func compare_fun, DESCRIPTOR desc, FTNODE node, F
// return TRUE if root changed, FALSE otherwise
static BOOL
ft_process_maybe_reactive_root (FT h, CACHEKEY *rootp, FTNODE *nodep) {
ft_process_maybe_reactive_root (FT ft, CACHEKEY *rootp, FTNODE *nodep) {
FTNODE node = *nodep;
toku_assert_entire_node_in_memory(node);
enum reactivity re = get_node_reactivity(node);
......@@ -2060,18 +2078,18 @@ ft_process_maybe_reactive_root (FT h, CACHEKEY *rootp, FTNODE *nodep) {
// The root node should split, so make a new root.
FTNODE nodea,nodeb;
DBT splitk;
assert(h->nodesize>=node->nodesize); /* otherwise we might be in trouble because the nodesize shrank. */
assert(ft->h->nodesize>=node->nodesize); /* otherwise we might be in trouble because the nodesize shrank. */
//
// This happens on the client thread with the ydb lock, so it is safe to
// not pass in dependent nodes. Although if we wanted to, we could pass
// in just node. That would be correct.
//
if (node->height==0) {
ftleaf_split(h, node, &nodea, &nodeb, &splitk, TRUE, 0, NULL);
ftleaf_split(ft, node, &nodea, &nodeb, &splitk, TRUE, 0, NULL);
} else {
ft_nonleaf_split(h, node, &nodea, &nodeb, &splitk, 0, NULL);
ft_nonleaf_split(ft, node, &nodea, &nodeb, &splitk, 0, NULL);
}
ft_init_new_root(h, nodea, nodeb, splitk, rootp, nodep);
ft_init_new_root(ft, nodea, nodeb, splitk, rootp, nodep);
return TRUE;
}
case RE_FUSIBLE:
......@@ -2695,7 +2713,7 @@ toku_ft_maybe_insert (FT_HANDLE brt, DBT *key, DBT *val, TOKUTXN txn, BOOL oplsn
//We have transactions, and this is not 2440. We must send the full root-to-leaf-path
message_xids = toku_txn_get_xids(txn);
}
else if (txn->ancestor_txnid64 != brt->ft->root_xid_that_created) {
else if (txn->ancestor_txnid64 != brt->ft->h->root_xid_that_created) {
//We have transactions, and this is 2440, however the txn doing 2440 did not create the dictionary. We must send the full root-to-leaf-path
message_xids = toku_txn_get_xids(txn);
}
......@@ -2800,6 +2818,7 @@ toku_ft_maybe_update_broadcast(FT_HANDLE brt, const DBT *update_function_extra,
if (r != 0) { goto cleanup; }
}
//TODO(yoni): remove treelsn here and similar calls (no longer being used)
LSN treelsn;
if (oplsn_valid &&
oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(brt->ft)).lsn) {
......@@ -2890,7 +2909,7 @@ toku_ft_maybe_delete(FT_HANDLE brt, DBT *key, TOKUTXN txn, BOOL oplsn_valid, LSN
//We have transactions, and this is not 2440. We must send the full root-to-leaf-path
message_xids = toku_txn_get_xids(txn);
}
else if (txn->ancestor_txnid64 != brt->ft->root_xid_that_created) {
else if (txn->ancestor_txnid64 != brt->ft->h->root_xid_that_created) {
//We have transactions, and this is 2440, however the txn doing 2440 did not create the dictionary. We must send the full root-to-leaf-path
message_xids = toku_txn_get_xids(txn);
}
......@@ -3126,10 +3145,10 @@ toku_ft_change_descriptor(
static void
toku_ft_handle_inherit_options(FT_HANDLE t, FT ft) {
struct ft_options options = {
.nodesize = ft->nodesize,
.basementnodesize = ft->basementnodesize,
.compression_method = ft->compression_method,
.flags = ft->flags,
.nodesize = ft->h->nodesize,
.basementnodesize = ft->h->basementnodesize,
.compression_method = ft->h->compression_method,
.flags = ft->h->flags,
.compare_fun = ft->compare_fun,
.update_fun = ft->update_fun
};
......@@ -3148,6 +3167,7 @@ ft_handle_open(FT_HANDLE t, const char *fname_in_env, int is_create, int only_cr
CACHEFILE cf = NULL;
FT ft = NULL;
BOOL did_create = FALSE;
toku_ft_open_close_lock();
if (t->did_set_flags) {
r = verify_builtin_comparisons_consistent(t, t->options.flags);
......@@ -3211,7 +3231,7 @@ ft_handle_open(FT_HANDLE t, const char *fname_in_env, int is_create, int only_cr
if (!t->did_set_flags) {
r = verify_builtin_comparisons_consistent(t, t->options.flags);
if (r) { goto exit; }
} else if (t->options.flags != ft->flags) { /* if flags have been set then flags must match */
} else if (t->options.flags != ft->h->flags) { /* if flags have been set then flags must match */
r = EINVAL;
goto exit;
}
......@@ -3277,7 +3297,10 @@ ft_handle_open(FT_HANDLE t, const char *fname_in_env, int is_create, int only_cr
// but we have not linked it to this brt. So,
// we can simply try to remove the header.
// We don't need to unlink this brt from the header
if (!toku_ft_needed(ft)) {
toku_ft_grab_reflock(ft);
BOOL needed = toku_ft_needed_unlocked(ft);
toku_ft_release_reflock(ft);
if (!needed) {
//Close immediately.
char *error_string = NULL;
r = toku_remove_ft(ft, &error_string, false, ZERO_LSN);
......@@ -3288,6 +3311,7 @@ ft_handle_open(FT_HANDLE t, const char *fname_in_env, int is_create, int only_cr
toku_cachefile_close(&cf, 0, FALSE, ZERO_LSN);
}
}
toku_ft_open_close_unlock();
return r;
}
......@@ -3406,36 +3430,24 @@ ft_compare_func toku_ft_get_bt_compare (FT_HANDLE brt) {
return brt->options.compare_fun;
}
static void
ft_remove_handle_ref_callback(FT UU(ft), void *extra) {
FT_HANDLE handle = extra;
toku_list_remove(&handle->live_ft_handle_link);
}
int
toku_ft_handle_close (FT_HANDLE brt, bool oplsn_valid, LSN oplsn)
// Effect: See ft-ops.h for the specification of this function.
{
int r = 0;
FT h = brt->ft;
// it is possible that a header was never opened
// for the brt
if (brt->ft) {
// TODO: figure out the proper locking here
// what really protects live_ft_handle_link?
toku_ft_lock(h);
toku_list_remove(&brt->live_ft_handle_link);
toku_ft_unlock(h);
if (!toku_ft_needed(brt->ft)) {
// close header
char *error_string = NULL;
r = toku_remove_ft(h, &error_string, oplsn_valid, oplsn);
assert_zero(r);
assert(error_string == NULL);
}
FT ft = brt->ft;
if (ft) {
toku_ft_remove_reference(brt->ft, oplsn_valid, oplsn, ft_remove_handle_ref_callback, brt);
}
toku_free(brt);
return r;
return 0;
}
// test function
int toku_close_ft_handle_nolsn (FT_HANDLE brt, char** UU(error_string)) {
return toku_ft_handle_close(brt, FALSE, ZERO_LSN);
}
......@@ -3530,7 +3542,7 @@ int toku_ft_cursor (
{
if (is_snapshot_read) {
invariant(ttxn != NULL);
int accepted = does_txn_read_entry(brt->ft->root_xid_that_created, ttxn);
int accepted = does_txn_read_entry(brt->ft->h->root_xid_that_created, ttxn);
if (accepted!=TOKUDB_ACCEPT) {
invariant(accepted==0);
return TOKUDB_MVCC_DICTIONARY_TOO_NEW;
......@@ -5434,17 +5446,23 @@ int toku_ft_layer_init(void (*ydb_lock_callback)(void),
void (*ydb_unlock_callback)(void)) {
int r = 0;
//Portability must be initialized first
if (r==0)
r = toku_portability_init();
if (r==0)
toku_checkpoint_init(ydb_lock_callback, ydb_unlock_callback);
if (r == 0)
r = toku_ft_serialize_layer_init();
r = toku_portability_init();
if (r) { goto exit; }
toku_checkpoint_init(ydb_lock_callback, ydb_unlock_callback);
r = toku_ft_serialize_layer_init();
if (r) { goto exit; }
toku_mutex_init(&ft_open_close_lock, NULL);
exit:
return r;
}
int toku_ft_layer_destroy(void) {
int r = 0;
int r = 0;
toku_mutex_destroy(&ft_open_close_lock);
if (r == 0)
r = toku_ft_serialize_layer_destroy();
if (r==0)
......@@ -5455,6 +5473,14 @@ int toku_ft_layer_destroy(void) {
return r;
}
void toku_ft_open_close_lock(void) {
toku_mutex_lock(&ft_open_close_lock);
}
void toku_ft_open_close_unlock(void) {
toku_mutex_unlock(&ft_open_close_lock);
}
//Suppress both rollback and recovery logs.
void
toku_ft_suppress_recovery_logs (FT_HANDLE brt, TOKUTXN txn) {
......
......@@ -241,6 +241,8 @@ toku_ft_handle_stat64 (FT_HANDLE, TOKUTXN, struct ftstat64_s *stat) __attribute_
int toku_ft_layer_init(void (*ydb_lock_callback)(void),
void (*ydb_unlock_callback)(void))
__attribute__ ((warn_unused_result));
void toku_ft_open_close_lock(void);
void toku_ft_open_close_unlock(void);
int toku_ft_layer_destroy(void) __attribute__ ((warn_unused_result));
int toku_ft_serialize_layer_init(void) __attribute__ ((warn_unused_result));
int toku_ft_serialize_layer_destroy(void) __attribute__ ((warn_unused_result));
......@@ -259,10 +261,6 @@ void toku_ft_suppress_recovery_logs (FT_HANDLE brt, TOKUTXN txn);
int toku_ft_get_fragmentation(FT_HANDLE brt, TOKU_DB_FRAGMENTATION report) __attribute__ ((warn_unused_result));
BOOL toku_ft_is_empty_fast (FT_HANDLE brt);
// Effect: Return TRUE if there are no messages or leaf entries in the tree. If so, it's empty. If there are messages or leaf entries, we say it's not empty
// even though if we were to optimize the tree it might turn out that they are empty.
BOOL toku_ft_is_empty_fast (FT_HANDLE brt) __attribute__ ((warn_unused_result));
// Effect: Return TRUE if there are no messages or leaf entries in the tree. If so, it's empty. If there are messages or leaf entries, we say it's not empty
// even though if we were to optimize the tree it might turn out that they are empty.
......
......@@ -139,10 +139,10 @@ deserialize_descriptor_from(int fd, BLOCK_TABLE bt, DESCRIPTOR desc, int layout_
// We only deserialize brt header once and then share everything with all the brts.
static enum deserialize_error_code
deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ft, uint32_t version)
deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ftp, uint32_t version)
{
enum deserialize_error_code e = DS_OK;
FT h = NULL;
FT ft = NULL;
invariant(version >= FT_LAYOUT_MIN_SUPPORTED_VERSION);
invariant(version <= FT_LAYOUT_VERSION);
// We already know:
......@@ -155,28 +155,25 @@ deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ft, uint32_t version)
rbuf_literal_bytes(rb, &magic, 8);
lazy_assert(memcmp(magic,"tokudata",8)==0);
CALLOC(h);
if (!h) {
XCALLOC(ft);
if (!ft) {
e = DS_ERRNO;
goto exit;
}
h->type = FT_CURRENT;
h->checkpoint_header = NULL;
h->dirty = 0;
h->panic = 0;
h->panic_string = 0;
toku_list_init(&h->live_ft_handles);
int r = toku_omt_create(&h->txns);
ft->checkpoint_header = NULL;
ft->panic = 0;
ft->panic_string = 0;
toku_list_init(&ft->live_ft_handles);
int r = toku_omt_create(&ft->txns);
assert_zero(r);
//version MUST be in network order on disk regardless of disk order
h->layout_version_read_from_disk = rbuf_network_int(rb);
invariant(h->layout_version_read_from_disk >= FT_LAYOUT_MIN_SUPPORTED_VERSION);
invariant(h->layout_version_read_from_disk <= FT_LAYOUT_VERSION);
h->layout_version = FT_LAYOUT_VERSION;
ft->layout_version_read_from_disk = rbuf_network_int(rb);
invariant(ft->layout_version_read_from_disk >= FT_LAYOUT_MIN_SUPPORTED_VERSION);
invariant(ft->layout_version_read_from_disk <= FT_LAYOUT_VERSION);
//build_id MUST be in network order on disk regardless of disk order
h->build_id = rbuf_network_int(rb);
uint32_t build_id = rbuf_network_int(rb);
//Size MUST be in network order regardless of disk order.
u_int32_t size = rbuf_network_int(rb);
......@@ -188,16 +185,17 @@ deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ft, uint32_t version)
int64_t byte_order_stored = *(int64_t*)tmp_byte_order_check;
lazy_assert(byte_order_stored == toku_byte_order_host);
h->checkpoint_count = rbuf_ulonglong(rb);
h->checkpoint_lsn = rbuf_lsn(rb);
h->nodesize = rbuf_int(rb);
uint64_t checkpoint_count = rbuf_ulonglong(rb);
LSN checkpoint_lsn = rbuf_lsn(rb);
unsigned nodesize = rbuf_int(rb);
DISKOFF translation_address_on_disk = rbuf_diskoff(rb);
DISKOFF translation_size_on_disk = rbuf_diskoff(rb);
lazy_assert(translation_address_on_disk > 0);
lazy_assert(translation_size_on_disk > 0);
// initialize the tree lock
toku_ft_init_treelock(h);
toku_ft_init_treelock(ft);
toku_ft_init_reflock(ft);
//Load translation table
{
......@@ -213,7 +211,7 @@ deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ft, uint32_t version)
}
toku_unlock_for_pwrite();
// Create table and read in data.
e = toku_blocktable_create_from_buffer(&h->blocktable,
e = toku_blocktable_create_from_buffer(&ft->blocktable,
translation_address_on_disk,
translation_size_on_disk,
tbuf);
......@@ -223,73 +221,69 @@ deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ft, uint32_t version)
}
}
h->root_blocknum = rbuf_blocknum(rb);
h->flags = rbuf_int(rb);
if (h->layout_version_read_from_disk <= FT_LAYOUT_VERSION_13) {
BLOCKNUM root_blocknum = rbuf_blocknum(rb);
unsigned flags = rbuf_int(rb);
if (ft->layout_version_read_from_disk <= FT_LAYOUT_VERSION_13) {
// deprecate 'TOKU_DB_VALCMP_BUILTIN'. just remove the flag
h->flags &= ~TOKU_DB_VALCMP_BUILTIN_13;
}
h->layout_version_original = rbuf_int(rb);
h->build_id_original = rbuf_int(rb);
h->time_of_creation = rbuf_ulonglong(rb);
h->time_of_last_modification = rbuf_ulonglong(rb);
h->time_of_last_verification = 0;
if (h->layout_version_read_from_disk <= FT_LAYOUT_VERSION_18) {
flags &= ~TOKU_DB_VALCMP_BUILTIN_13;
}
int layout_version_original = rbuf_int(rb);
uint32_t build_id_original = rbuf_int(rb);
uint64_t time_of_creation = rbuf_ulonglong(rb);
uint64_t time_of_last_modification = rbuf_ulonglong(rb);
if (ft->layout_version_read_from_disk <= FT_LAYOUT_VERSION_18) {
// 17 was the last version with these fields, we no longer store
// them, so read and discard them
(void) rbuf_ulonglong(rb); // num_blocks_to_upgrade_13
if (h->layout_version_read_from_disk >= FT_LAYOUT_VERSION_15) {
if (ft->layout_version_read_from_disk >= FT_LAYOUT_VERSION_15) {
(void) rbuf_ulonglong(rb); // num_blocks_to_upgrade_14
}
}
if (h->layout_version_read_from_disk >= FT_LAYOUT_VERSION_14) {
rbuf_TXNID(rb, &h->root_xid_that_created);
} else {
// fake creation during the last checkpoint
h->root_xid_that_created = h->checkpoint_lsn.lsn;
// fake creation during the last checkpoint
TXNID root_xid_that_created = checkpoint_lsn.lsn;
if (ft->layout_version_read_from_disk >= FT_LAYOUT_VERSION_14) {
rbuf_TXNID(rb, &root_xid_that_created);
}
if (h->layout_version_read_from_disk >= FT_LAYOUT_VERSION_15) {
h->basementnodesize = rbuf_int(rb);
h->time_of_last_verification = rbuf_ulonglong(rb);
} else {
h->basementnodesize = FT_DEFAULT_BASEMENT_NODE_SIZE;
h->time_of_last_verification = 0;
}
if (h->layout_version_read_from_disk >= FT_LAYOUT_VERSION_18) {
h->on_disk_stats.numrows = rbuf_ulonglong(rb);
h->on_disk_stats.numbytes = rbuf_ulonglong(rb);
h->in_memory_stats = h->on_disk_stats;
h->time_of_last_optimize_begin = rbuf_ulonglong(rb);
h->time_of_last_optimize_end = rbuf_ulonglong(rb);
h->count_of_optimize_in_progress = rbuf_int(rb);
h->count_of_optimize_in_progress_read_from_disk = h->count_of_optimize_in_progress;
h->msn_at_start_of_last_completed_optimize = rbuf_msn(rb);
} else {
e = toku_upgrade_subtree_estimates_to_stat64info(fd, h);
if (e != DS_OK) {
goto exit;
}
h->time_of_last_optimize_begin = 0;
h->time_of_last_optimize_end = 0;
h->count_of_optimize_in_progress = 0;
h->count_of_optimize_in_progress_read_from_disk = 0;
h->msn_at_start_of_last_completed_optimize = ZERO_MSN;
// TODO(leif): get this to default to what's specified, not the
// hard-coded default
unsigned basementnodesize = FT_DEFAULT_BASEMENT_NODE_SIZE;
uint64_t time_of_last_verification = 0;
if (ft->layout_version_read_from_disk >= FT_LAYOUT_VERSION_15) {
basementnodesize = rbuf_int(rb);
time_of_last_verification = rbuf_ulonglong(rb);
}
if (h->layout_version_read_from_disk >= FT_LAYOUT_VERSION_19) {
STAT64INFO_S on_disk_stats = ZEROSTATS;
uint64_t time_of_last_optimize_begin = 0;
uint64_t time_of_last_optimize_end = 0;
uint32_t count_of_optimize_in_progress = 0;
MSN msn_at_start_of_last_completed_optimize = ZERO_MSN;
if (ft->layout_version_read_from_disk >= FT_LAYOUT_VERSION_18) {
on_disk_stats.numrows = rbuf_ulonglong(rb);
on_disk_stats.numbytes = rbuf_ulonglong(rb);
ft->in_memory_stats = on_disk_stats;
time_of_last_optimize_begin = rbuf_ulonglong(rb);
time_of_last_optimize_end = rbuf_ulonglong(rb);
count_of_optimize_in_progress = rbuf_int(rb);
msn_at_start_of_last_completed_optimize = rbuf_msn(rb);
}
enum toku_compression_method compression_method;
MSN highest_unused_msn_for_upgrade = (MSN) { .msn = (MIN_MSN.msn - 1) };
if (ft->layout_version_read_from_disk >= FT_LAYOUT_VERSION_19) {
unsigned char method = rbuf_char(rb);
h->compression_method = (enum toku_compression_method) method;
h->highest_unused_msn_for_upgrade = rbuf_msn(rb);
compression_method = (enum toku_compression_method) method;
highest_unused_msn_for_upgrade = rbuf_msn(rb);
} else {
// we hard coded zlib until 5.2, then quicklz in 5.2
if (h->layout_version_read_from_disk < FT_LAYOUT_VERSION_18) {
h->compression_method = TOKU_ZLIB_METHOD;
if (ft->layout_version_read_from_disk < FT_LAYOUT_VERSION_18) {
compression_method = TOKU_ZLIB_METHOD;
} else {
h->compression_method = TOKU_QUICKLZ_METHOD;
compression_method = TOKU_QUICKLZ_METHOD;
}
h->highest_unused_msn_for_upgrade.msn = MIN_MSN.msn - 1;
}
(void) rbuf_int(rb); //Read in checksum and ignore (already verified).
......@@ -300,21 +294,57 @@ deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ft, uint32_t version)
goto exit;
}
invariant(h);
invariant((uint32_t) h->layout_version_read_from_disk == version);
e = deserialize_descriptor_from(fd, h->blocktable, &h->descriptor, version);
struct ft_header h = {
.type = FT_CURRENT,
.dirty = 0,
.checkpoint_count = checkpoint_count,
.checkpoint_lsn = checkpoint_lsn,
.layout_version = FT_LAYOUT_VERSION,
.layout_version_original = layout_version_original,
.build_id = build_id,
.build_id_original = build_id_original,
.time_of_creation = time_of_creation,
.root_xid_that_created = root_xid_that_created,
.time_of_last_modification = time_of_last_modification,
.time_of_last_verification = time_of_last_verification,
.root_blocknum = root_blocknum,
.flags = flags,
.nodesize = nodesize,
.basementnodesize = basementnodesize,
.compression_method = compression_method,
.highest_unused_msn_for_upgrade = highest_unused_msn_for_upgrade,
.time_of_last_optimize_begin = time_of_last_optimize_begin,
.time_of_last_optimize_end = time_of_last_optimize_end,
.count_of_optimize_in_progress = count_of_optimize_in_progress,
.count_of_optimize_in_progress_read_from_disk = count_of_optimize_in_progress,
.msn_at_start_of_last_completed_optimize = msn_at_start_of_last_completed_optimize,
.on_disk_stats = on_disk_stats
};
ft->h = toku_xmemdup(&h, sizeof h);
if (ft->layout_version_read_from_disk < FT_LAYOUT_VERSION_18) {
// This needs ft->h to be non-null, so we have to do it after we
// read everything else.
e = toku_upgrade_subtree_estimates_to_stat64info(fd, ft);
if (e != DS_OK) {
goto exit;
}
}
invariant((uint32_t) ft->layout_version_read_from_disk == version);
e = deserialize_descriptor_from(fd, ft->blocktable, &ft->descriptor, version);
if (e != DS_OK) {
goto exit;
}
// copy descriptor to cmp_descriptor for #4541
h->cmp_descriptor.dbt.size = h->descriptor.dbt.size;
h->cmp_descriptor.dbt.data = toku_xmemdup(h->descriptor.dbt.data, h->descriptor.dbt.size);
ft->cmp_descriptor.dbt.size = ft->descriptor.dbt.size;
ft->cmp_descriptor.dbt.data = toku_xmemdup(ft->descriptor.dbt.data, ft->descriptor.dbt.size);
// Version 13 descriptors had an extra 4 bytes that we don't read
// anymore. Since the header is going to think it's the current
// version if it gets written out, we need to write the descriptor in
// the new format (without those bytes) before that happens.
if (version <= FT_LAYOUT_VERSION_13) {
r = toku_update_descriptor(h, &h->cmp_descriptor, fd);
r = toku_update_descriptor(ft, &ft->cmp_descriptor, fd);
if (r != 0) {
errno = r;
e = DS_ERRNO;
......@@ -322,11 +352,11 @@ deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ft, uint32_t version)
}
}
exit:
if (e != DS_OK && h != NULL) {
toku_free(h);
h = NULL;
if (e != DS_OK && ft != NULL) {
toku_free(ft);
ft = NULL;
}
*ft = h;
*ftp = ft;
return e;
}
......@@ -625,7 +655,7 @@ toku_deserialize_ft_from(int fd,
}
int toku_serialize_ft_size (FT h) {
int toku_serialize_ft_size (FT_HEADER h) {
u_int32_t size = serialize_ft_min_size(h->layout_version);
//There is no dynamic data.
lazy_assert(size <= BLOCK_ALLOCATOR_HEADER_RESERVE);
......@@ -633,7 +663,13 @@ int toku_serialize_ft_size (FT h) {
}
int toku_serialize_ft_to_wbuf (struct wbuf *wbuf, FT h, DISKOFF translation_location_on_disk, DISKOFF translation_size_on_disk) {
int toku_serialize_ft_to_wbuf (
struct wbuf *wbuf,
FT_HEADER h,
DISKOFF translation_location_on_disk,
DISKOFF translation_size_on_disk
)
{
wbuf_literal_bytes(wbuf, "tokudata", 8);
wbuf_network_int (wbuf, h->layout_version); //MUST be in network order regardless of disk order
wbuf_network_int (wbuf, BUILD_ID); //MUST be in network order regardless of disk order
......@@ -643,7 +679,6 @@ int toku_serialize_ft_to_wbuf (struct wbuf *wbuf, FT h, DISKOFF translation_loca
wbuf_LSN (wbuf, h->checkpoint_lsn);
wbuf_int (wbuf, h->nodesize);
//printf("%s:%d bta=%lu size=%lu\n", __FILE__, __LINE__, h->block_translation_address_on_disk, 4 + 16*h->translated_blocknum_limit);
wbuf_DISKOFF(wbuf, translation_location_on_disk);
wbuf_DISKOFF(wbuf, translation_size_on_disk);
wbuf_BLOCKNUM(wbuf, h->root_blocknum);
......@@ -655,8 +690,8 @@ int toku_serialize_ft_to_wbuf (struct wbuf *wbuf, FT h, DISKOFF translation_loca
wbuf_TXNID(wbuf, h->root_xid_that_created);
wbuf_int(wbuf, h->basementnodesize);
wbuf_ulonglong(wbuf, h->time_of_last_verification);
wbuf_ulonglong(wbuf, h->checkpoint_staging_stats.numrows);
wbuf_ulonglong(wbuf, h->checkpoint_staging_stats.numbytes);
wbuf_ulonglong(wbuf, h->on_disk_stats.numrows);
wbuf_ulonglong(wbuf, h->on_disk_stats.numbytes);
wbuf_ulonglong(wbuf, h->time_of_last_optimize_begin);
wbuf_ulonglong(wbuf, h->time_of_last_optimize_end);
wbuf_int(wbuf, h->count_of_optimize_in_progress);
......@@ -669,23 +704,21 @@ int toku_serialize_ft_to_wbuf (struct wbuf *wbuf, FT h, DISKOFF translation_loca
return 0;
}
int toku_serialize_ft_to (int fd, FT h) {
int toku_serialize_ft_to (int fd, FT_HEADER h, BLOCK_TABLE blocktable, CACHEFILE cf) {
int rr = 0;
if (h->panic) return h->panic;
lazy_assert(h->type==FT_CHECKPOINT_INPROGRESS);
toku_ft_lock(h);
struct wbuf w_translation;
int64_t size_translation;
int64_t address_translation;
{
//Must serialize translation first, to get address,size for header.
toku_serialize_translation_to_wbuf_unlocked(h->blocktable, &w_translation,
toku_serialize_translation_to_wbuf(blocktable, &w_translation,
&address_translation,
&size_translation);
lazy_assert(size_translation==w_translation.size);
}
struct wbuf w_main;
unsigned int size_main = toku_serialize_ft_size (h);
unsigned int size_main = toku_serialize_ft_size(h);
{
wbuf_init(&w_main, toku_xmalloc(size_main), size_main);
{
......@@ -694,7 +727,6 @@ int toku_serialize_ft_to (int fd, FT h) {
}
lazy_assert(w_main.ndone==size_main);
}
toku_ft_unlock(h);
toku_lock_for_pwrite();
{
//Actual Write translation table
......@@ -708,8 +740,8 @@ int toku_serialize_ft_to (int fd, FT h) {
//If the header has a cachefile we need to do cachefile fsync (to
//prevent crash if we redirected to dev null)
//If there is no cachefile we still need to do an fsync.
if (h->cf) {
rr = toku_cachefile_fsync(h->cf);
if (cf) {
rr = toku_cachefile_fsync(cf);
}
else {
rr = toku_file_fsync(fd);
......
......@@ -74,7 +74,7 @@ int toku_testsetup_nonleaf (FT_HANDLE brt, int height, BLOCKNUM *blocknum, int n
int toku_testsetup_root(FT_HANDLE brt, BLOCKNUM blocknum) {
assert(testsetup_initialized);
brt->ft->root_blocknum = blocknum;
brt->ft->h->root_blocknum = blocknum;
return 0;
}
......
......@@ -410,8 +410,8 @@ toku_verify_ft_with_progress (FT_HANDLE brt, int (*progress_callback)(void *extr
int r = toku_verify_ftnode(brt, ZERO_MSN, ZERO_MSN, root_node, -1, NULL, NULL, progress_callback, progress_extra, 1, verbose, keep_on_going);
if (r == 0) {
toku_ft_lock(brt->ft);
brt->ft->time_of_last_verification = time(NULL);
brt->ft->dirty = 1;
brt->ft->h->time_of_last_verification = time(NULL);
brt->ft->h->dirty = 1;
toku_ft_unlock(brt->ft);
}
return r;
......
......@@ -14,89 +14,100 @@ toku_ft_suppress_rollbacks(FT h, TOKUTXN txn) {
assert(h->txnid_that_created_or_locked_when_empty == TXNID_NONE ||
h->txnid_that_created_or_locked_when_empty == txnid);
h->txnid_that_created_or_locked_when_empty = txnid;
TXNID rootid = toku_txn_get_root_txnid(txn);
assert(h->root_that_created_or_locked_when_empty == TXNID_NONE ||
h->root_that_created_or_locked_when_empty == rootid);
h->root_that_created_or_locked_when_empty = rootid;
}
void
toku_reset_root_xid_that_created(FT h, TXNID new_root_xid_that_created) {
toku_reset_root_xid_that_created(FT ft, TXNID new_root_xid_that_created) {
// Reset the root_xid_that_created field to the given value.
// This redefines which xid created the dictionary.
// hold lock around setting and clearing of dirty bit
// (see cooperative use of dirty bit in ft_begin_checkpoint())
toku_ft_lock (h);
h->root_xid_that_created = new_root_xid_that_created;
h->dirty = 1;
toku_ft_unlock (h);
toku_ft_lock (ft);
ft->h->root_xid_that_created = new_root_xid_that_created;
ft->h->dirty = 1;
toku_ft_unlock (ft);
}
static void
ft_destroy(FT h) {
if (!h->panic) assert(!h->checkpoint_header);
ft_destroy(FT ft) {
if (!ft->panic) assert(!ft->checkpoint_header);
//header and checkpoint_header have same Blocktable pointer
//cannot destroy since it is still in use by CURRENT
if (h->type == FT_CHECKPOINT_INPROGRESS) h->blocktable = NULL;
else {
assert(h->type == FT_CURRENT);
toku_blocktable_destroy(&h->blocktable);
if (h->descriptor.dbt.data) toku_free(h->descriptor.dbt.data);
if (h->cmp_descriptor.dbt.data) toku_free(h->cmp_descriptor.dbt.data);
toku_ft_destroy_treelock(h);
toku_omt_destroy(&h->txns);
}
assert(ft->h->type == FT_CURRENT);
toku_blocktable_destroy(&ft->blocktable);
if (ft->descriptor.dbt.data) toku_free(ft->descriptor.dbt.data);
if (ft->cmp_descriptor.dbt.data) toku_free(ft->cmp_descriptor.dbt.data);
toku_ft_destroy_treelock(ft);
toku_ft_destroy_reflock(ft);
toku_omt_destroy(&ft->txns);
toku_free(ft->h);
}
// Make a copy of the header for the purpose of a checkpoint
// Not reentrant for a single FT.
// See ft_checkpoint for explanation of why
// FT lock must be held.
static void
ft_copy_for_checkpoint(FT h, LSN checkpoint_lsn) {
assert(h->type == FT_CURRENT);
assert(h->checkpoint_header == NULL);
assert(h->panic==0);
ft_copy_for_checkpoint_unlocked(FT ft, LSN checkpoint_lsn) {
assert(ft->h->type == FT_CURRENT);
assert(ft->checkpoint_header == NULL);
assert(ft->panic==0);
FT XMALLOC(ch);
*ch = *h; //Do a shallow copy
FT_HEADER ch = toku_xmemdup(ft->h, sizeof *ft->h);
ch->type = FT_CHECKPOINT_INPROGRESS; //Different type
//printf("checkpoint_lsn=%" PRIu64 "\n", checkpoint_lsn.lsn);
ch->checkpoint_lsn = checkpoint_lsn;
ch->panic_string = NULL;
//ch->blocktable is SHARED between the two headers
h->checkpoint_header = ch;
ft->checkpoint_header = ch;
}
static void
ft_free(FT h) {
ft_destroy(h);
toku_free(h);
void
toku_ft_free (FT ft) {
ft_destroy(ft);
toku_free(ft);
}
void
toku_ft_init_treelock(FT ft) {
toku_mutex_init(&ft->tree_lock, NULL);
}
void
toku_ft_destroy_treelock(FT ft) {
toku_mutex_destroy(&ft->tree_lock);
}
void
toku_ft_grab_treelock(FT ft) {
toku_mutex_lock(&ft->tree_lock);
}
void
toku_ft_free (FT h) {
ft_free(h);
toku_ft_release_treelock(FT ft) {
toku_mutex_unlock(&ft->tree_lock);
}
void
toku_ft_init_treelock(FT h) {
toku_mutex_init(&h->tree_lock, NULL);
toku_ft_init_reflock(FT ft) {
toku_mutex_init(&ft->ft_ref_lock, NULL);
}
void
toku_ft_destroy_treelock(FT h) {
toku_mutex_destroy(&h->tree_lock);
toku_ft_destroy_reflock(FT ft) {
toku_mutex_destroy(&ft->ft_ref_lock);
}
void
toku_ft_grab_treelock(FT h) {
toku_mutex_lock(&h->tree_lock);
toku_ft_grab_reflock(FT ft) {
toku_mutex_lock(&ft->ft_ref_lock);
}
void
toku_ft_release_treelock(FT h) {
toku_mutex_unlock(&h->tree_lock);
toku_ft_release_reflock(FT ft) {
toku_mutex_unlock(&ft->ft_ref_lock);
}
/////////////////////////////////////////////////////////////////////////
......@@ -106,13 +117,13 @@ toku_ft_release_treelock(FT h) {
// maps to cf->log_fassociate_during_checkpoint
static int
ft_log_fassociate_during_checkpoint (CACHEFILE cf, void *header_v) {
FT h = header_v;
FT ft = header_v;
char* fname_in_env = toku_cachefile_fname_in_env(cf);
BYTESTRING bs = { strlen(fname_in_env), // don't include the NUL
fname_in_env };
TOKULOGGER logger = toku_cachefile_logger(cf);
FILENUM filenum = toku_cachefile_filenum (cf);
int r = toku_log_fassociate(logger, NULL, 0, filenum, h->flags, bs);
int r = toku_log_fassociate(logger, NULL, 0, filenum, ft->h->flags, bs);
return r;
}
......@@ -133,43 +144,64 @@ ft_log_suppress_rollback_during_checkpoint (CACHEFILE cf, void *header_v) {
// Maps to cf->begin_checkpoint_userdata
// Create checkpoint-in-progress versions of header and translation (btt) (and fifo for now...).
//Has access to fd (it is protected).
// Has access to fd (it is protected).
//
// Not reentrant for a single FT (see ft_checkpoint)
static int
ft_begin_checkpoint (LSN checkpoint_lsn, void *header_v) {
FT h = header_v;
int r = h->panic;
FT ft = header_v;
int r = ft->panic;
if (r==0) {
// hold lock around copying and clearing of dirty bit
toku_ft_lock (h);
assert(h->type == FT_CURRENT);
assert(h->checkpoint_header == NULL);
ft_copy_for_checkpoint(h, checkpoint_lsn);
h->dirty = 0; // this is only place this bit is cleared (in currentheader)
// on_disk_stats includes on disk changes since last checkpoint,
// so checkpoint_staging_stats now includes changes for checkpoint in progress.
h->checkpoint_staging_stats = h->on_disk_stats;
toku_block_translation_note_start_checkpoint_unlocked(h->blocktable);
toku_ft_unlock (h);
toku_ft_lock (ft);
assert(ft->h->type == FT_CURRENT);
assert(ft->checkpoint_header == NULL);
ft_copy_for_checkpoint_unlocked(ft, checkpoint_lsn);
ft->h->dirty = 0; // this is only place this bit is cleared (in currentheader)
toku_block_translation_note_start_checkpoint_unlocked(ft->blocktable);
toku_ft_unlock (ft);
}
return r;
}
// #4922: Hack to remove data corruption race condition.
// Reading (and upgrading) a node up to version 19 causes this.
// We COULD skip this if we know that no nodes remained (as of last checkpoint)
// that are below version 19.
// If there are no nodes < version 19 this is harmless (field is unused).
// If there are, this will make certain the value is at least as low as necessary,
// and not much lower. (Too low is good, too high can cause data corruption).
// TODO(yoni): If we ever stop supporting upgrades of nodes < version 19 we can delete this.
// TODO(yoni): If we know no nodes are left to upgrade, we can skip this. (Probably not worth doing).
static void
ft_hack_highest_unused_msn_for_upgrade_for_checkpoint(FT ft) {
if (ft->h->layout_version_original < FT_LAYOUT_VERSION_19) {
ft->checkpoint_header->highest_unused_msn_for_upgrade = ft->h->highest_unused_msn_for_upgrade;
}
}
// maps to cf->checkpoint_userdata
// Write checkpoint-in-progress versions of header and translation to disk (really to OS internal buffer).
// Copy current header's version of checkpoint_staging stat64info to checkpoint header.
// Must have access to fd (protected).
// Requires: all pending bits are clear. This implies that no thread will modify the checkpoint_staging
// version of the stat64info.
//
// No locks are taken for checkpoint_count/lsn because this is single threaded. Can be called by:
// - ft_close
// - end_checkpoint
// checkpoints hold references to FTs and so they cannot be closed during a checkpoint.
// ft_close is not reentrant for a single FT
// end_checkpoint is not reentrant period
static int
ft_checkpoint (CACHEFILE cf, int fd, void *header_v) {
FT h = header_v;
FT ch = h->checkpoint_header;
FT ft = header_v;
FT_HEADER ch = ft->checkpoint_header;
int r = 0;
if (h->panic!=0) goto handle_error;
if (ft->panic!=0) goto handle_error;
//printf("%s:%d allocated_limit=%lu writing queue to %lu\n", __FILE__, __LINE__,
// block_allocator_allocated_limit(h->block_allocator), h->unused_blocks.b*h->nodesize);
assert(ch);
if (ch->panic!=0) goto handle_error;
assert(ch->type == FT_CHECKPOINT_INPROGRESS);
if (ch->dirty) { // this is only place this bit is tested (in checkpoint_header)
TOKULOGGER logger = toku_cachefile_logger(cf);
......@@ -178,22 +210,13 @@ ft_checkpoint (CACHEFILE cf, int fd, void *header_v) {
if (r!=0) goto handle_error;
}
uint64_t now = (uint64_t) time(NULL); // 4018;
h->time_of_last_modification = now;
ft->h->time_of_last_modification = now;
ch->time_of_last_modification = now;
ch->checkpoint_count++;
// Threadsafety of checkpoint_staging_stats here depends on there being no pending bits set,
// so that all callers to flush callback should have the for_checkpoint argument false,
// and therefore will not modify the checkpoint_staging_stats.
// TODO 4184: If the flush callback is called with the for_checkpoint argument true even when all the pending bits
// are clear, then this is a problem.
ch->checkpoint_staging_stats = h->checkpoint_staging_stats;
// The in_memory_stats and on_disk_stats in the checkpoint header should be ignored, but we set them
// here just in case the serializer looks in the wrong place.
ch->in_memory_stats = ch->checkpoint_staging_stats;
ch->on_disk_stats = ch->checkpoint_staging_stats;
ft_hack_highest_unused_msn_for_upgrade_for_checkpoint(ft);
// write translation and header to disk (or at least to OS internal buffer)
r = toku_serialize_ft_to(fd, ch);
r = toku_serialize_ft_to(fd, ch, ft->blocktable, ft->cf);
if (r!=0) goto handle_error;
ch->dirty = 0; // this is only place this bit is cleared (in checkpoint_header)
......@@ -202,22 +225,16 @@ ft_checkpoint (CACHEFILE cf, int fd, void *header_v) {
if (r!=0) {
goto handle_error;
}
h->checkpoint_count++; // checkpoint succeeded, next checkpoint will save to alternate header location
h->checkpoint_lsn = ch->checkpoint_lsn; //Header updated.
ft->h->checkpoint_count++; // checkpoint succeeded, next checkpoint will save to alternate header location
ft->h->checkpoint_lsn = ch->checkpoint_lsn; //Header updated.
}
else {
toku_block_translation_note_skipped_checkpoint(ch->blocktable);
toku_block_translation_note_skipped_checkpoint(ft->blocktable);
}
if (0) {
handle_error:
if (h->panic) r = h->panic;
else if (ch->panic) {
r = ch->panic;
//Steal panic string. Cannot afford to malloc.
h->panic = ch->panic;
h->panic_string = ch->panic_string;
}
else toku_block_translation_note_failed_checkpoint(ch->blocktable);
if (ft->panic) r = ft->panic;
else toku_block_translation_note_failed_checkpoint(ft->blocktable);
}
return r;
......@@ -229,15 +246,15 @@ ft_checkpoint (CACHEFILE cf, int fd, void *header_v) {
// Must have access to fd (protected)
static int
ft_end_checkpoint (CACHEFILE UU(cachefile), int fd, void *header_v) {
FT h = header_v;
int r = h->panic;
FT ft = header_v;
int r = ft->panic;
if (r==0) {
assert(h->type == FT_CURRENT);
toku_block_translation_note_end_checkpoint(h->blocktable, fd, h);
assert(ft->h->type == FT_CURRENT);
toku_block_translation_note_end_checkpoint(ft->blocktable, fd, ft);
}
if (h->checkpoint_header) { // could be NULL only if panic was true at begin_checkpoint
ft_free(h->checkpoint_header);
h->checkpoint_header = NULL;
if (ft->checkpoint_header) { // could be NULL only if panic was true at begin_checkpoint
toku_free(ft->checkpoint_header);
ft->checkpoint_header = NULL;
}
return r;
}
......@@ -246,16 +263,16 @@ ft_end_checkpoint (CACHEFILE UU(cachefile), int fd, void *header_v) {
// Has access to fd (it is protected).
static int
ft_close (CACHEFILE cachefile, int fd, void *header_v, char **malloced_error_string, BOOL oplsn_valid, LSN oplsn) {
FT h = header_v;
assert(h->type == FT_CURRENT);
toku_ft_lock(h);
assert(!toku_ft_needed(h));
toku_ft_unlock(h);
FT ft = header_v;
assert(ft->h->type == FT_CURRENT);
// We already have exclusive access to this field already, so skip the locking.
// This should already never fail.
invariant(!toku_ft_needed_unlocked(ft));
int r = 0;
if (h->panic) {
r = h->panic;
if (ft->panic) {
r = ft->panic;
} else {
assert(h->cf == cachefile);
assert(ft->cf == cachefile);
TOKULOGGER logger = toku_cachefile_logger(cachefile);
LSN lsn = ZERO_LSN;
//Get LSN
......@@ -263,8 +280,8 @@ ft_close (CACHEFILE cachefile, int fd, void *header_v, char **malloced_error_str
//Use recovery-specified lsn
lsn = oplsn;
//Recovery cannot reduce lsn of a header.
if (lsn.lsn < h->checkpoint_lsn.lsn)
lsn = h->checkpoint_lsn;
if (lsn.lsn < ft->h->checkpoint_lsn.lsn)
lsn = ft->h->checkpoint_lsn;
}
else {
//Get LSN from logger
......@@ -273,11 +290,11 @@ ft_close (CACHEFILE cachefile, int fd, void *header_v, char **malloced_error_str
char* fname_in_env = toku_cachefile_fname_in_env(cachefile);
assert(fname_in_env);
BYTESTRING bs = {.len=strlen(fname_in_env), .data=fname_in_env};
r = toku_log_fclose(logger, &lsn, h->dirty, bs, toku_cachefile_filenum(cachefile)); // flush the log on close (if new header is being written), otherwise it might not make it out.
r = toku_log_fclose(logger, &lsn, ft->h->dirty, bs, toku_cachefile_filenum(cachefile)); // flush the log on close (if new header is being written), otherwise it might not make it out.
if (r!=0) return r;
}
}
if (h->dirty) { // this is the only place this bit is tested (in currentheader)
if (ft->h->dirty) { // this is the only place this bit is tested (in currentheader)
if (logger) { //Rollback cachefile MUST NOT BE CLOSED DIRTY
//It can be checkpointed only via 'checkpoint'
assert(logger->rollback_cachefile != cachefile);
......@@ -286,18 +303,18 @@ ft_close (CACHEFILE cachefile, int fd, void *header_v, char **malloced_error_str
//assert(lsn.lsn!=0);
r2 = ft_begin_checkpoint(lsn, header_v);
if (r==0) r = r2;
r2 = ft_checkpoint(cachefile, fd, h);
r2 = ft_checkpoint(cachefile, fd, ft);
if (r==0) r = r2;
r2 = ft_end_checkpoint(cachefile, fd, header_v);
if (r==0) r = r2;
if (!h->panic) assert(!h->dirty); // dirty bit should be cleared by begin_checkpoint and never set again (because we're closing the dictionary)
if (!ft->panic) assert(!ft->h->dirty); // dirty bit should be cleared by begin_checkpoint and never set again (because we're closing the dictionary)
}
}
if (malloced_error_string) *malloced_error_string = h->panic_string;
if (malloced_error_string) *malloced_error_string = ft->panic_string;
if (r == 0) {
r = h->panic;
r = ft->panic;
}
toku_ft_free(h);
toku_ft_free(ft);
return r;
}
......@@ -309,84 +326,77 @@ ft_note_pin_by_checkpoint (CACHEFILE UU(cachefile), void *header_v)
{
//Set arbitrary brt (for given header) as pinned by checkpoint.
//Only one can be pinned (only one checkpoint at a time), but not worth verifying.
FT h = header_v;
assert(!h->pinned_by_checkpoint);
h->pinned_by_checkpoint = true;
FT ft = header_v;
// Note: open_close lock is held by checkpoint begin
toku_ft_grab_reflock(ft);
assert(!ft->pinned_by_checkpoint);
assert(toku_ft_needed_unlocked(ft));
ft->pinned_by_checkpoint = true;
toku_ft_release_reflock(ft);
return 0;
}
static void
unpin_by_checkpoint_callback(FT ft, void *extra) {
invariant(extra == NULL);
invariant(ft->pinned_by_checkpoint);
ft->pinned_by_checkpoint = false; //Unpin
}
// maps to cf->note_unpin_by_checkpoint
//Must be protected by ydb lock.
//Called by end_checkpoint, which grabs ydb lock around note_unpin
static int
ft_note_unpin_by_checkpoint (CACHEFILE UU(cachefile), void *header_v)
{
FT h = header_v;
assert(h->pinned_by_checkpoint);
h->pinned_by_checkpoint = false; //Unpin
int r = 0;
//Close if necessary
if (!toku_ft_needed(h)) {
//Close immediately.
char *error_string = NULL;
r = toku_remove_ft(h, &error_string, false, ZERO_LSN);
lazy_assert_zero(r);
}
return r;
FT ft = header_v;
toku_ft_remove_reference(ft, false, ZERO_LSN, unpin_by_checkpoint_callback, NULL);
return 0;
}
//
// End of Functions that are callbacks to the cachefile
/////////////////////////////////////////////////////////////////////////
static int setup_initial_ft_root_node (FT h, BLOCKNUM blocknum) {
static int setup_initial_ft_root_node (FT ft, BLOCKNUM blocknum) {
FTNODE XMALLOC(node);
toku_initialize_empty_ftnode(node, blocknum, 0, 1, h->layout_version, h->nodesize, h->flags);
toku_initialize_empty_ftnode(node, blocknum, 0, 1, ft->h->layout_version, ft->h->nodesize, ft->h->flags);
BP_STATE(node,0) = PT_AVAIL;
u_int32_t fullhash = toku_cachetable_hash(h->cf, blocknum);
u_int32_t fullhash = toku_cachetable_hash(ft->cf, blocknum);
node->fullhash = fullhash;
int r = toku_cachetable_put(h->cf, blocknum, fullhash,
int r = toku_cachetable_put(ft->cf, blocknum, fullhash,
node, make_ftnode_pair_attr(node),
get_write_callbacks_for_node(h));
get_write_callbacks_for_node(ft));
if (r != 0)
toku_free(node);
else
toku_unpin_ftnode(h, node);
toku_unpin_ftnode(ft, node);
return r;
}
static int
ft_init (FT ft, FT_OPTIONS options, CACHEFILE cf, TOKUTXN txn) {
ft->type = FT_CURRENT;
ft_init(FT ft, FT_OPTIONS options, CACHEFILE cf) {
ft->checkpoint_header = NULL;
toku_ft_init_treelock(ft);
toku_blocktable_create_new(&ft->blocktable);
//Assign blocknum for root block, also dirty the header
toku_allocate_blocknum(ft->blocktable, &ft->root_blocknum, ft);
ft->layout_version_read_from_disk = FT_LAYOUT_VERSION; // fake, prevent unnecessary upgrade logic
toku_list_init(&ft->live_ft_handles);
int r = toku_omt_create(&ft->txns);
assert_zero(r);
ft->flags = options->flags;
ft->nodesize = options->nodesize;
ft->basementnodesize = options->basementnodesize;
ft->compression_method = options->compression_method;
ft->compare_fun = options->compare_fun;
ft->update_fun = options->update_fun;
if (ft->cf!=NULL) assert(ft->cf == cf);
if (ft->cf != NULL) {
assert(ft->cf == cf);
}
ft->cf = cf;
ft->root_xid_that_created = txn ? txn->ancestor_txnid64 : TXNID_NONE;
ft->in_memory_stats = ZEROSTATS;
ft->on_disk_stats = ZEROSTATS;
ft->checkpoint_staging_stats = ZEROSTATS;
ft->highest_unused_msn_for_upgrade.msn = MIN_MSN.msn - 1;
ft->in_memory_stats = ZEROSTATS;
r = setup_initial_ft_root_node(ft, ft->root_blocknum);
if (r != 0) {
goto exit;
r = setup_initial_ft_root_node(ft, ft->h->root_blocknum);
if (r != 0) {
goto exit;
}
//printf("%s:%d putting %p (%d)\n", __FILE__, __LINE__, ft, 0);
toku_cachefile_set_userdata(ft->cf,
......@@ -407,32 +417,60 @@ ft_init (FT ft, FT_OPTIONS options, CACHEFILE cf, TOKUTXN txn) {
}
// allocate and initialize a brt header.
// t->ft->cf is not set to anything.
int
static FT_HEADER
ft_header_new(FT_OPTIONS options, BLOCKNUM root_blocknum, TXNID root_xid_that_created)
{
uint64_t now = (uint64_t) time(NULL);
struct ft_header h = {
.type = FT_CURRENT,
.dirty = 0,
.checkpoint_count = 0,
.checkpoint_lsn = ZERO_LSN,
.layout_version = FT_LAYOUT_VERSION,
.layout_version_original = FT_LAYOUT_VERSION,
.build_id = BUILD_ID,
.build_id_original = BUILD_ID,
.time_of_creation = now,
.root_xid_that_created = root_xid_that_created,
.time_of_last_modification = now,
.time_of_last_verification = 0,
.root_blocknum = root_blocknum,
.flags = options->flags,
.nodesize = options->nodesize,
.basementnodesize = options->basementnodesize,
.compression_method = options->compression_method,
.highest_unused_msn_for_upgrade = { .msn = (MIN_MSN.msn - 1) },
.time_of_last_optimize_begin = 0,
.time_of_last_optimize_end = 0,
.count_of_optimize_in_progress = 0,
.count_of_optimize_in_progress_read_from_disk = 0,
.msn_at_start_of_last_completed_optimize = ZERO_MSN,
.on_disk_stats = ZEROSTATS
};
return toku_xmemdup(&h, sizeof h);
}
// allocate and initialize a fractal tree.
// t->ft->cf is not set to anything. TODO(leif): I don't think that's true
int
toku_create_new_ft(FT *ftp, FT_OPTIONS options, CACHEFILE cf, TOKUTXN txn) {
int r;
invariant(ftp);
FT XCALLOC(ft);
ft->layout_version = FT_LAYOUT_VERSION;
ft->layout_version_original = FT_LAYOUT_VERSION;
ft->layout_version_read_from_disk = FT_LAYOUT_VERSION; // fake, prevent unnecessary upgrade logic
ft->build_id = BUILD_ID;
ft->build_id_original = BUILD_ID;
uint64_t now = (uint64_t) time(NULL);
ft->time_of_creation = now;
ft->time_of_last_modification = now;
ft->time_of_last_verification = 0;
memset(&ft->descriptor, 0, sizeof(ft->descriptor));
memset(&ft->cmp_descriptor, 0, sizeof(ft->cmp_descriptor));
r = ft_init(ft, options, cf, txn);
ft->h = ft_header_new(options, make_blocknum(0), (txn ? txn->ancestor_txnid64 : TXNID_NONE));
toku_ft_init_treelock(ft);
toku_ft_init_reflock(ft);
toku_blocktable_create_new(&ft->blocktable);
//Assign blocknum for root block, also dirty the header
toku_allocate_blocknum(ft->blocktable, &ft->h->root_blocknum, ft);
r = ft_init(ft, options, cf);
if (r != 0) {
goto exit;
}
......@@ -504,22 +542,30 @@ int toku_read_ft_and_store_in_cachefile (FT_HANDLE brt, CACHEFILE cf, LSN max_ac
void
toku_ft_note_ft_handle_open(FT ft, FT_HANDLE live) {
toku_ft_lock(ft);
toku_ft_grab_reflock(ft);
live->ft = ft;
toku_list_push(&ft->live_ft_handles, &live->live_ft_handle_link);
toku_ft_unlock(ft);
toku_ft_release_reflock(ft);
}
int
toku_ft_needed(FT h) {
toku_ft_needed_unlocked(FT h) {
return !toku_list_empty(&h->live_ft_handles) || toku_omt_size(h->txns) != 0 || h->pinned_by_checkpoint;
}
BOOL
toku_ft_has_one_reference_unlocked(FT ft) {
u_int32_t pinned_by_checkpoint = ft->pinned_by_checkpoint ? 1 : 0;
u_int32_t num_txns = toku_omt_size(ft->txns);
int num_handles = toku_list_num_elements_est(&ft->live_ft_handles);
return ((pinned_by_checkpoint + num_txns + num_handles) == 1);
}
// Close brt. If opsln_valid, use given oplsn as lsn in brt header instead of logging
// the close and using the lsn provided by logging the close. (Subject to constraint
// that if a newer lsn is already in the dictionary, don't overwrite the dictionary.)
int toku_remove_ft (FT h, char **error_string, BOOL oplsn_valid, LSN oplsn) {
assert(!h->pinned_by_checkpoint);
int r = 0;
// Must do this work before closing the cf
if (h->cf) {
......@@ -534,11 +580,11 @@ int toku_remove_ft (FT h, char **error_string, BOOL oplsn_valid, LSN oplsn) {
// for this header, returns NULL
FT_HANDLE toku_ft_get_some_existing_ft_handle(FT h) {
FT_HANDLE ft_handle_ret = NULL;
toku_ft_lock(h);
toku_ft_grab_reflock(h);
if (!toku_list_empty(&h->live_ft_handles)) {
ft_handle_ret = toku_list_struct(toku_list_head(&h->live_ft_handles), struct ft_handle, live_ft_handle_link);
}
toku_ft_unlock(h);
toku_ft_release_reflock(h);
return ft_handle_ret;
}
......@@ -548,16 +594,16 @@ FT_HANDLE toku_ft_get_some_existing_ft_handle(FT h) {
// convenient here for keeping the HOT variables threadsafe.)
void
toku_ft_note_hot_begin(FT_HANDLE brt) {
FT h = brt->ft;
FT ft = brt->ft;
time_t now = time(NULL);
// hold lock around setting and clearing of dirty bit
// (see cooperative use of dirty bit in ft_begin_checkpoint())
toku_ft_lock(h);
h->time_of_last_optimize_begin = now;
h->count_of_optimize_in_progress++;
h->dirty = 1;
toku_ft_unlock(h);
toku_ft_lock(ft);
ft->h->time_of_last_optimize_begin = now;
ft->h->count_of_optimize_in_progress++;
ft->h->dirty = 1;
toku_ft_unlock(ft);
}
......@@ -565,47 +611,45 @@ toku_ft_note_hot_begin(FT_HANDLE brt) {
// Note: See note for toku_ft_note_hot_begin().
void
toku_ft_note_hot_complete(FT_HANDLE brt, BOOL success, MSN msn_at_start_of_hot) {
FT h = brt->ft;
FT ft = brt->ft;
time_t now = time(NULL);
toku_ft_lock(h);
h->count_of_optimize_in_progress--;
toku_ft_lock(ft);
ft->h->count_of_optimize_in_progress--;
if (success) {
h->time_of_last_optimize_end = now;
h->msn_at_start_of_last_completed_optimize = msn_at_start_of_hot;
ft->h->time_of_last_optimize_end = now;
ft->h->msn_at_start_of_last_completed_optimize = msn_at_start_of_hot;
// If we just successfully completed an optimization and no other thread is performing
// an optimization, then the number of optimizations in progress is zero.
// If there was a crash during a HOT optimization, this is how count_of_optimize_in_progress
// would be reset to zero on the disk after recovery from that crash.
if (h->count_of_optimize_in_progress == h->count_of_optimize_in_progress_read_from_disk)
h->count_of_optimize_in_progress = 0;
if (ft->h->count_of_optimize_in_progress == ft->h->count_of_optimize_in_progress_read_from_disk)
ft->h->count_of_optimize_in_progress = 0;
}
h->dirty = 1;
toku_ft_unlock(h);
ft->h->dirty = 1;
toku_ft_unlock(ft);
}
void
toku_ft_init(FT h,
BLOCKNUM root_blocknum_on_disk, LSN checkpoint_lsn, TXNID root_xid_that_created, uint32_t target_nodesize, uint32_t target_basementnodesize, enum toku_compression_method compression_method) {
memset(h, 0, sizeof *h);
h->layout_version = FT_LAYOUT_VERSION;
h->layout_version_original = FT_LAYOUT_VERSION;
h->build_id = BUILD_ID;
h->build_id_original = BUILD_ID;
uint64_t now = (uint64_t) time(NULL);
h->time_of_creation = now;
h->time_of_last_modification = now;
h->time_of_last_verification = 0;
h->checkpoint_count = 1;
h->checkpoint_lsn = checkpoint_lsn;
h->nodesize = target_nodesize;
h->basementnodesize = target_basementnodesize;
h->root_blocknum = root_blocknum_on_disk;
h->flags = 0;
h->root_xid_that_created = root_xid_that_created;
h->compression_method = compression_method;
h->highest_unused_msn_for_upgrade.msn = MIN_MSN.msn - 1;
toku_ft_init(FT ft,
BLOCKNUM root_blocknum_on_disk,
LSN checkpoint_lsn,
TXNID root_xid_that_created,
uint32_t target_nodesize,
uint32_t target_basementnodesize,
enum toku_compression_method compression_method)
{
memset(ft, 0, sizeof *ft);
struct ft_options options = {
.nodesize = target_nodesize,
.basementnodesize = target_basementnodesize,
.compression_method = compression_method,
.flags = 0
};
ft->h = ft_header_new(&options, root_blocknum_on_disk, root_xid_that_created);
ft->h->checkpoint_count = 1;
ft->h->checkpoint_lsn = checkpoint_lsn;
}
// Open a brt for use by redirect. The new brt must have the same dict_id as the old_ft passed in. (FILENUM is assigned by the ft_handle_open() function.)
......@@ -620,11 +664,11 @@ ft_handle_open_for_redirect(FT_HANDLE *new_ftp, const char *fname_in_env, TOKUTX
assert_zero(r);
r = toku_ft_set_update(t, old_h->update_fun);
assert_zero(r);
r = toku_ft_set_nodesize(t, old_h->nodesize);
r = toku_ft_set_nodesize(t, old_h->h->nodesize);
assert_zero(r);
r = toku_ft_set_basementnodesize(t, old_h->basementnodesize);
r = toku_ft_set_basementnodesize(t, old_h->h->basementnodesize);
assert_zero(r);
r = toku_ft_set_compression_method(t, old_h->compression_method);
r = toku_ft_set_compression_method(t, old_h->h->compression_method);
assert_zero(r);
CACHETABLE ct = toku_cachefile_get_cachetable(old_h->cf);
r = toku_ft_handle_open_with_dict_id(t, fname_in_env, 0, 0, ct, txn, old_h->dict_id);
......@@ -662,14 +706,13 @@ dictionary_redirect_internal(const char *dst_fname_in_env, FT src_h, TOKUTXN txn
// for each live brt, brt->ft is currently src_h
// we want to change it to dummy_dst
toku_ft_grab_reflock(src_h);
while (!toku_list_empty(&src_h->live_ft_handles)) {
list = src_h->live_ft_handles.next;
FT_HANDLE src_handle = NULL;
src_handle = toku_list_struct(list, struct ft_handle, live_ft_handle_link);
toku_ft_lock(src_h);
toku_list_remove(&src_handle->live_ft_handle_link);
toku_ft_unlock(src_h);
toku_ft_note_ft_handle_open(dst_h, src_handle);
if (src_handle->redirect_callback) {
......@@ -677,6 +720,9 @@ dictionary_redirect_internal(const char *dst_fname_in_env, FT src_h, TOKUTXN txn
}
}
assert(dst_h);
// making sure that we are not leaking src_h
assert(toku_ft_needed_unlocked(src_h));
toku_ft_release_reflock(src_h);
r = toku_ft_handle_close(tmp_dst_ft, FALSE, ZERO_LSN);
assert_zero(r);
......@@ -699,23 +745,16 @@ toku_dictionary_redirect_abort(FT old_h, FT new_h, TOKUTXN txn) {
assert(old_filenum.fileid!=new_filenum.fileid); //Cannot be same file.
//No living brts in old header.
toku_ft_grab_reflock(old_h);
assert(toku_list_empty(&old_h->live_ft_handles));
toku_ft_release_reflock(old_h);
}
// If application did not close all DBs using the new file, then there should
// be no zombies and we need to redirect the DBs back to the original file.
if (!toku_list_empty(&new_h->live_ft_handles)) {
FT dst_h;
// redirect back from new_h to old_h
r = dictionary_redirect_internal(old_fname_in_env, new_h, txn, &dst_h);
assert_zero(r);
assert(dst_h == old_h);
}
else {
//No live brts.
//No need to redirect back.
r = 0;
}
FT dst_h;
// redirect back from new_h to old_h
r = dictionary_redirect_internal(old_fname_in_env, new_h, txn, &dst_h);
assert_zero(r);
assert(dst_h == old_h);
return r;
}
......@@ -784,7 +823,6 @@ toku_dictionary_redirect (const char *dst_fname_in_env, FT_HANDLE old_ft, TOKUTX
// make rollback log entry
if (txn) {
assert(!toku_list_empty(&new_h->live_ft_handles));
r = toku_txn_note_ft(txn, new_h); // mark new brt as touched by this txn
FILENUM old_filenum = toku_cachefile_filenum(old_h->cf);
......@@ -817,7 +855,7 @@ toku_ft_maybe_add_txn_ref(FT h, TOKUTXN txn) {
BOOL ref_added = FALSE;
OMTVALUE txnv;
u_int32_t index;
toku_ft_lock(h);
toku_ft_grab_reflock(h);
// Does brt already know about transaction txn?
int r = toku_omt_find_zero(h->txns, find_xid, txn, &txnv, &index);
if (r==0) {
......@@ -832,85 +870,80 @@ toku_ft_maybe_add_txn_ref(FT h, TOKUTXN txn) {
assert(r==0);
ref_added = TRUE;
exit:
toku_ft_unlock(h);
toku_ft_release_reflock(h);
return ref_added;
}
void
toku_ft_remove_txn_ref(FT h, TOKUTXN txn) {
static void
remove_txn_ref_callback(FT ft, void *context) {
TOKUTXN txn = context;
OMTVALUE txnv_again=NULL;
u_int32_t index;
toku_ft_lock(h);
int r = toku_omt_find_zero(h->txns, find_xid, txn, &txnv_again, &index);
int r = toku_omt_find_zero(ft->txns, find_xid, txn, &txnv_again, &index);
assert(r==0);
assert(txnv_again == txn);
r = toku_omt_delete_at(h->txns, index);
r = toku_omt_delete_at(ft->txns, index);
assert(r==0);
// TODO: (Zardosht) figure out how to properly do this
// below this unlock, are depending on ydb lock
toku_ft_unlock(h);
if (!toku_ft_needed(h)) {
//Close immediately.
// I have no idea how this error string business works
char *error_string = NULL;
r = toku_remove_ft(h, &error_string, false, ZERO_LSN);
lazy_assert_zero(r);
}
}
void
toku_ft_remove_txn_ref(FT ft, TOKUTXN txn) {
toku_ft_remove_reference(ft, false, ZERO_LSN, remove_txn_ref_callback, txn);
}
void toku_calculate_root_offset_pointer (
FT h,
FT ft,
CACHEKEY* root_key,
u_int32_t *roothash
)
{
*roothash = toku_cachetable_hash(h->cf, h->root_blocknum);
*root_key = h->root_blocknum;
*roothash = toku_cachetable_hash(ft->cf, ft->h->root_blocknum);
*root_key = ft->h->root_blocknum;
}
void toku_ft_set_new_root_blocknum(
FT h,
FT ft,
CACHEKEY new_root_key
)
{
h->root_blocknum = new_root_key;
ft->h->root_blocknum = new_root_key;
}
LSN toku_ft_checkpoint_lsn(FT h) {
return h->checkpoint_lsn;
LSN toku_ft_checkpoint_lsn(FT ft) {
return ft->h->checkpoint_lsn;
}
int toku_ft_set_panic(FT h, int panic, char *panic_string) {
if (h->panic == 0) {
h->panic = panic;
if (h->panic_string) {
toku_free(h->panic_string);
int toku_ft_set_panic(FT ft, int panic, char *panic_string) {
if (ft->panic == 0) {
ft->panic = panic;
if (ft->panic_string) {
toku_free(ft->panic_string);
}
h->panic_string = toku_strdup(panic_string);
ft->panic_string = toku_strdup(panic_string);
}
return 0;
}
void
toku_ft_stat64 (FT h, struct ftstat64_s *s) {
s->fsize = toku_cachefile_size(h->cf);
toku_ft_stat64 (FT ft, struct ftstat64_s *s) {
s->fsize = toku_cachefile_size(ft->cf);
// just use the in memory stats from the header
// prevent appearance of negative numbers for numrows, numbytes
int64_t n = h->in_memory_stats.numrows;
int64_t n = ft->in_memory_stats.numrows;
if (n < 0) {
n = 0;
}
s->nkeys = s->ndata = n;
n = h->in_memory_stats.numbytes;
n = ft->in_memory_stats.numbytes;
if (n < 0) {
n = 0;
}
s->dsize = n;
// 4018
s->create_time_sec = h->time_of_creation;
s->modify_time_sec = h->time_of_last_modification;
s->verify_time_sec = h->time_of_last_verification;
s->create_time_sec = ft->h->time_of_creation;
s->modify_time_sec = ft->h->time_of_last_modification;
s->verify_time_sec = ft->h->time_of_last_verification;
}
// TODO: (Zardosht), once the fdlock has been removed from cachetable, remove
......@@ -963,3 +996,33 @@ toku_ft_decrease_stats(STAT64INFO headerstats, STAT64INFO_S delta) {
(void) __sync_fetch_and_sub(&(headerstats->numrows), delta.numrows);
(void) __sync_fetch_and_sub(&(headerstats->numbytes), delta.numbytes);
}
void
toku_ft_remove_reference(FT ft, bool oplsn_valid, LSN oplsn, remove_ft_ref_callback remove_ref, void *extra) {
toku_ft_grab_reflock(ft);
if (toku_ft_has_one_reference_unlocked(ft)) {
toku_ft_release_reflock(ft);
toku_ft_open_close_lock();
toku_ft_grab_reflock(ft);
remove_ref(ft, extra);
BOOL needed = toku_ft_needed_unlocked(ft);
toku_ft_release_reflock(ft);
if (!needed) {
// close header
char *error_string = NULL;
int r;
r = toku_remove_ft(ft, &error_string, oplsn_valid, oplsn);
assert_zero(r);
assert(error_string == NULL);
}
toku_ft_open_close_unlock();
}
else {
remove_ref(ft, extra);
toku_ft_release_reflock(ft);
}
}
......@@ -22,13 +22,19 @@ void toku_ft_destroy_treelock(FT h);
void toku_ft_grab_treelock(FT h);
void toku_ft_release_treelock(FT h);
void toku_ft_init_reflock(FT ft);
void toku_ft_destroy_reflock(FT ft);
void toku_ft_grab_reflock(FT ft);
void toku_ft_release_reflock(FT ft);
int toku_create_new_ft(FT *ftp, FT_OPTIONS options, CACHEFILE cf, TOKUTXN txn);
void toku_ft_free (FT h);
int toku_read_ft_and_store_in_cachefile (FT_HANDLE brt, CACHEFILE cf, LSN max_acceptable_lsn, FT *header, BOOL* was_open);
void toku_ft_note_ft_handle_open(FT ft, FT_HANDLE live);
int toku_ft_needed(FT h);
int toku_ft_needed_unlocked(FT h);
BOOL toku_ft_has_one_reference_unlocked(FT ft);
int toku_remove_ft (FT h, char **error_string, BOOL oplsn_valid, LSN oplsn) __attribute__ ((warn_unused_result));
FT_HANDLE toku_ft_get_some_existing_ft_handle(FT h);
......@@ -36,14 +42,14 @@ FT_HANDLE toku_ft_get_some_existing_ft_handle(FT h);
void toku_ft_note_hot_begin(FT_HANDLE brt);
void toku_ft_note_hot_complete(FT_HANDLE brt, BOOL success, MSN msn_at_start_of_hot);
void
void
toku_ft_init(
FT h,
BLOCKNUM root_blocknum_on_disk,
LSN checkpoint_lsn,
TXNID root_xid_that_created,
uint32_t target_nodesize,
uint32_t target_basementnodesize,
BLOCKNUM root_blocknum_on_disk,
LSN checkpoint_lsn,
TXNID root_xid_that_created,
uint32_t target_nodesize,
uint32_t target_basementnodesize,
enum toku_compression_method compression_method
);
......@@ -71,5 +77,8 @@ void toku_ft_update_cmp_descriptor(FT h);
void toku_ft_update_stats(STAT64INFO headerstats, STAT64INFO_S delta);
void toku_ft_decrease_stats(STAT64INFO headerstats, STAT64INFO_S delta);
void toku_ft_remove_reference(FT ft,
bool oplsn_valid, LSN oplsn,
remove_ft_ref_callback remove_ref, void *extra);
#endif
......@@ -881,8 +881,8 @@ toku_serialize_ftnode_to (int fd, BLOCKNUM blocknum, FTNODE node, FTNODE_DISK_DA
int r = toku_serialize_ftnode_to_memory(
node,
ndd,
h->basementnodesize,
h->compression_method,
h->h->basementnodesize,
h->h->compression_method,
do_rebalancing,
FALSE, // in_parallel
&n_to_write,
......@@ -1786,7 +1786,7 @@ deserialize_and_upgrade_internal_node(FTNODE node,
// of messages in the buffer.
MSN lowest;
u_int64_t amount = n_in_this_buffer;
lowest.msn = __sync_sub_and_fetch(&bfe->h->highest_unused_msn_for_upgrade.msn, amount);
lowest.msn = __sync_sub_and_fetch(&bfe->h->h->highest_unused_msn_for_upgrade.msn, amount);
if (highest_msn.msn == 0) {
highest_msn.msn = lowest.msn + n_in_this_buffer;
}
......@@ -2035,7 +2035,7 @@ deserialize_and_upgrade_leaf_node(FTNODE node,
// Whatever this is must be less than the MSNs of every message above
// it, so it's ok to take it here.
bn->max_msn_applied = bfe->h->highest_unused_msn_for_upgrade;
bn->max_msn_applied = bfe->h->h->highest_unused_msn_for_upgrade;
bn->stale_ancestor_messages_applied = false;
node->max_msn_applied_to_node_on_disk = bn->max_msn_applied;
......@@ -2625,7 +2625,7 @@ toku_serialize_rollback_log_to (int fd, BLOCKNUM blocknum, ROLLBACK_LOG_NODE log
size_t n_to_write;
char *compressed_buf;
{
int r = toku_serialize_rollback_log_to_memory(log, n_workitems, n_threads, h->compression_method, &n_to_write, &compressed_buf);
int r = toku_serialize_rollback_log_to_memory(log, n_workitems, n_threads, h->h->compression_method, &n_to_write, &compressed_buf);
if (r!=0) return r;
}
......@@ -2949,9 +2949,9 @@ toku_upgrade_subtree_estimates_to_stat64info(int fd, FT h)
FTNODE_DISK_DATA unused_ndd = NULL;
struct ftnode_fetch_extra bfe;
fill_bfe_for_min_read(&bfe, h);
e = deserialize_ftnode_from_fd(fd, h->root_blocknum, 0, &unused_node, &unused_ndd,
&bfe, &h->on_disk_stats);
h->in_memory_stats = h->on_disk_stats;
e = deserialize_ftnode_from_fd(fd, h->h->root_blocknum, 0, &unused_node, &unused_ndd,
&bfe, &h->h->on_disk_stats);
h->in_memory_stats = h->h->on_disk_stats;
if (unused_node) {
toku_ftnode_free(&unused_node);
......
......@@ -85,34 +85,34 @@ dump_descriptor(DESCRIPTOR d) {
static void
dump_header (int f, FT *header, CACHEFILE cf) {
FT h;
FT ft;
int r;
char timestr[26];
r = toku_deserialize_ft_from (f, MAX_LSN, &h);
r = toku_deserialize_ft_from (f, MAX_LSN, &ft);
assert(r==0);
h->cf = cf;
ft->cf = cf;
printf("ft:\n");
printf(" layout_version=%d\n", h->layout_version);
printf(" layout_version_original=%d\n", h->layout_version_original);
printf(" layout_version_read_from_disk=%d\n", h->layout_version_read_from_disk);
printf(" build_id=%d\n", h->build_id);
printf(" build_id_original=%d\n", h->build_id_original);
format_time(h->time_of_creation, timestr);
printf(" time_of_creation= %"PRIu64" %s\n", h->time_of_creation, timestr);
format_time(h->time_of_last_modification, timestr);
printf(" time_of_last_modification=%"PRIu64" %s\n", h->time_of_last_modification, timestr);
printf(" dirty=%d\n", h->dirty);
printf(" checkpoint_count=%" PRId64 "\n", h->checkpoint_count);
printf(" checkpoint_lsn=%" PRId64 "\n", h->checkpoint_lsn.lsn);
printf(" nodesize=%u\n", h->nodesize);
printf(" basementnodesize=%u\n", h->basementnodesize);
printf(" compression_method=%u\n", (unsigned) h->compression_method);
printf(" unnamed_root=%" PRId64 "\n", h->root_blocknum.b);
printf(" flags=%u\n", h->flags);
dump_descriptor(&h->descriptor);
printf(" estimated numrows=%" PRId64 "\n", h->in_memory_stats.numrows);
printf(" estimated numbytes=%" PRId64 "\n", h->in_memory_stats.numbytes);
*header = h;
printf(" layout_version=%d\n", ft->h->layout_version);
printf(" layout_version_original=%d\n", ft->h->layout_version_original);
printf(" layout_version_read_from_disk=%d\n", ft->layout_version_read_from_disk);
printf(" build_id=%d\n", ft->h->build_id);
printf(" build_id_original=%d\n", ft->h->build_id_original);
format_time(ft->h->time_of_creation, timestr);
printf(" time_of_creation= %"PRIu64" %s\n", ft->h->time_of_creation, timestr);
format_time(ft->h->time_of_last_modification, timestr);
printf(" time_of_last_modification=%"PRIu64" %s\n", ft->h->time_of_last_modification, timestr);
printf(" dirty=%d\n", ft->h->dirty);
printf(" checkpoint_count=%" PRId64 "\n", ft->h->checkpoint_count);
printf(" checkpoint_lsn=%" PRId64 "\n", ft->h->checkpoint_lsn.lsn);
printf(" nodesize=%u\n", ft->h->nodesize);
printf(" basementnodesize=%u\n", ft->h->basementnodesize);
printf(" compression_method=%u\n", (unsigned) ft->h->compression_method);
printf(" unnamed_root=%" PRId64 "\n", ft->h->root_blocknum.b);
printf(" flags=%u\n", ft->h->flags);
dump_descriptor(&ft->descriptor);
printf(" estimated numrows=%" PRId64 "\n", ft->in_memory_stats.numrows);
printf(" estimated numbytes=%" PRId64 "\n", ft->in_memory_stats.numbytes);
*header = ft;
}
static int
......@@ -506,14 +506,14 @@ main (int argc, const char *const argv[]) {
const char *n = argv[0];
int f = open(n, O_RDWR + O_BINARY); assert(f>=0);
FT h;
FT ft;
// create a cachefile for the header
int r = toku_create_cachetable(&ct, 1<<25, (LSN){0}, 0);
assert(r == 0);
CACHEFILE cf;
r = toku_cachetable_openfd (&cf, ct, f, n);
assert(r==0);
dump_header(f, &h, cf);
dump_header(f, &ft, cf);
if (interactive) {
while (1) {
printf("ftdump>"); fflush(stdout);
......@@ -530,25 +530,25 @@ main (int argc, const char *const argv[]) {
if (strcmp(fields[0], "help") == 0) {
interactive_help();
} else if (strcmp(fields[0], "header") == 0) {
toku_ft_free(h);
dump_header(f, &h, cf);
toku_ft_free(ft);
dump_header(f, &ft, cf);
} else if (strcmp(fields[0], "block") == 0 && nfields == 2) {
BLOCKNUM blocknum = make_blocknum(getuint64(fields[1]));
dump_block(f, blocknum, h);
dump_block(f, blocknum, ft);
} else if (strcmp(fields[0], "node") == 0 && nfields == 2) {
BLOCKNUM off = make_blocknum(getuint64(fields[1]));
dump_node(f, off, h);
dump_node(f, off, ft);
} else if (strcmp(fields[0], "dumpdata") == 0 && nfields == 2) {
dump_data = strtol(fields[1], NULL, 10);
} else if (strcmp(fields[0], "block_translation") == 0 || strcmp(fields[0], "bx") == 0) {
u_int64_t offset = 0;
if (nfields == 2)
offset = getuint64(fields[1]);
dump_block_translation(h, offset);
dump_block_translation(ft, offset);
} else if (strcmp(fields[0], "fragmentation") == 0) {
dump_fragmentation(f, h);
dump_fragmentation(f, ft);
} else if (strcmp(fields[0], "garbage") == 0) {
dump_garbage_stats(f, h);
dump_garbage_stats(f, ft);
} else if (strcmp(fields[0], "file") == 0 && nfields >= 3) {
u_int64_t offset = getuint64(fields[1]);
u_int64_t size = getuint64(fields[2]);
......@@ -565,18 +565,18 @@ main (int argc, const char *const argv[]) {
}
}
} else if (rootnode) {
dump_node(f, h->root_blocknum, h);
dump_node(f, ft->h->root_blocknum, ft);
} else {
printf("Block translation:");
toku_dump_translation_table(stdout, h->blocktable);
toku_dump_translation_table(stdout, ft->blocktable);
struct __dump_node_extra info;
info.f = f;
info.h = h;
toku_blocktable_iterate(h->blocktable, TRANSLATION_CHECKPOINTED,
info.h = ft;
toku_blocktable_iterate(ft->blocktable, TRANSLATION_CHECKPOINTED,
dump_node_wrapper, &info, TRUE, TRUE);
}
toku_ft_free(h);
toku_ft_free(ft);
return 0;
}
......@@ -507,7 +507,7 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp,
#define SET_TO_MY_STRDUP(lval, s) do { char *v = toku_strdup(s); if (!v) { int r = errno; toku_ft_loader_internal_destroy(bl, TRUE); return r; } lval = v; } while (0)
MY_CALLOC_N(N, bl->root_xids_that_created);
for (int i=0; i<N; i++) if (brts[i]) bl->root_xids_that_created[i]=brts[i]->ft->root_xid_that_created;
for (int i=0; i<N; i++) if (brts[i]) bl->root_xids_that_created[i]=brts[i]->ft->h->root_xid_that_created;
MY_CALLOC_N(N, bl->dbs);
for (int i=0; i<N; i++) if (brts[i]) bl->dbs[i]=dbs[i];
MY_CALLOC_N(N, bl->descriptors);
......@@ -2206,11 +2206,12 @@ static int toku_loader_write_ft_from_q (FTLOADER bl,
if (bl->root_xids_that_created)
root_xid_that_created = bl->root_xids_that_created[which_db];
struct ft h;
toku_ft_init(&h, (BLOCKNUM){0}, bl->load_lsn, root_xid_that_created, target_nodesize, target_basementnodesize, target_compression_method);
// TODO: (Zardosht/Yoni/Leif), do this code properly
struct ft ft;
toku_ft_init(&ft, (BLOCKNUM){0}, bl->load_lsn, root_xid_that_created, target_nodesize, target_basementnodesize, target_compression_method);
struct dbout out;
dbout_init(&out, &h);
dbout_init(&out, &ft);
out.fd = fd;
out.current_off = 8192; // leave 8K reserved at beginning
out.n_translations = 3; // 3 translations reserved at the beginning
......@@ -2333,7 +2334,7 @@ static int toku_loader_write_ft_from_q (FTLOADER bl,
}
if (deltas.numrows || deltas.numbytes) {
toku_ft_update_stats(&h.in_memory_stats, deltas);
toku_ft_update_stats(&ft.in_memory_stats, deltas);
}
cleanup_maxkey(&maxkey);
......@@ -2375,7 +2376,7 @@ static int toku_loader_write_ft_from_q (FTLOADER bl,
{
invariant(sts.n_subtrees==1);
out.h->root_blocknum = make_blocknum(sts.subtrees[0].block);
out.h->h->root_blocknum = make_blocknum(sts.subtrees[0].block);
toku_free(sts.subtrees); sts.subtrees = NULL;
// write the descriptor
......@@ -2766,16 +2767,15 @@ static int write_translation_table (struct dbout *out, long long *off_of_transla
static int
write_header (struct dbout *out, long long translation_location_on_disk, long long translation_size_on_disk) {
int result = 0;
out->h->checkpoint_staging_stats = out->h->in_memory_stats; // #4184
unsigned int size = toku_serialize_ft_size (out->h);
unsigned int size = toku_serialize_ft_size (out->h->h);
struct wbuf wbuf;
char *MALLOC_N(size, buf);
if (buf == NULL) {
result = errno;
} else {
wbuf_init(&wbuf, buf, size);
toku_serialize_ft_to_wbuf(&wbuf, out->h, translation_location_on_disk, translation_size_on_disk);
out->h->h->on_disk_stats = out->h->in_memory_stats;
toku_serialize_ft_to_wbuf(&wbuf, out->h->h, translation_location_on_disk, translation_size_on_disk);
if (wbuf.ndone != size)
result = EINVAL;
else
......
......@@ -38,6 +38,7 @@ typedef struct ftnode_leaf_basement_node *BASEMENTNODE;
typedef struct ftnode_nonleaf_childinfo *NONLEAF_CHILDINFO;
typedef struct sub_block *SUB_BLOCK;
typedef struct ft *FT;
typedef struct ft_header *FT_HEADER;
typedef struct ft_options *FT_OPTIONS;
struct wbuf;
struct dbuf;
......@@ -252,6 +253,7 @@ typedef int (*ft_compare_func)(DB *, const DBT *, const DBT *);
typedef void (*setval_func)(const DBT *, void *);
typedef int (*ft_update_func)(DB *, const DBT *, const DBT *, const DBT *, setval_func, void *);
typedef void (*on_redirect_callback)(FT_HANDLE, void*);
typedef void (*remove_ft_ref_callback)(FT, void*);
#define UU(x) x __attribute__((__unused__))
......
......@@ -197,7 +197,7 @@ toku_logger_open_rollback(TOKULOGGER logger, CACHETABLE cachetable, BOOL create)
//Verify it is empty
assert(!t->ft->panic);
//Must have no data blocks (rollback logs or otherwise).
toku_block_verify_no_data_blocks_except_root_unlocked(t->ft->blocktable, t->ft->root_blocknum);
toku_block_verify_no_data_blocks_except_root_unlocked(t->ft->blocktable, t->ft->h->root_blocknum);
BOOL is_empty;
is_empty = toku_ft_is_empty_fast(t);
assert(is_empty);
......@@ -216,26 +216,26 @@ toku_logger_close_rollback(TOKULOGGER logger, BOOL recovery_failed) {
if (!logger->is_panicked && cf) {
FT_HANDLE ft_to_close;
{ //Find "brt"
FT h = toku_cachefile_get_userdata(cf);
if (!h->panic && recovery_failed) {
r = toku_ft_set_panic(h, EINVAL, "Recovery failed");
FT ft = toku_cachefile_get_userdata(cf);
if (!ft->panic && recovery_failed) {
r = toku_ft_set_panic(ft, EINVAL, "Recovery failed");
assert_zero(r);
}
//Verify it is safe to close it.
if (!h->panic) { //If paniced, it is safe to close.
assert(!h->dirty); //Must not be dirty.
if (!ft->panic) { //If paniced, it is safe to close.
assert(!ft->h->dirty); //Must not be dirty.
//Must have no data blocks (rollback logs or otherwise).
toku_block_verify_no_data_blocks_except_root_unlocked(h->blocktable, h->root_blocknum);
toku_block_verify_no_data_blocks_except_root_unlocked(ft->blocktable, ft->h->root_blocknum);
}
assert(!h->dirty);
ft_to_close = toku_ft_get_some_existing_ft_handle(h);
assert(!ft->h->dirty);
ft_to_close = toku_ft_get_some_existing_ft_handle(ft);
assert(ft_to_close);
{
BOOL is_empty;
is_empty = toku_ft_is_empty_fast(ft_to_close);
assert(is_empty);
}
assert(!h->dirty); // it should not have been dirtied by the toku_ft_is_empty test.
assert(!ft->h->dirty); // it should not have been dirtied by the toku_ft_is_empty test.
}
r = toku_ft_handle_close(ft_to_close, FALSE, ZERO_LSN);
......
......@@ -330,11 +330,15 @@ test_prefetching(void) {
FT_HANDLE XMALLOC(brt);
FT XCALLOC(brt_h);
toku_ft_init(brt_h,
make_blocknum(0),
ZERO_LSN,
TXNID_NONE,
4*1024*1024,
128*1024,
TOKU_DEFAULT_COMPRESSION_METHOD);
brt->ft = brt_h;
brt_h->type = FT_CURRENT;
brt_h->panic = 0; brt_h->panic_string = 0;
brt_h->basementnodesize = 128*1024;
brt_h->compression_method = TOKU_DEFAULT_COMPRESSION_METHOD;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable);
//Want to use block #20
......
......@@ -273,11 +273,15 @@ test_serialize_nonleaf(void) {
FT_HANDLE XMALLOC(brt);
FT XCALLOC(brt_h);
toku_ft_init(brt_h,
make_blocknum(0),
ZERO_LSN,
TXNID_NONE,
4*1024*1024,
128*1024,
TOKU_DEFAULT_COMPRESSION_METHOD);
brt->ft = brt_h;
brt_h->type = FT_CURRENT;
brt_h->panic = 0; brt_h->panic_string = 0;
brt_h->basementnodesize = 128*1024;
brt_h->compression_method = TOKU_DEFAULT_COMPRESSION_METHOD;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable);
//Want to use block #20
......@@ -359,11 +363,15 @@ test_serialize_leaf(void) {
FT_HANDLE XMALLOC(brt);
FT XCALLOC(brt_h);
toku_ft_init(brt_h,
make_blocknum(0),
ZERO_LSN,
TXNID_NONE,
4*1024*1024,
128*1024,
TOKU_DEFAULT_COMPRESSION_METHOD);
brt->ft = brt_h;
brt_h->type = FT_CURRENT;
brt_h->panic = 0; brt_h->panic_string = 0;
brt_h->basementnodesize = 128*1024;
brt_h->compression_method = TOKU_DEFAULT_COMPRESSION_METHOD;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable);
//Want to use block #20
......
......@@ -104,11 +104,15 @@ test_serialize_leaf(int valsize, int nelts, double entropy) {
FT_HANDLE XMALLOC(brt);
FT XCALLOC(brt_h);
toku_ft_init(brt_h,
make_blocknum(0),
ZERO_LSN,
TXNID_NONE,
4*1024*1024,
128*1024,
TOKU_DEFAULT_COMPRESSION_METHOD);
brt->ft = brt_h;
brt_h->type = FT_CURRENT;
brt_h->panic = 0; brt_h->panic_string = 0;
brt_h->basementnodesize = 128*1024;
brt_h->compression_method = TOKU_DEFAULT_COMPRESSION_METHOD;
brt_h->compare_fun = long_key_cmp;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable);
......@@ -237,11 +241,15 @@ test_serialize_nonleaf(int valsize, int nelts, double entropy) {
FT_HANDLE XMALLOC(brt);
FT XCALLOC(brt_h);
toku_ft_init(brt_h,
make_blocknum(0),
ZERO_LSN,
TXNID_NONE,
4*1024*1024,
128*1024,
TOKU_DEFAULT_COMPRESSION_METHOD);
brt->ft = brt_h;
brt_h->type = FT_CURRENT;
brt_h->panic = 0; brt_h->panic_string = 0;
brt_h->basementnodesize = 128*1024;
brt_h->compression_method = TOKU_DEFAULT_COMPRESSION_METHOD;
brt_h->compare_fun = long_key_cmp;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable);
......
......@@ -250,11 +250,15 @@ test_serialize_leaf_check_msn(enum ftnode_verify_type bft, BOOL do_clone) {
FT_HANDLE XMALLOC(brt);
FT XCALLOC(brt_h);
toku_ft_init(brt_h,
make_blocknum(0),
ZERO_LSN,
TXNID_NONE,
4*1024*1024,
128*1024,
TOKU_DEFAULT_COMPRESSION_METHOD);
brt->ft = brt_h;
brt_h->type = FT_CURRENT;
brt_h->panic = 0; brt_h->panic_string = 0;
brt_h->basementnodesize = 128*1024;
brt_h->compression_method = TOKU_DEFAULT_COMPRESSION_METHOD;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable);
//Want to use block #20
......@@ -392,11 +396,15 @@ test_serialize_leaf_with_large_pivots(enum ftnode_verify_type bft, BOOL do_clone
FT_HANDLE XMALLOC(brt);
FT XCALLOC(brt_h);
toku_ft_init(brt_h,
make_blocknum(0),
ZERO_LSN,
TXNID_NONE,
4*1024*1024,
128*1024,
TOKU_DEFAULT_COMPRESSION_METHOD);
brt->ft = brt_h;
brt_h->type = FT_CURRENT;
brt_h->panic = 0; brt_h->panic_string = 0;
brt_h->basementnodesize = 128*1024;
brt_h->compression_method = TOKU_DEFAULT_COMPRESSION_METHOD;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable);
//Want to use block #20
......@@ -531,11 +539,15 @@ test_serialize_leaf_with_many_rows(enum ftnode_verify_type bft, BOOL do_clone) {
FT_HANDLE XMALLOC(brt);
FT XCALLOC(brt_h);
toku_ft_init(brt_h,
make_blocknum(0),
ZERO_LSN,
TXNID_NONE,
4*1024*1024,
128*1024,
TOKU_DEFAULT_COMPRESSION_METHOD);
brt->ft = brt_h;
brt_h->type = FT_CURRENT;
brt_h->panic = 0; brt_h->panic_string = 0;
brt_h->basementnodesize = 128*1024;
brt_h->compression_method = TOKU_DEFAULT_COMPRESSION_METHOD;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable);
//Want to use block #20
......@@ -675,11 +687,15 @@ test_serialize_leaf_with_large_rows(enum ftnode_verify_type bft, BOOL do_clone)
FT_HANDLE XMALLOC(brt);
FT XCALLOC(brt_h);
toku_ft_init(brt_h,
make_blocknum(0),
ZERO_LSN,
TXNID_NONE,
4*1024*1024,
128*1024,
TOKU_DEFAULT_COMPRESSION_METHOD);
brt->ft = brt_h;
brt_h->type = FT_CURRENT;
brt_h->panic = 0; brt_h->panic_string = 0;
brt_h->basementnodesize = 128*1024;
brt_h->compression_method = TOKU_DEFAULT_COMPRESSION_METHOD;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable);
//Want to use block #20
......@@ -835,11 +851,15 @@ test_serialize_leaf_with_empty_basement_nodes(enum ftnode_verify_type bft, BOOL
FT_HANDLE XMALLOC(brt);
FT XCALLOC(brt_h);
toku_ft_init(brt_h,
make_blocknum(0),
ZERO_LSN,
TXNID_NONE,
4*1024*1024,
128*1024,
TOKU_DEFAULT_COMPRESSION_METHOD);
brt->ft = brt_h;
brt_h->type = FT_CURRENT;
brt_h->panic = 0; brt_h->panic_string = 0;
brt_h->basementnodesize = 128*1024;
brt_h->compression_method = TOKU_DEFAULT_COMPRESSION_METHOD;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable);
//Want to use block #20
......@@ -959,11 +979,15 @@ test_serialize_leaf_with_multiple_empty_basement_nodes(enum ftnode_verify_type b
FT_HANDLE XMALLOC(brt);
FT XCALLOC(brt_h);
toku_ft_init(brt_h,
make_blocknum(0),
ZERO_LSN,
TXNID_NONE,
4*1024*1024,
128*1024,
TOKU_DEFAULT_COMPRESSION_METHOD);
brt->ft = brt_h;
brt_h->type = FT_CURRENT;
brt_h->panic = 0; brt_h->panic_string = 0;
brt_h->basementnodesize = 128*1024;
brt_h->compression_method = TOKU_DEFAULT_COMPRESSION_METHOD;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable);
//Want to use block #20
......@@ -1088,11 +1112,15 @@ test_serialize_leaf(enum ftnode_verify_type bft, BOOL do_clone) {
FT_HANDLE XMALLOC(brt);
FT XCALLOC(brt_h);
toku_ft_init(brt_h,
make_blocknum(0),
ZERO_LSN,
TXNID_NONE,
4*1024*1024,
128*1024,
TOKU_DEFAULT_COMPRESSION_METHOD);
brt->ft = brt_h;
brt_h->type = FT_CURRENT;
brt_h->panic = 0; brt_h->panic_string = 0;
brt_h->basementnodesize = 128*1024;
brt_h->compression_method = TOKU_DEFAULT_COMPRESSION_METHOD;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable);
//Want to use block #20
......@@ -1230,11 +1258,15 @@ test_serialize_nonleaf(enum ftnode_verify_type bft, BOOL do_clone) {
FT_HANDLE XMALLOC(brt);
FT XCALLOC(brt_h);
toku_ft_init(brt_h,
make_blocknum(0),
ZERO_LSN,
TXNID_NONE,
4*1024*1024,
128*1024,
TOKU_DEFAULT_COMPRESSION_METHOD);
brt->ft = brt_h;
brt_h->type = FT_CURRENT;
brt_h->panic = 0; brt_h->panic_string = 0;
brt_h->basementnodesize = 128*1024;
brt_h->compression_method = TOKU_DEFAULT_COMPRESSION_METHOD;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable);
//Want to use block #20
......
......@@ -25,14 +25,15 @@ static void test_header (void) {
r = toku_open_ft_handle(fname, 1, &t, 1024, 256, TOKU_DEFAULT_COMPRESSION_METHOD, ct, null_txn, toku_builtin_compare_fun);
assert(r==0);
// now insert some info into the header
FT h = t->ft;
h->dirty = 1;
h->layout_version_original = 13;
h->layout_version_read_from_disk = 14;
h->build_id_original = 1234;
h->in_memory_stats = (STAT64INFO_S) {10, 11};
h->on_disk_stats = (STAT64INFO_S) {20, 21};
h->checkpoint_staging_stats = (STAT64INFO_S) {30, 31};
FT ft = t->ft;
ft->h->dirty = 1;
// cast away const because we actually want to fiddle with the header
// in this test
*((int *) &ft->h->layout_version_original) = 13;
ft->layout_version_read_from_disk = 14;
*((uint32_t *) &ft->h->build_id_original) = 1234;
ft->in_memory_stats = (STAT64INFO_S) {10, 11};
ft->h->on_disk_stats = (STAT64INFO_S) {20, 21};
r = toku_close_ft_handle_nolsn(t, 0); assert(r==0);
r = toku_cachetable_close(&ct);
assert(r==0);
......@@ -43,20 +44,17 @@ static void test_header (void) {
r = toku_open_ft_handle(fname, 0, &t, 1024, 256, TOKU_DEFAULT_COMPRESSION_METHOD, ct, null_txn, toku_builtin_compare_fun);
assert(r==0);
h = t->ft;
STAT64INFO_S expected_stats = {20, 21}; // on checkpoint, on_disk_stats copied to checkpoint_staging_stats
assert(h->layout_version == FT_LAYOUT_VERSION);
assert(h->layout_version_original == 13);
assert(h->layout_version_read_from_disk == FT_LAYOUT_VERSION);
assert(h->build_id_original == 1234);
assert(h->in_memory_stats.numrows == expected_stats.numrows);
assert(h->on_disk_stats.numbytes == expected_stats.numbytes);
ft = t->ft;
STAT64INFO_S expected_stats = {20, 21}; // on checkpoint, on_disk_stats copied to ft->checkpoint_header->on_disk_stats
assert(ft->h->layout_version == FT_LAYOUT_VERSION);
assert(ft->h->layout_version_original == 13);
assert(ft->layout_version_read_from_disk == FT_LAYOUT_VERSION);
assert(ft->h->build_id_original == 1234);
assert(ft->in_memory_stats.numrows == expected_stats.numrows);
assert(ft->h->on_disk_stats.numbytes == expected_stats.numbytes);
r = toku_close_ft_handle_nolsn(t, 0); assert(r==0);
r = toku_cachetable_close(&ct);
assert(r==0);
}
int
......
......@@ -658,7 +658,6 @@ static int remove_txn (OMTVALUE hv, u_int32_t UU(idx), void *txnv)
if (txn->txnid64==h->txnid_that_created_or_locked_when_empty) {
h->txnid_that_created_or_locked_when_empty = TXNID_NONE;
h->root_that_created_or_locked_when_empty = TXNID_NONE;
}
if (txn->txnid64==h->txnid_that_suppressed_recovery_logs) {
h->txnid_that_suppressed_recovery_logs = TXNID_NONE;
......
......@@ -500,7 +500,8 @@ toku_db_change_descriptor(DB *db, DB_TXN* txn, const DBT* descriptor, u_int32_t
goto cleanup;
}
if (!is_db_hot_index) {
r = toku_db_pre_acquire_fileops_lock(db, txn);
//TODO(zardosht): why doesn't hot_index need to do locking?
r = toku_db_pre_acquire_table_lock(db, txn);
if (r != 0) { goto cleanup; }
}
......@@ -677,9 +678,9 @@ locked_db_open(DB *db, DB_TXN *txn, const char *fname, const char *dbname, DBTYP
static int
locked_db_change_descriptor(DB *db, DB_TXN* txn, const DBT* descriptor, u_int32_t flags) {
toku_ydb_lock();
toku_multi_operation_client_lock(); //Cannot begin checkpoint
int r = toku_db_change_descriptor(db, txn, descriptor, flags);
toku_ydb_unlock();
toku_multi_operation_client_unlock(); //Can now begin checkpoint
return r;
}
......
......@@ -19,6 +19,13 @@ struct toku_list {
struct toku_list *next, *prev;
};
static inline int toku_list_num_elements_est(struct toku_list *head) {
if (head->next == head) return 0;
if (head->next == head->prev) return 1;
return 2;
}
static inline void toku_list_init(struct toku_list *head) {
head->next = head->prev = head;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment