Commit 1fff2936 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

refs #4937, merge to main

git-svn-id: file:///svn/toku/tokudb@47083 c7de825b-a66e-492c-adef-691d508d4ae1
parent 6d2fce87
...@@ -728,6 +728,13 @@ static void cachetable_partial_eviction(void* extra) { ...@@ -728,6 +728,13 @@ static void cachetable_partial_eviction(void* extra) {
bjm_remove_background_job(cf->bjm); bjm_remove_background_job(cf->bjm);
} }
void toku_cachetable_swap_pair_values(PAIR old_pair, PAIR new_pair) {
void* old_value = old_pair->value_data;
void* new_value = new_pair->value_data;
old_pair->value_data = new_value;
new_pair->value_data = old_value;
}
void toku_cachetable_maybe_flush_some(CACHETABLE ct) { void toku_cachetable_maybe_flush_some(CACHETABLE ct) {
// TODO: <CER> Maybe move this... // TODO: <CER> Maybe move this...
ct->ev.signal_eviction_thread(); ct->ev.signal_eviction_thread();
......
...@@ -208,6 +208,9 @@ void *toku_cachefile_get_userdata(CACHEFILE); ...@@ -208,6 +208,9 @@ void *toku_cachefile_get_userdata(CACHEFILE);
CACHETABLE toku_cachefile_get_cachetable(CACHEFILE cf); CACHETABLE toku_cachefile_get_cachetable(CACHEFILE cf);
// Effect: Get the cachetable. // Effect: Get the cachetable.
void toku_cachetable_swap_pair_values(PAIR old_pair, PAIR new_pair);
// Effect: Swaps the value_data of old_pair and new_pair.
// Requires: both old_pair and new_pair to be pinned with write locks.
typedef enum { typedef enum {
PL_READ = 0, PL_READ = 0,
......
...@@ -141,6 +141,9 @@ flush_some_child( ...@@ -141,6 +141,9 @@ flush_some_child(
bool bool
always_recursively_flush(FTNODE child, void* extra); always_recursively_flush(FTNODE child, void* extra);
bool
never_recursively_flush(FTNODE UU(child), void* UU(extra));
bool bool
dont_destroy_basement_nodes(void* extra); dont_destroy_basement_nodes(void* extra);
......
...@@ -202,6 +202,12 @@ always_recursively_flush(FTNODE UU(child), void* UU(extra)) ...@@ -202,6 +202,12 @@ always_recursively_flush(FTNODE UU(child), void* UU(extra))
return true; return true;
} }
bool
never_recursively_flush(FTNODE UU(child), void* UU(extra))
{
return false;
}
static bool static bool
recurse_if_child_is_gorged(FTNODE child, void* UU(extra)) recurse_if_child_is_gorged(FTNODE child, void* UU(extra))
{ {
...@@ -395,8 +401,6 @@ ct_maybe_merge_child(struct flusher_advice *fa, ...@@ -395,8 +401,6 @@ ct_maybe_merge_child(struct flusher_advice *fa,
FTNODE root_node = NULL; FTNODE root_node = NULL;
{ {
toku_ft_grab_treelock(h);
uint32_t fullhash; uint32_t fullhash;
CACHEKEY root; CACHEKEY root;
toku_calculate_root_offset_pointer(h, &root, &fullhash); toku_calculate_root_offset_pointer(h, &root, &fullhash);
...@@ -404,8 +408,6 @@ ct_maybe_merge_child(struct flusher_advice *fa, ...@@ -404,8 +408,6 @@ ct_maybe_merge_child(struct flusher_advice *fa,
fill_bfe_for_full_read(&bfe, h); fill_bfe_for_full_read(&bfe, h);
toku_pin_ftnode_off_client_thread(h, root, fullhash, &bfe, PL_WRITE_EXPENSIVE, 0, NULL, &root_node); toku_pin_ftnode_off_client_thread(h, root, fullhash, &bfe, PL_WRITE_EXPENSIVE, 0, NULL, &root_node);
toku_assert_entire_node_in_memory(root_node); toku_assert_entire_node_in_memory(root_node);
toku_ft_release_treelock(h);
} }
(void) __sync_fetch_and_add(&STATUS_VALUE(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_STARTED), 1); (void) __sync_fetch_and_add(&STATUS_VALUE(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_STARTED), 1);
...@@ -1649,6 +1651,51 @@ update_cleaner_status( ...@@ -1649,6 +1651,51 @@ update_cleaner_status(
} }
} }
static void
dummy_update_status(
FTNODE UU(child),
int UU(dirtied),
void* UU(extra)
)
{
}
static int
dummy_pick_heaviest_child(FT UU(h),
FTNODE UU(parent),
void* UU(extra))
{
assert(false);
return -1;
}
void toku_ft_split_child(
FT ft,
FTNODE node,
int childnum,
FTNODE child
)
{
struct flusher_advice fa;
flusher_advice_init(
&fa,
dummy_pick_heaviest_child,
dont_destroy_basement_nodes,
never_recursively_flush,
default_merge_child,
dummy_update_status,
default_pick_child_after_split,
NULL
);
ft_split_child(
ft,
node,
childnum, // childnum to split
child,
&fa
);
}
int int
toku_ftnode_cleaner_callback( toku_ftnode_cleaner_callback(
void *ftnode_pv, void *ftnode_pv,
......
...@@ -266,8 +266,6 @@ toku_ft_hot_optimize(FT_HANDLE brt, ...@@ -266,8 +266,6 @@ toku_ft_hot_optimize(FT_HANDLE brt,
uint32_t fullhash; uint32_t fullhash;
{ {
toku_ft_grab_treelock(brt->ft);
// Get root node (the first parent of each successive HOT // Get root node (the first parent of each successive HOT
// call.) // call.)
toku_calculate_root_offset_pointer(brt->ft, &root_key, &fullhash); toku_calculate_root_offset_pointer(brt->ft, &root_key, &fullhash);
...@@ -282,8 +280,6 @@ toku_ft_hot_optimize(FT_HANDLE brt, ...@@ -282,8 +280,6 @@ toku_ft_hot_optimize(FT_HANDLE brt,
NULL, NULL,
&root); &root);
toku_assert_entire_node_in_memory(root); toku_assert_entire_node_in_memory(root);
toku_ft_release_treelock(brt->ft);
} }
// Prepare HOT diagnostics. // Prepare HOT diagnostics.
......
...@@ -383,7 +383,7 @@ struct ft_header { ...@@ -383,7 +383,7 @@ struct ft_header {
// last time that this tree was verified // last time that this tree was verified
uint64_t time_of_last_verification; uint64_t time_of_last_verification;
// this field is protected by tree_lock, see comment for tree_lock // this field is essentially a const
BLOCKNUM root_blocknum; BLOCKNUM root_blocknum;
const unsigned int flags; const unsigned int flags;
...@@ -434,12 +434,6 @@ struct ft { ...@@ -434,12 +434,6 @@ struct ft {
// These are not read-only: // These are not read-only:
// lock used by a thread to pin the root node to start a descent into
// the tree. This lock protects the blocknum of the root node (root_blocknum). Any
// thread that wants to descend down the tree starting at the root
// must grab this lock before pinning the root.
toku_mutex_t tree_lock;
// protected by blocktable lock // protected by blocktable lock
BLOCK_TABLE blocktable; BLOCK_TABLE blocktable;
...@@ -656,6 +650,14 @@ extern bool toku_ftnode_pf_req_callback(void* ftnode_pv, void* read_extraargs); ...@@ -656,6 +650,14 @@ extern bool toku_ftnode_pf_req_callback(void* ftnode_pv, void* read_extraargs);
int toku_ftnode_pf_callback(void* ftnode_pv, void* UU(disk_data), void* read_extraargs, int fd, PAIR_ATTR* sizep); int toku_ftnode_pf_callback(void* ftnode_pv, void* UU(disk_data), void* read_extraargs, int fd, PAIR_ATTR* sizep);
extern int toku_ftnode_cleaner_callback( void *ftnode_pv, BLOCKNUM blocknum, uint32_t fullhash, void *extraargs); extern int toku_ftnode_cleaner_callback( void *ftnode_pv, BLOCKNUM blocknum, uint32_t fullhash, void *extraargs);
// Given pinned node and pinned child, split child into two
// and update node with information about its new child.
void toku_ft_split_child(
FT h,
FTNODE node,
int childnum,
FTNODE child
);
static inline CACHETABLE_WRITE_CALLBACK get_write_callbacks_for_node(FT h) { static inline CACHETABLE_WRITE_CALLBACK get_write_callbacks_for_node(FT h) {
CACHETABLE_WRITE_CALLBACK wc; CACHETABLE_WRITE_CALLBACK wc;
wc.flush_callback = toku_ftnode_flush_callback; wc.flush_callback = toku_ftnode_flush_callback;
......
...@@ -1278,49 +1278,82 @@ toku_initialize_empty_ftnode (FTNODE n, BLOCKNUM nodename, int height, int num_c ...@@ -1278,49 +1278,82 @@ toku_initialize_empty_ftnode (FTNODE n, BLOCKNUM nodename, int height, int num_c
} }
static void static void
ft_init_new_root(FT ft, FTNODE nodea, FTNODE nodeb, DBT splitk, CACHEKEY *rootp, FTNODE *newrootp) ft_init_new_root(FT ft, FTNODE oldroot, FTNODE *newrootp)
// Effect: Create a new root node whose two children are NODEA and NODEB, and the pivotkey is SPLITK. // Effect: Create a new root node whose two children are the split of oldroot.
// Store the new root's identity in *ROOTP, and the node in *NEWROOTP. // oldroot is unpinned in the process.
// Unpin nodea and nodeb.
// Leave the new root pinned. // Leave the new root pinned.
{ {
FTNODE XMALLOC(newroot); FTNODE newroot;
int new_height = nodea->height+1;
BLOCKNUM newroot_diskoff; BLOCKNUM old_blocknum = oldroot->thisnodename;
toku_allocate_blocknum(ft->blocktable, &newroot_diskoff, ft); uint32_t old_fullhash = oldroot->fullhash;
PAIR old_pair = oldroot->ct_pair;
int new_height = oldroot->height+1;
uint32_t new_fullhash;
BLOCKNUM new_blocknum;
PAIR new_pair = NULL;
cachetable_put_empty_node_with_dep_nodes(
ft,
1,
&oldroot,
&new_blocknum,
&new_fullhash,
&newroot
);
new_pair = newroot->ct_pair;
assert(newroot); assert(newroot);
*rootp=newroot_diskoff;
assert(new_height > 0); assert(new_height > 0);
toku_initialize_empty_ftnode (newroot, newroot_diskoff, new_height, 2, ft->h->layout_version, ft->h->nodesize, ft->h->flags); toku_initialize_empty_ftnode (
//printf("new_root %lld %d %lld %lld\n", newroot_diskoff, newroot->height, nodea->thisnodename, nodeb->thisnodename); newroot,
//printf("%s:%d Splitkey=%p %s\n", __FILE__, __LINE__, splitkey, splitkey); new_blocknum,
toku_copyref_dbt(&newroot->childkeys[0], splitk); new_height,
newroot->totalchildkeylens=splitk.size; 1,
BP_BLOCKNUM(newroot,0)=nodea->thisnodename; ft->h->layout_version,
BP_BLOCKNUM(newroot,1)=nodeb->thisnodename; ft->h->nodesize,
{ ft->h->flags
MSN msna = nodea->max_msn_applied_to_node_on_disk; );
MSN msnb = nodeb->max_msn_applied_to_node_on_disk; MSN msna = oldroot->max_msn_applied_to_node_on_disk;
invariant(msna.msn == msnb.msn);
newroot->max_msn_applied_to_node_on_disk = msna; newroot->max_msn_applied_to_node_on_disk = msna;
}
BP_STATE(newroot,0) = PT_AVAIL; BP_STATE(newroot,0) = PT_AVAIL;
BP_STATE(newroot,1) = PT_AVAIL;
newroot->dirty = 1; newroot->dirty = 1;
//printf("%s:%d put %lld\n", __FILE__, __LINE__, newroot_diskoff);
uint32_t fullhash = toku_cachetable_hash(ft->cf, newroot_diskoff);
newroot->fullhash = fullhash;
toku_cachetable_put(ft->cf, newroot_diskoff, fullhash, newroot, make_ftnode_pair_attr(newroot), get_write_callbacks_for_node(ft), toku_node_save_ct_pair);
//at this point, newroot is associated with newroot_diskoff, nodea is associated with root_blocknum // now do the "switcheroo"
// make newroot_diskoff point to nodea BP_BLOCKNUM(newroot,0) = new_blocknum;
// make root_blocknum point to newroot newroot->thisnodename = old_blocknum;
// also modify the blocknum and fullhash of nodea and newroot newroot->fullhash = old_fullhash;
// before doing this, assert(nodea->blocknum == ft->root_blocknum) newroot->ct_pair = old_pair;
oldroot->thisnodename = new_blocknum;
oldroot->fullhash = new_fullhash;
oldroot->ct_pair = new_pair;
toku_cachetable_swap_pair_values(old_pair, new_pair);
toku_unpin_ftnode(ft, nodea); toku_ft_split_child(
toku_unpin_ftnode(ft, nodeb); ft,
*newrootp = newroot; newroot,
0, // childnum to split
oldroot
);
// ft_split_child released locks on newroot
// and oldroot, so now we repin and
// return to caller
struct ftnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, ft);
toku_pin_ftnode_off_client_thread(
ft,
old_blocknum,
old_fullhash,
&bfe,
PL_WRITE_EXPENSIVE, // may_modify_node
0,
NULL,
newrootp
);
} }
static void static void
...@@ -2055,35 +2088,21 @@ ft_nonleaf_put_cmd (ft_compare_func compare_fun, DESCRIPTOR desc, FTNODE node, F ...@@ -2055,35 +2088,21 @@ ft_nonleaf_put_cmd (ft_compare_func compare_fun, DESCRIPTOR desc, FTNODE node, F
// return true if root changed, false otherwise // return true if root changed, false otherwise
static bool static void
ft_process_maybe_reactive_root (FT ft, CACHEKEY *rootp, FTNODE *nodep) { ft_process_maybe_reactive_root (FT ft, FTNODE *nodep) {
FTNODE node = *nodep; FTNODE node = *nodep;
toku_assert_entire_node_in_memory(node); toku_assert_entire_node_in_memory(node);
enum reactivity re = get_node_reactivity(node); enum reactivity re = get_node_reactivity(node);
switch (re) { switch (re) {
case RE_STABLE: case RE_STABLE:
return false; return;
case RE_FISSIBLE: case RE_FISSIBLE:
{ {
// The root node should split, so make a new root. ft_init_new_root(ft, node, nodep);
FTNODE nodea,nodeb; return;
DBT splitk;
assert(ft->h->nodesize>=node->nodesize); /* otherwise we might be in trouble because the nodesize shrank. */
//
// This happens on the client thread with the ydb lock, so it is safe to
// not pass in dependent nodes. Although if we wanted to, we could pass
// in just node. That would be correct.
//
if (node->height==0) {
ftleaf_split(ft, node, &nodea, &nodeb, &splitk, true, 0, NULL);
} else {
ft_nonleaf_split(ft, node, &nodea, &nodeb, &splitk, 0, NULL);
}
ft_init_new_root(ft, nodea, nodeb, splitk, rootp, nodep);
return true;
} }
case RE_FUSIBLE: case RE_FUSIBLE:
return false; // Cannot merge anything at the root, so return happy. return; // Cannot merge anything at the root, so return happy.
} }
abort(); // cannot happen abort(); // cannot happen
} }
...@@ -2498,8 +2517,6 @@ toku_ft_root_put_cmd (FT ft, FT_MSG_S * cmd) ...@@ -2498,8 +2517,6 @@ toku_ft_root_put_cmd (FT ft, FT_MSG_S * cmd)
// others // others
// //
{ {
toku_ft_grab_treelock(ft);
uint32_t fullhash; uint32_t fullhash;
toku_calculate_root_offset_pointer(ft, &root_key, &fullhash); toku_calculate_root_offset_pointer(ft, &root_key, &fullhash);
...@@ -2529,13 +2546,7 @@ toku_ft_root_put_cmd (FT ft, FT_MSG_S * cmd) ...@@ -2529,13 +2546,7 @@ toku_ft_root_put_cmd (FT ft, FT_MSG_S * cmd)
ft_verify_flags(ft, node); ft_verify_flags(ft, node);
// first handle a reactive root, then put in the message // first handle a reactive root, then put in the message
CACHEKEY new_root_key; ft_process_maybe_reactive_root(ft, &node);
bool root_changed = ft_process_maybe_reactive_root(ft, &new_root_key, &node);
if (root_changed) {
toku_ft_set_new_root_blocknum(ft, new_root_key);
}
toku_ft_release_treelock(ft);
} }
push_something_at_root(ft, &node, cmd); push_something_at_root(ft, &node, cmd);
// verify that msn of latest message was captured in root node (push_something_at_root() did not release ydb lock) // verify that msn of latest message was captured in root node (push_something_at_root() did not release ydb lock)
...@@ -4822,7 +4833,6 @@ try_again: ...@@ -4822,7 +4833,6 @@ try_again:
); );
FTNODE node = NULL; FTNODE node = NULL;
{ {
toku_ft_grab_treelock(ft);
uint32_t fullhash; uint32_t fullhash;
CACHEKEY root_key; CACHEKEY root_key;
toku_calculate_root_offset_pointer(ft, &root_key, &fullhash); toku_calculate_root_offset_pointer(ft, &root_key, &fullhash);
...@@ -4847,7 +4857,6 @@ try_again: ...@@ -4847,7 +4857,6 @@ try_again:
// end it. // end it.
toku_cachetable_end_batched_pin(ft->cf); toku_cachetable_end_batched_pin(ft->cf);
} }
toku_ft_release_treelock(ft);
} }
uint tree_height = node->height + 1; // How high is the tree? This is the height of the root node plus one (leaf is at height 0). uint tree_height = node->height + 1; // How high is the tree? This is the height of the root node plus one (leaf is at height 0).
...@@ -5397,8 +5406,6 @@ try_again: ...@@ -5397,8 +5406,6 @@ try_again:
uint64_t less = 0, equal = 0, greater = 0; uint64_t less = 0, equal = 0, greater = 0;
FTNODE node = NULL; FTNODE node = NULL;
{ {
toku_ft_grab_treelock(brt->ft);
uint32_t fullhash; uint32_t fullhash;
CACHEKEY root_key; CACHEKEY root_key;
toku_calculate_root_offset_pointer(brt->ft, &root_key, &fullhash); toku_calculate_root_offset_pointer(brt->ft, &root_key, &fullhash);
...@@ -5413,7 +5420,6 @@ try_again: ...@@ -5413,7 +5420,6 @@ try_again:
NULL, NULL,
&node &node
); );
toku_ft_release_treelock(brt->ft);
} }
struct unlock_ftnode_extra unlock_extra = {brt,node,false}; struct unlock_ftnode_extra unlock_extra = {brt,node,false};
...@@ -5532,14 +5538,10 @@ int toku_dump_ft (FILE *f, FT_HANDLE brt) { ...@@ -5532,14 +5538,10 @@ int toku_dump_ft (FILE *f, FT_HANDLE brt) {
assert(brt->ft); assert(brt->ft);
toku_dump_translation_table(f, brt->ft->blocktable); toku_dump_translation_table(f, brt->ft->blocktable);
{ {
toku_ft_grab_treelock(brt->ft);
uint32_t fullhash = 0; uint32_t fullhash = 0;
CACHEKEY root_key; CACHEKEY root_key;
toku_calculate_root_offset_pointer(brt->ft, &root_key, &fullhash); toku_calculate_root_offset_pointer(brt->ft, &root_key, &fullhash);
r = toku_dump_ftnode(f, brt, root_key, 0, 0, 0); r = toku_dump_ftnode(f, brt, root_key, 0, 0, 0);
toku_ft_release_treelock(brt->ft);
} }
return r; return r;
} }
...@@ -5705,10 +5707,7 @@ bool toku_ft_is_empty_fast (FT_HANDLE brt) ...@@ -5705,10 +5707,7 @@ bool toku_ft_is_empty_fast (FT_HANDLE brt)
{ {
uint32_t fullhash; uint32_t fullhash;
FTNODE node; FTNODE node;
//assert(fullhash == toku_cachetable_hash(brt->ft->cf, *rootp));
{ {
toku_ft_grab_treelock(brt->ft);
CACHEKEY root_key; CACHEKEY root_key;
toku_calculate_root_offset_pointer(brt->ft, &root_key, &fullhash); toku_calculate_root_offset_pointer(brt->ft, &root_key, &fullhash);
struct ftnode_fetch_extra bfe; struct ftnode_fetch_extra bfe;
...@@ -5723,8 +5722,6 @@ bool toku_ft_is_empty_fast (FT_HANDLE brt) ...@@ -5723,8 +5722,6 @@ bool toku_ft_is_empty_fast (FT_HANDLE brt)
NULL, NULL,
&node &node
); );
toku_ft_release_treelock(brt->ft);
} }
bool r = is_empty_fast_iter(brt, node); bool r = is_empty_fast_iter(brt, node);
toku_unpin_ftnode(brt->ft, node); toku_unpin_ftnode(brt->ft, node);
......
...@@ -188,7 +188,6 @@ deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ftp, uint32_t version) ...@@ -188,7 +188,6 @@ deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ftp, uint32_t version)
lazy_assert(translation_size_on_disk > 0); lazy_assert(translation_size_on_disk > 0);
// initialize the tree lock // initialize the tree lock
toku_ft_init_treelock(ft);
toku_ft_init_reflock(ft); toku_ft_init_reflock(ft);
//Load translation table //Load translation table
......
...@@ -459,14 +459,10 @@ toku_verify_ft_with_progress (FT_HANDLE brt, int (*progress_callback)(void *extr ...@@ -459,14 +459,10 @@ toku_verify_ft_with_progress (FT_HANDLE brt, int (*progress_callback)(void *extr
assert(brt->ft); assert(brt->ft);
FTNODE root_node = NULL; FTNODE root_node = NULL;
{ {
toku_ft_grab_treelock(brt->ft);
uint32_t root_hash; uint32_t root_hash;
CACHEKEY root_key; CACHEKEY root_key;
toku_calculate_root_offset_pointer(brt->ft, &root_key, &root_hash); toku_calculate_root_offset_pointer(brt->ft, &root_key, &root_hash);
toku_get_node_for_verify(root_key, brt, &root_node); toku_get_node_for_verify(root_key, brt, &root_node);
toku_ft_release_treelock(brt->ft);
} }
int r = toku_verify_ftnode(brt, ZERO_MSN, ZERO_MSN, root_node, -1, NULL, NULL, progress_callback, progress_extra, 1, verbose, keep_on_going); int r = toku_verify_ftnode(brt, ZERO_MSN, ZERO_MSN, root_node, -1, NULL, NULL, progress_callback, progress_extra, 1, verbose, keep_on_going);
if (r == 0) { if (r == 0) {
......
...@@ -39,7 +39,6 @@ ft_destroy(FT ft) { ...@@ -39,7 +39,6 @@ ft_destroy(FT ft) {
toku_blocktable_destroy(&ft->blocktable); toku_blocktable_destroy(&ft->blocktable);
if (ft->descriptor.dbt.data) toku_free(ft->descriptor.dbt.data); if (ft->descriptor.dbt.data) toku_free(ft->descriptor.dbt.data);
if (ft->cmp_descriptor.dbt.data) toku_free(ft->cmp_descriptor.dbt.data); if (ft->cmp_descriptor.dbt.data) toku_free(ft->cmp_descriptor.dbt.data);
toku_ft_destroy_treelock(ft);
toku_ft_destroy_reflock(ft); toku_ft_destroy_reflock(ft);
toku_free(ft->h); toku_free(ft->h);
} }
...@@ -69,26 +68,6 @@ toku_ft_free (FT ft) { ...@@ -69,26 +68,6 @@ toku_ft_free (FT ft) {
toku_free(ft); toku_free(ft);
} }
void
toku_ft_init_treelock(FT ft) {
toku_mutex_init(&ft->tree_lock, NULL);
}
void
toku_ft_destroy_treelock(FT ft) {
toku_mutex_destroy(&ft->tree_lock);
}
void
toku_ft_grab_treelock(FT ft) {
toku_mutex_lock(&ft->tree_lock);
}
void
toku_ft_release_treelock(FT ft) {
toku_mutex_unlock(&ft->tree_lock);
}
void void
toku_ft_init_reflock(FT ft) { toku_ft_init_reflock(FT ft) {
toku_mutex_init(&ft->ft_ref_lock, NULL); toku_mutex_init(&ft->ft_ref_lock, NULL);
...@@ -475,7 +454,6 @@ toku_create_new_ft(FT *ftp, FT_OPTIONS options, CACHEFILE cf, TOKUTXN txn) { ...@@ -475,7 +454,6 @@ toku_create_new_ft(FT *ftp, FT_OPTIONS options, CACHEFILE cf, TOKUTXN txn) {
ft->h = ft_header_new(options, make_blocknum(0), (txn ? txn->ancestor_txnid64 : TXNID_NONE)); ft->h = ft_header_new(options, make_blocknum(0), (txn ? txn->ancestor_txnid64 : TXNID_NONE));
toku_ft_init_treelock(ft);
toku_ft_init_reflock(ft); toku_ft_init_reflock(ft);
toku_blocktable_create_new(&ft->blocktable); toku_blocktable_create_new(&ft->blocktable);
//Assign blocknum for root block, also dirty the header //Assign blocknum for root block, also dirty the header
......
...@@ -23,11 +23,6 @@ int toku_ft_unlink_on_commit(FT_HANDLE handle, TOKUTXN txn) __attribute__((__war ...@@ -23,11 +23,6 @@ int toku_ft_unlink_on_commit(FT_HANDLE handle, TOKUTXN txn) __attribute__((__war
//Effect: suppresses rollback logs //Effect: suppresses rollback logs
void toku_ft_suppress_rollbacks(FT h, TOKUTXN txn); void toku_ft_suppress_rollbacks(FT h, TOKUTXN txn);
void toku_ft_init_treelock(FT h);
void toku_ft_destroy_treelock(FT h);
void toku_ft_grab_treelock(FT h);
void toku_ft_release_treelock(FT h);
void toku_ft_init_reflock(FT ft); void toku_ft_init_reflock(FT ft);
void toku_ft_destroy_reflock(FT ft); void toku_ft_destroy_reflock(FT ft);
void toku_ft_grab_reflock(FT ft); void toku_ft_grab_reflock(FT ft);
......
...@@ -339,7 +339,6 @@ test_prefetching(void) { ...@@ -339,7 +339,6 @@ test_prefetching(void) {
TOKU_DEFAULT_COMPRESSION_METHOD); TOKU_DEFAULT_COMPRESSION_METHOD);
brt->ft = brt_h; brt->ft = brt_h;
brt_h->panic = 0; brt_h->panic_string = 0; brt_h->panic = 0; brt_h->panic_string = 0;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable); toku_blocktable_create_new(&brt_h->blocktable);
{ int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); } { int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); }
//Want to use block #20 //Want to use block #20
...@@ -375,7 +374,6 @@ test_prefetching(void) { ...@@ -375,7 +374,6 @@ test_prefetching(void) {
toku_free(sn.childkeys); toku_free(sn.childkeys);
toku_block_free(brt_h->blocktable, BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE); toku_block_free(brt_h->blocktable, BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE);
toku_ft_destroy_treelock(brt_h);
toku_blocktable_destroy(&brt_h->blocktable); toku_blocktable_destroy(&brt_h->blocktable);
toku_free(brt_h->h); toku_free(brt_h->h);
toku_free(brt_h); toku_free(brt_h);
......
...@@ -284,7 +284,6 @@ test_serialize_nonleaf(void) { ...@@ -284,7 +284,6 @@ test_serialize_nonleaf(void) {
TOKU_DEFAULT_COMPRESSION_METHOD); TOKU_DEFAULT_COMPRESSION_METHOD);
brt->ft = brt_h; brt->ft = brt_h;
brt_h->panic = 0; brt_h->panic_string = 0; brt_h->panic = 0; brt_h->panic_string = 0;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable); toku_blocktable_create_new(&brt_h->blocktable);
{ int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); } { int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); }
//Want to use block #20 //Want to use block #20
...@@ -319,7 +318,6 @@ test_serialize_nonleaf(void) { ...@@ -319,7 +318,6 @@ test_serialize_nonleaf(void) {
toku_free(ndd); toku_free(ndd);
toku_block_free(brt_h->blocktable, BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE); toku_block_free(brt_h->blocktable, BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE);
toku_ft_destroy_treelock(brt_h);
toku_blocktable_destroy(&brt_h->blocktable); toku_blocktable_destroy(&brt_h->blocktable);
toku_free(brt_h->h); toku_free(brt_h->h);
toku_free(brt_h); toku_free(brt_h);
...@@ -376,7 +374,6 @@ test_serialize_leaf(void) { ...@@ -376,7 +374,6 @@ test_serialize_leaf(void) {
TOKU_DEFAULT_COMPRESSION_METHOD); TOKU_DEFAULT_COMPRESSION_METHOD);
brt->ft = brt_h; brt->ft = brt_h;
brt_h->panic = 0; brt_h->panic_string = 0; brt_h->panic = 0; brt_h->panic_string = 0;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable); toku_blocktable_create_new(&brt_h->blocktable);
{ int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); } { int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); }
//Want to use block #20 //Want to use block #20
...@@ -418,7 +415,6 @@ test_serialize_leaf(void) { ...@@ -418,7 +415,6 @@ test_serialize_leaf(void) {
toku_free(sn.childkeys); toku_free(sn.childkeys);
toku_block_free(brt_h->blocktable, BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE); toku_block_free(brt_h->blocktable, BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE);
toku_ft_destroy_treelock(brt_h);
toku_blocktable_destroy(&brt_h->blocktable); toku_blocktable_destroy(&brt_h->blocktable);
toku_free(brt_h->h); toku_free(brt_h->h);
toku_free(brt_h); toku_free(brt_h);
......
...@@ -118,7 +118,6 @@ test_serialize_leaf(int valsize, int nelts, double entropy) { ...@@ -118,7 +118,6 @@ test_serialize_leaf(int valsize, int nelts, double entropy) {
brt->ft = brt_h; brt->ft = brt_h;
brt_h->panic = 0; brt_h->panic_string = 0; brt_h->panic = 0; brt_h->panic_string = 0;
brt_h->compare_fun = long_key_cmp; brt_h->compare_fun = long_key_cmp;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable); toku_blocktable_create_new(&brt_h->blocktable);
{ int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); } { int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); }
//Want to use block #20 //Want to use block #20
...@@ -170,7 +169,6 @@ test_serialize_leaf(int valsize, int nelts, double entropy) { ...@@ -170,7 +169,6 @@ test_serialize_leaf(int valsize, int nelts, double entropy) {
toku_block_free(brt_h->blocktable, BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE); toku_block_free(brt_h->blocktable, BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE);
toku_blocktable_destroy(&brt_h->blocktable); toku_blocktable_destroy(&brt_h->blocktable);
toku_ft_destroy_treelock(brt_h);
toku_free(brt_h->h); toku_free(brt_h->h);
toku_free(brt_h); toku_free(brt_h);
toku_free(brt); toku_free(brt);
...@@ -252,7 +250,6 @@ test_serialize_nonleaf(int valsize, int nelts, double entropy) { ...@@ -252,7 +250,6 @@ test_serialize_nonleaf(int valsize, int nelts, double entropy) {
brt->ft = brt_h; brt->ft = brt_h;
brt_h->panic = 0; brt_h->panic_string = 0; brt_h->panic = 0; brt_h->panic_string = 0;
brt_h->compare_fun = long_key_cmp; brt_h->compare_fun = long_key_cmp;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable); toku_blocktable_create_new(&brt_h->blocktable);
{ int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); } { int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); }
//Want to use block #20 //Want to use block #20
...@@ -306,7 +303,6 @@ test_serialize_nonleaf(int valsize, int nelts, double entropy) { ...@@ -306,7 +303,6 @@ test_serialize_nonleaf(int valsize, int nelts, double entropy) {
toku_block_free(brt_h->blocktable, BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE); toku_block_free(brt_h->blocktable, BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE);
toku_blocktable_destroy(&brt_h->blocktable); toku_blocktable_destroy(&brt_h->blocktable);
toku_ft_destroy_treelock(brt_h);
toku_free(brt_h->h); toku_free(brt_h->h);
toku_free(brt_h); toku_free(brt_h);
toku_free(brt); toku_free(brt);
......
...@@ -262,7 +262,6 @@ test_serialize_leaf_check_msn(enum ftnode_verify_type bft, bool do_clone) { ...@@ -262,7 +262,6 @@ test_serialize_leaf_check_msn(enum ftnode_verify_type bft, bool do_clone) {
TOKU_DEFAULT_COMPRESSION_METHOD); TOKU_DEFAULT_COMPRESSION_METHOD);
brt->ft = brt_h; brt->ft = brt_h;
brt_h->panic = 0; brt_h->panic_string = 0; brt_h->panic = 0; brt_h->panic_string = 0;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable); toku_blocktable_create_new(&brt_h->blocktable);
{ int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); } { int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); }
...@@ -345,7 +344,6 @@ test_serialize_leaf_check_msn(enum ftnode_verify_type bft, bool do_clone) { ...@@ -345,7 +344,6 @@ test_serialize_leaf_check_msn(enum ftnode_verify_type bft, bool do_clone) {
toku_block_free(brt_h->blocktable, BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE); toku_block_free(brt_h->blocktable, BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE);
toku_blocktable_destroy(&brt_h->blocktable); toku_blocktable_destroy(&brt_h->blocktable);
toku_ft_destroy_treelock(brt_h);
toku_free(brt_h->h); toku_free(brt_h->h);
toku_free(brt_h); toku_free(brt_h);
toku_free(brt); toku_free(brt);
...@@ -411,7 +409,6 @@ test_serialize_leaf_with_large_pivots(enum ftnode_verify_type bft, bool do_clone ...@@ -411,7 +409,6 @@ test_serialize_leaf_with_large_pivots(enum ftnode_verify_type bft, bool do_clone
TOKU_DEFAULT_COMPRESSION_METHOD); TOKU_DEFAULT_COMPRESSION_METHOD);
brt->ft = brt_h; brt->ft = brt_h;
brt_h->panic = 0; brt_h->panic_string = 0; brt_h->panic = 0; brt_h->panic_string = 0;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable); toku_blocktable_create_new(&brt_h->blocktable);
{ int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); } { int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); }
//Want to use block #20 //Want to use block #20
...@@ -494,7 +491,6 @@ test_serialize_leaf_with_large_pivots(enum ftnode_verify_type bft, bool do_clone ...@@ -494,7 +491,6 @@ test_serialize_leaf_with_large_pivots(enum ftnode_verify_type bft, bool do_clone
toku_block_free(brt_h->blocktable, BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE); toku_block_free(brt_h->blocktable, BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE);
toku_blocktable_destroy(&brt_h->blocktable); toku_blocktable_destroy(&brt_h->blocktable);
toku_ft_destroy_treelock(brt_h);
toku_free(brt_h->h); toku_free(brt_h->h);
toku_free(brt_h); toku_free(brt_h);
toku_free(brt); toku_free(brt);
...@@ -556,7 +552,6 @@ test_serialize_leaf_with_many_rows(enum ftnode_verify_type bft, bool do_clone) { ...@@ -556,7 +552,6 @@ test_serialize_leaf_with_many_rows(enum ftnode_verify_type bft, bool do_clone) {
TOKU_DEFAULT_COMPRESSION_METHOD); TOKU_DEFAULT_COMPRESSION_METHOD);
brt->ft = brt_h; brt->ft = brt_h;
brt_h->panic = 0; brt_h->panic_string = 0; brt_h->panic = 0; brt_h->panic_string = 0;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable); toku_blocktable_create_new(&brt_h->blocktable);
{ int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); } { int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); }
//Want to use block #20 //Want to use block #20
...@@ -637,7 +632,6 @@ test_serialize_leaf_with_many_rows(enum ftnode_verify_type bft, bool do_clone) { ...@@ -637,7 +632,6 @@ test_serialize_leaf_with_many_rows(enum ftnode_verify_type bft, bool do_clone) {
toku_block_free(brt_h->blocktable, BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE); toku_block_free(brt_h->blocktable, BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE);
toku_blocktable_destroy(&brt_h->blocktable); toku_blocktable_destroy(&brt_h->blocktable);
toku_ft_destroy_treelock(brt_h);
toku_free(brt_h->h); toku_free(brt_h->h);
toku_free(brt_h); toku_free(brt_h);
toku_free(brt); toku_free(brt);
...@@ -706,7 +700,6 @@ test_serialize_leaf_with_large_rows(enum ftnode_verify_type bft, bool do_clone) ...@@ -706,7 +700,6 @@ test_serialize_leaf_with_large_rows(enum ftnode_verify_type bft, bool do_clone)
TOKU_DEFAULT_COMPRESSION_METHOD); TOKU_DEFAULT_COMPRESSION_METHOD);
brt->ft = brt_h; brt->ft = brt_h;
brt_h->panic = 0; brt_h->panic_string = 0; brt_h->panic = 0; brt_h->panic_string = 0;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable); toku_blocktable_create_new(&brt_h->blocktable);
{ int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); } { int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); }
//Want to use block #20 //Want to use block #20
...@@ -792,7 +785,6 @@ test_serialize_leaf_with_large_rows(enum ftnode_verify_type bft, bool do_clone) ...@@ -792,7 +785,6 @@ test_serialize_leaf_with_large_rows(enum ftnode_verify_type bft, bool do_clone)
toku_block_free(brt_h->blocktable, BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE); toku_block_free(brt_h->blocktable, BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE);
toku_blocktable_destroy(&brt_h->blocktable); toku_blocktable_destroy(&brt_h->blocktable);
toku_ft_destroy_treelock(brt_h);
toku_free(brt_h->h); toku_free(brt_h->h);
toku_free(brt_h); toku_free(brt_h);
toku_free(brt); toku_free(brt);
...@@ -872,7 +864,6 @@ test_serialize_leaf_with_empty_basement_nodes(enum ftnode_verify_type bft, bool ...@@ -872,7 +864,6 @@ test_serialize_leaf_with_empty_basement_nodes(enum ftnode_verify_type bft, bool
TOKU_DEFAULT_COMPRESSION_METHOD); TOKU_DEFAULT_COMPRESSION_METHOD);
brt->ft = brt_h; brt->ft = brt_h;
brt_h->panic = 0; brt_h->panic_string = 0; brt_h->panic = 0; brt_h->panic_string = 0;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable); toku_blocktable_create_new(&brt_h->blocktable);
{ int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); } { int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); }
//Want to use block #20 //Want to use block #20
...@@ -948,7 +939,6 @@ test_serialize_leaf_with_empty_basement_nodes(enum ftnode_verify_type bft, bool ...@@ -948,7 +939,6 @@ test_serialize_leaf_with_empty_basement_nodes(enum ftnode_verify_type bft, bool
toku_block_free(brt_h->blocktable, BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE); toku_block_free(brt_h->blocktable, BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE);
toku_blocktable_destroy(&brt_h->blocktable); toku_blocktable_destroy(&brt_h->blocktable);
toku_ft_destroy_treelock(brt_h);
toku_free(brt_h->h); toku_free(brt_h->h);
toku_free(brt_h); toku_free(brt_h);
toku_free(brt); toku_free(brt);
...@@ -1002,7 +992,6 @@ test_serialize_leaf_with_multiple_empty_basement_nodes(enum ftnode_verify_type b ...@@ -1002,7 +992,6 @@ test_serialize_leaf_with_multiple_empty_basement_nodes(enum ftnode_verify_type b
TOKU_DEFAULT_COMPRESSION_METHOD); TOKU_DEFAULT_COMPRESSION_METHOD);
brt->ft = brt_h; brt->ft = brt_h;
brt_h->panic = 0; brt_h->panic_string = 0; brt_h->panic = 0; brt_h->panic_string = 0;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable); toku_blocktable_create_new(&brt_h->blocktable);
{ int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); } { int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); }
//Want to use block #20 //Want to use block #20
...@@ -1068,7 +1057,6 @@ test_serialize_leaf_with_multiple_empty_basement_nodes(enum ftnode_verify_type b ...@@ -1068,7 +1057,6 @@ test_serialize_leaf_with_multiple_empty_basement_nodes(enum ftnode_verify_type b
toku_block_free(brt_h->blocktable, BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE); toku_block_free(brt_h->blocktable, BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE);
toku_blocktable_destroy(&brt_h->blocktable); toku_blocktable_destroy(&brt_h->blocktable);
toku_ft_destroy_treelock(brt_h);
toku_free(brt_h->h); toku_free(brt_h->h);
toku_free(brt_h); toku_free(brt_h);
toku_free(brt); toku_free(brt);
...@@ -1137,7 +1125,6 @@ test_serialize_leaf(enum ftnode_verify_type bft, bool do_clone) { ...@@ -1137,7 +1125,6 @@ test_serialize_leaf(enum ftnode_verify_type bft, bool do_clone) {
TOKU_DEFAULT_COMPRESSION_METHOD); TOKU_DEFAULT_COMPRESSION_METHOD);
brt->ft = brt_h; brt->ft = brt_h;
brt_h->panic = 0; brt_h->panic_string = 0; brt_h->panic = 0; brt_h->panic_string = 0;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable); toku_blocktable_create_new(&brt_h->blocktable);
{ int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); } { int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); }
//Want to use block #20 //Want to use block #20
...@@ -1215,7 +1202,6 @@ test_serialize_leaf(enum ftnode_verify_type bft, bool do_clone) { ...@@ -1215,7 +1202,6 @@ test_serialize_leaf(enum ftnode_verify_type bft, bool do_clone) {
toku_block_free(brt_h->blocktable, BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE); toku_block_free(brt_h->blocktable, BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE);
toku_blocktable_destroy(&brt_h->blocktable); toku_blocktable_destroy(&brt_h->blocktable);
toku_ft_destroy_treelock(brt_h);
toku_free(brt_h->h); toku_free(brt_h->h);
toku_free(brt_h); toku_free(brt_h);
toku_free(brt); toku_free(brt);
...@@ -1285,7 +1271,6 @@ test_serialize_nonleaf(enum ftnode_verify_type bft, bool do_clone) { ...@@ -1285,7 +1271,6 @@ test_serialize_nonleaf(enum ftnode_verify_type bft, bool do_clone) {
TOKU_DEFAULT_COMPRESSION_METHOD); TOKU_DEFAULT_COMPRESSION_METHOD);
brt->ft = brt_h; brt->ft = brt_h;
brt_h->panic = 0; brt_h->panic_string = 0; brt_h->panic = 0; brt_h->panic_string = 0;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable); toku_blocktable_create_new(&brt_h->blocktable);
{ int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); } { int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); }
//Want to use block #20 //Want to use block #20
...@@ -1342,7 +1327,6 @@ test_serialize_nonleaf(enum ftnode_verify_type bft, bool do_clone) { ...@@ -1342,7 +1327,6 @@ test_serialize_nonleaf(enum ftnode_verify_type bft, bool do_clone) {
toku_block_free(brt_h->blocktable, BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE); toku_block_free(brt_h->blocktable, BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE);
toku_blocktable_destroy(&brt_h->blocktable); toku_blocktable_destroy(&brt_h->blocktable);
toku_ft_destroy_treelock(brt_h);
toku_free(brt_h->h); toku_free(brt_h->h);
toku_free(brt_h); toku_free(brt_h);
toku_free(brt); toku_free(brt);
......
...@@ -507,12 +507,7 @@ indexer_ft_delete_committed(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xi ...@@ -507,12 +507,7 @@ indexer_ft_delete_committed(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xi
} else { } else {
result = toku_ydb_check_avail_fs_space(indexer->i->env); result = toku_ydb_check_avail_fs_space(indexer->i->env);
if (result == 0) { if (result == 0) {
// MO lock needed because toku_ft_root_put_cmd must be atomic
// with respect to checkpointing
// comment/question in indexer_ft_delete_provisional applies
toku_multi_operation_client_lock();
result = toku_ft_send_delete(db_struct_i(hotdb)->ft_handle, hotkey, xids); result = toku_ft_send_delete(db_struct_i(hotdb)->ft_handle, hotkey, xids);
toku_multi_operation_client_unlock();
} }
} }
return result; return result;
...@@ -550,12 +545,7 @@ indexer_ft_insert_committed(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT *ho ...@@ -550,12 +545,7 @@ indexer_ft_insert_committed(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT *ho
} else { } else {
result = toku_ydb_check_avail_fs_space(indexer->i->env); result = toku_ydb_check_avail_fs_space(indexer->i->env);
if (result == 0) { if (result == 0) {
// MO lock needed because toku_ft_root_put_cmd must be atomic
// with respect to checkpointing
// comment/question in indexer_ft_delete_provisional applies
toku_multi_operation_client_lock();
result = toku_ft_send_insert(db_struct_i(hotdb)->ft_handle, hotkey, hotval, xids, FT_INSERT); result = toku_ft_send_insert(db_struct_i(hotdb)->ft_handle, hotkey, hotval, xids, FT_INSERT);
toku_multi_operation_client_unlock();
} }
} }
return result; return result;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment