Commit b0e3547a authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Fix #5321. Refs #5295, #5292, #5290. {{{svn merge -r46336:46350 ../tokudb.5321}}}

git-svn-id: file:///svn/toku/tokudb@46351 c7de825b-a66e-492c-adef-691d508d4ae1
parent 20db7232
...@@ -13,19 +13,30 @@ ...@@ -13,19 +13,30 @@
#include "growable_array.h" #include "growable_array.h"
//****************************************************************************** //******************************************************************************
//
// Representation: The representation of a partitioned counter // Representation: The representation of a partitioned counter comprises a
// comprises a sum, called sum_of_dead; an index, called the ckey, // sum, called sum_of_dead; an index, called the ckey, which indexes into a
// which indexes into a thread-local array to find a thread-local // thread-local array to find a thread-local part of the counter; and a
// part of the counter; and a linked list of thread-local parts. // linked list of thread-local parts.
// There is also a linked list, for each thread that has a //
// thread-local part of any counter, of all the thread-local parts of // There is also a linked list, for each thread that has a thread-local part
// all the counters. // of any counter, of all the thread-local parts of all the counters.
// Abstraction function: The sum is represented by the sum of _sum and //
// the sum's of the thread-local parts of the counter. // There is a pthread_key which gives us a hook to clean up thread-local
// Representation invariant: Every thread-local part is in the linked // state when a thread terminates. For each thread-local part of a counter
// list of the thread-local parts of its counter, as well as in the // that the thread has, we add in the thread-local sum into the sum_of_dead.
// linked list of the counters of a the thread. //
// Finally there is a list of all the thread-local arrays so that when we
// destroy the partitioned counter before the threads are done, we can find
// and destroy the thread_local_arrays before destroying the pthread_key.
//
// Abstraction function: The sum is represented by the sum of _sum and the
// sum's of the thread-local parts of the counter.
//
// Representation invariant: Every thread-local part is in the linked list of
// the thread-local parts of its counter, as well as in the linked list of
// the counters of a the thread.
//
//****************************************************************************** //******************************************************************************
//****************************************************************************** //******************************************************************************
...@@ -99,6 +110,10 @@ struct local_counter { ...@@ -99,6 +110,10 @@ struct local_counter {
// Try to get it it into one cache line by aligning it. // Try to get it it into one cache line by aligning it.
static __thread GrowableArray<struct local_counter *> thread_local_array; static __thread GrowableArray<struct local_counter *> thread_local_array;
static __thread bool thread_local_array_inited = false;
DoublyLinkedList<GrowableArray<struct local_counter *> *> all_thread_local_arrays;
__thread LinkedListElement<GrowableArray<struct local_counter *> *> thread_local_ll_elt;
// I want this to be static, but I have to use hidden visibility instead because it's a friend function. // I want this to be static, but I have to use hidden visibility instead because it's a friend function.
static void destroy_thread_local_part_of_partitioned_counters (void *ignore_me); static void destroy_thread_local_part_of_partitioned_counters (void *ignore_me);
...@@ -119,6 +134,8 @@ static void destroy_thread_local_part_of_partitioned_counters (void *ignore_me _ ...@@ -119,6 +134,8 @@ static void destroy_thread_local_part_of_partitioned_counters (void *ignore_me _
owner->ll_counter_head.remove(&lc->ll_in_counter); owner->ll_counter_head.remove(&lc->ll_in_counter);
toku_free(lc); toku_free(lc);
} }
all_thread_local_arrays.remove(&thread_local_ll_elt);
thread_local_array_inited = false;
thread_local_array.deinit(); thread_local_array.deinit();
pc_unlock(); pc_unlock();
} }
...@@ -247,7 +264,12 @@ void increment_partitioned_counter(PARTITIONED_COUNTER pc, uint64_t amount) ...@@ -247,7 +264,12 @@ void increment_partitioned_counter(PARTITIONED_COUNTER pc, uint64_t amount)
struct local_counter *lc = get_thread_local_counter(pc_key, &thread_local_array); struct local_counter *lc = get_thread_local_counter(pc_key, &thread_local_array);
if (lc==NULL) { if (lc==NULL) {
// Set things up so that this thread terminates, the thread-local parts of the counter will be destroyed and merged into their respective counters. // Set things up so that this thread terminates, the thread-local parts of the counter will be destroyed and merged into their respective counters.
pk_setspecific(thread_destructor_key, "dont care"); if (!thread_local_array_inited) {
pk_setspecific(thread_destructor_key, "dont care");
thread_local_array_inited=true;
thread_local_array.init();
all_thread_local_arrays.insert(&thread_local_ll_elt, &thread_local_array);
}
XMALLOC(lc); XMALLOC(lc);
lc->sum = 0; lc->sum = 0;
...@@ -289,12 +311,20 @@ void partitioned_counters_init(void) ...@@ -289,12 +311,20 @@ void partitioned_counters_init(void)
// Effect: Initialize any partitioned counters data structures that must be set up before any partitioned counters run. // Effect: Initialize any partitioned counters data structures that must be set up before any partitioned counters run.
{ {
pk_create(&thread_destructor_key, destroy_thread_local_part_of_partitioned_counters); pk_create(&thread_destructor_key, destroy_thread_local_part_of_partitioned_counters);
all_thread_local_arrays.init();
} }
void partitioned_counters_destroy(void) void partitioned_counters_destroy(void)
// Effect: Destroy any partitioned counters data structures. // Effect: Destroy any partitioned counters data structures.
{ {
pc_lock();
LinkedListElement<GrowableArray<struct local_counter *> *> *a_ll;
while (all_thread_local_arrays.pop(&a_ll)) {
a_ll->get_container()->deinit();
}
pk_delete(thread_destructor_key); pk_delete(thread_destructor_key);
destroy_counters(); destroy_counters();
pc_unlock();
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment