Commit e37c5d33 authored by Barry Perlman's avatar Barry Perlman Committed by Yoni Fogel

[t:2949] Merge from tokudb.2949 with command, executed at sandbox/toku {{{svn...

[t:2949] Merge from tokudb.2949 with command, executed at sandbox/toku {{{svn merge -r39023:HEAD tokudb.2949 tokudb}}}.  Refs #2949.

git-svn-id: file:///svn/toku/tokudb@39376 c7de825b-a66e-492c-adef-691d508d4ae1
parent 717ff666
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
.DEFAULT_GOAL=install .DEFAULT_GOAL=install
TOKUROOT=../ TOKUROOT=../
INCLUDEDIRS=-I. INCLUDEDIRS=-I. -I$(TOKUROOT)/include
include $(TOKUROOT)toku_include/Makefile.include include $(TOKUROOT)toku_include/Makefile.include
OPT_AROPT=-qnoipo #Disable ipo for lib creation even when optimization is on. OPT_AROPT=-qnoipo #Disable ipo for lib creation even when optimization is on.
......
...@@ -2,6 +2,8 @@ ...@@ -2,6 +2,8 @@
#ident "Copyright (c) 2007-2011 Tokutek Inc. All rights reserved." #ident "Copyright (c) 2007-2011 Tokutek Inc. All rights reserved."
#include <toku_portability.h> #include <toku_portability.h>
#include "db.h" // get Toku-specific version of db.h
#include <string.h> #include <string.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
...@@ -16,7 +18,51 @@ static free_fun_t t_free = 0; ...@@ -16,7 +18,51 @@ static free_fun_t t_free = 0;
static realloc_fun_t t_realloc = 0; static realloc_fun_t t_realloc = 0;
static realloc_fun_t t_xrealloc = 0; static realloc_fun_t t_xrealloc = 0;
static MEMORY_STATUS_S status; ///////////////////////////////////////////////////////////////////////////////////
// Engine status
//
// Status is intended for display to humans to help understand system behavior.
// It does not need to be perfectly thread-safe.
static MEMORY_STATUS_S memory_status;
static volatile uint64_t max_in_use; // maximum memory footprint (used - freed), approximate (not worth threadsafety overhead for exact, but worth keeping as volatile)
#define STATUS_INIT(k,t,l) { \
memory_status.status[k].keyname = #k; \
memory_status.status[k].type = t; \
memory_status.status[k].legend = "memory: " l; \
}
static void
status_init(void) {
// Note, this function initializes the keyname, type, and legend fields.
// Value fields are initialized to zero by compiler.
STATUS_INIT(MEMORY_MALLOC_COUNT, UINT64, "number of malloc operations");
STATUS_INIT(MEMORY_FREE_COUNT, UINT64, "number of free operations");
STATUS_INIT(MEMORY_REALLOC_COUNT, UINT64, "number of realloc operations");
STATUS_INIT(MEMORY_MALLOC_FAIL, UINT64, "number of malloc operations that failed");
STATUS_INIT(MEMORY_REALLOC_FAIL, UINT64, "number of realloc operations that failed" );
STATUS_INIT(MEMORY_REQUESTED, UINT64, "number of bytes requested");
STATUS_INIT(MEMORY_USED, UINT64, "number of bytes used (requested + overhead)");
STATUS_INIT(MEMORY_FREED, UINT64, "number of bytes freed");
STATUS_INIT(MEMORY_MAX_IN_USE, UINT64, "estimated maximum memory footprint");
STATUS_INIT(MEMORY_MALLOCATOR_VERSION, CHARSTR, "mallocator version");
STATUS_INIT(MEMORY_MMAP_THRESHOLD, UINT64, "mmap threshold");
memory_status.initialized = 1; // TODO 2949 Make this a bool, set to true
}
#undef STATUS_INIT
#define STATUS_VALUE(x) memory_status.status[x].value.num
void
toku_memory_get_status(MEMORY_STATUS statp) {
if (!memory_status.initialized)
status_init();
STATUS_VALUE(MEMORY_MAX_IN_USE) = max_in_use;
*statp = memory_status;
}
#define STATUS_VERSION_STRING memory_status.status[MEMORY_MALLOCATOR_VERSION].value.str
int int
toku_memory_startup(void) { toku_memory_startup(void) {
...@@ -26,8 +72,8 @@ toku_memory_startup(void) { ...@@ -26,8 +72,8 @@ toku_memory_startup(void) {
size_t mmap_threshold = 64 * 1024; // 64K and larger should be malloced with mmap(). size_t mmap_threshold = 64 * 1024; // 64K and larger should be malloced with mmap().
int success = mallopt(M_MMAP_THRESHOLD, mmap_threshold); int success = mallopt(M_MMAP_THRESHOLD, mmap_threshold);
if (success) { if (success) {
status.mallocator_version = "libc"; STATUS_VERSION_STRING = "libc";
status.mmap_threshold = mmap_threshold; STATUS_VALUE(MEMORY_MMAP_THRESHOLD) = mmap_threshold;
} else } else
result = EINVAL; result = EINVAL;
...@@ -38,14 +84,14 @@ toku_memory_startup(void) { ...@@ -38,14 +84,14 @@ toku_memory_startup(void) {
mallctl_fun_t mallctl_f; mallctl_fun_t mallctl_f;
mallctl_f = (mallctl_fun_t) dlsym(RTLD_DEFAULT, "mallctl"); mallctl_f = (mallctl_fun_t) dlsym(RTLD_DEFAULT, "mallctl");
if (mallctl_f) { // jemalloc is loaded if (mallctl_f) { // jemalloc is loaded
size_t version_length = sizeof status.mallocator_version; size_t version_length = sizeof STATUS_VERSION_STRING;
result = mallctl_f("version", &status.mallocator_version, &version_length, NULL, 0); result = mallctl_f("version", &STATUS_VERSION_STRING, &version_length, NULL, 0);
if (result == 0) { if (result == 0) {
size_t lg_chunk; // log2 of the mmap threshold size_t lg_chunk; // log2 of the mmap threshold
size_t lg_chunk_length = sizeof lg_chunk; size_t lg_chunk_length = sizeof lg_chunk;
result = mallctl_f("opt.lg_chunk", &lg_chunk, &lg_chunk_length, NULL, 0); result = mallctl_f("opt.lg_chunk", &lg_chunk, &lg_chunk_length, NULL, 0);
if (result == 0) if (result == 0)
status.mmap_threshold = 1 << lg_chunk; STATUS_VALUE(MEMORY_MMAP_THRESHOLD) = 1 << lg_chunk;
} }
} }
...@@ -56,11 +102,6 @@ void ...@@ -56,11 +102,6 @@ void
toku_memory_shutdown(void) { toku_memory_shutdown(void) {
} }
void
toku_memory_get_status(MEMORY_STATUS s) {
*s = status;
}
// jemalloc's malloc_usable_size does not work with a NULL pointer, so we implement a version that works // jemalloc's malloc_usable_size does not work with a NULL pointer, so we implement a version that works
static size_t static size_t
my_malloc_usable_size(void *p) { my_malloc_usable_size(void *p) {
...@@ -71,16 +112,16 @@ my_malloc_usable_size(void *p) { ...@@ -71,16 +112,16 @@ my_malloc_usable_size(void *p) {
// It is not worth the overhead to make it completely accurate, but // It is not worth the overhead to make it completely accurate, but
// this logic is intended to guarantee that it increases monotonically. // this logic is intended to guarantee that it increases monotonically.
// Note that status.sum_used and status.sum_freed increase monotonically // Note that status.sum_used and status.sum_freed increase monotonically
// and that status.max_in_use is declared volatile. // and that max_in_use is declared volatile.
static inline void static inline void
set_max(uint64_t sum_used, uint64_t sum_freed) { set_max(uint64_t sum_used, uint64_t sum_freed) {
if (sum_used >= sum_freed) { if (sum_used >= sum_freed) {
uint64_t in_use = sum_used - sum_freed; uint64_t in_use = sum_used - sum_freed;
uint64_t old_max; uint64_t old_max;
do { do {
old_max = status.max_in_use; old_max = max_in_use;
} while (old_max < in_use && } while (old_max < in_use &&
!__sync_bool_compare_and_swap(&status.max_in_use, old_max, in_use)); !__sync_bool_compare_and_swap(&max_in_use, old_max, in_use));
} }
} }
...@@ -92,7 +133,7 @@ toku_memory_footprint(void * p, size_t touched) { ...@@ -92,7 +133,7 @@ toku_memory_footprint(void * p, size_t touched) {
pagesize = sysconf(_SC_PAGESIZE); pagesize = sysconf(_SC_PAGESIZE);
if (p) { if (p) {
size_t usable = my_malloc_usable_size(p); size_t usable = my_malloc_usable_size(p);
if (usable >= status.mmap_threshold) { if (usable >= STATUS_VALUE(MEMORY_MMAP_THRESHOLD)) {
int num_pages = (touched + pagesize) / pagesize; int num_pages = (touched + pagesize) / pagesize;
rval = num_pages * pagesize; rval = num_pages * pagesize;
} }
...@@ -108,12 +149,12 @@ toku_malloc(size_t size) { ...@@ -108,12 +149,12 @@ toku_malloc(size_t size) {
void *p = t_malloc ? t_malloc(size) : os_malloc(size); void *p = t_malloc ? t_malloc(size) : os_malloc(size);
if (p) { if (p) {
size_t used = my_malloc_usable_size(p); size_t used = my_malloc_usable_size(p);
__sync_add_and_fetch(&status.malloc_count, 1); __sync_add_and_fetch(&STATUS_VALUE(MEMORY_MALLOC_COUNT), 1);
__sync_add_and_fetch(&status.requested,size); __sync_add_and_fetch(&STATUS_VALUE(MEMORY_REQUESTED), size);
__sync_add_and_fetch(&status.used, used); __sync_add_and_fetch(&STATUS_VALUE(MEMORY_USED), used);
set_max(status.used, status.freed); set_max(STATUS_VALUE(MEMORY_USED), STATUS_VALUE(MEMORY_FREED));
} else { } else {
__sync_add_and_fetch(&status.malloc_fail, 1); __sync_add_and_fetch(&STATUS_VALUE(MEMORY_MALLOC_FAIL), 1);
} }
return p; return p;
} }
...@@ -132,13 +173,13 @@ toku_realloc(void *p, size_t size) { ...@@ -132,13 +173,13 @@ toku_realloc(void *p, size_t size) {
void *q = t_realloc ? t_realloc(p, size) : os_realloc(p, size); void *q = t_realloc ? t_realloc(p, size) : os_realloc(p, size);
if (q) { if (q) {
size_t used = my_malloc_usable_size(q); size_t used = my_malloc_usable_size(q);
__sync_add_and_fetch(&status.realloc_count, 1); __sync_add_and_fetch(&STATUS_VALUE(MEMORY_REALLOC_COUNT), 1);
__sync_add_and_fetch(&status.requested, size); __sync_add_and_fetch(&STATUS_VALUE(MEMORY_REQUESTED), size);
__sync_add_and_fetch(&status.used, used); __sync_add_and_fetch(&STATUS_VALUE(MEMORY_USED), used);
__sync_add_and_fetch(&status.freed, used_orig); __sync_add_and_fetch(&STATUS_VALUE(MEMORY_FREED), used_orig);
set_max(status.used, status.freed); set_max(STATUS_VALUE(MEMORY_USED), STATUS_VALUE(MEMORY_FREED));
} else { } else {
__sync_add_and_fetch(&status.realloc_fail, 1); __sync_add_and_fetch(&STATUS_VALUE(MEMORY_REALLOC_FAIL), 1);
} }
return q; return q;
} }
...@@ -159,8 +200,8 @@ void ...@@ -159,8 +200,8 @@ void
toku_free(void *p) { toku_free(void *p) {
if (p) { if (p) {
size_t used = my_malloc_usable_size(p); size_t used = my_malloc_usable_size(p);
__sync_add_and_fetch(&status.free_count, 1); __sync_add_and_fetch(&STATUS_VALUE(MEMORY_FREE_COUNT), 1);
__sync_add_and_fetch(&status.freed, used); __sync_add_and_fetch(&STATUS_VALUE(MEMORY_FREED), used);
if (t_free) if (t_free)
t_free(p); t_free(p);
else else
...@@ -179,10 +220,10 @@ toku_xmalloc(size_t size) { ...@@ -179,10 +220,10 @@ toku_xmalloc(size_t size) {
if (p == NULL) // avoid function call in common case if (p == NULL) // avoid function call in common case
resource_assert(p); resource_assert(p);
size_t used = my_malloc_usable_size(p); size_t used = my_malloc_usable_size(p);
__sync_add_and_fetch(&status.malloc_count, 1); __sync_add_and_fetch(&STATUS_VALUE(MEMORY_MALLOC_COUNT), 1);
__sync_add_and_fetch(&status.requested, size); __sync_add_and_fetch(&STATUS_VALUE(MEMORY_REQUESTED), size);
__sync_add_and_fetch(&status.used, used); __sync_add_and_fetch(&STATUS_VALUE(MEMORY_USED), used);
set_max(status.used, status.freed); set_max(STATUS_VALUE(MEMORY_USED), STATUS_VALUE(MEMORY_FREED));
return p; return p;
} }
...@@ -201,11 +242,11 @@ toku_xrealloc(void *v, size_t size) { ...@@ -201,11 +242,11 @@ toku_xrealloc(void *v, size_t size) {
if (p == 0) // avoid function call in common case if (p == 0) // avoid function call in common case
resource_assert(p); resource_assert(p);
size_t used = my_malloc_usable_size(p); size_t used = my_malloc_usable_size(p);
__sync_add_and_fetch(&status.realloc_count, 1); __sync_add_and_fetch(&STATUS_VALUE(MEMORY_REALLOC_COUNT), 1);
__sync_add_and_fetch(&status.requested, size); __sync_add_and_fetch(&STATUS_VALUE(MEMORY_REQUESTED), size);
__sync_add_and_fetch(&status.used, used); __sync_add_and_fetch(&STATUS_VALUE(MEMORY_USED), used);
__sync_add_and_fetch(&status.freed, used_orig); __sync_add_and_fetch(&STATUS_VALUE(MEMORY_FREED), used_orig);
set_max(status.used, status.freed); set_max(STATUS_VALUE(MEMORY_USED), STATUS_VALUE(MEMORY_FREED));
return p; return p;
} }
...@@ -268,5 +309,5 @@ toku_set_func_free(free_fun_t f) { ...@@ -268,5 +309,5 @@ toku_set_func_free(free_fun_t f) {
void __attribute__((constructor)) toku_memory_drd_ignore(void); void __attribute__((constructor)) toku_memory_drd_ignore(void);
void void
toku_memory_drd_ignore(void) { toku_memory_drd_ignore(void) {
DRD_IGNORE_VAR(status); DRD_IGNORE_VAR(memory_status);
} }
...@@ -18,14 +18,18 @@ ...@@ -18,14 +18,18 @@
static void *backtrace_pointers[N_POINTERS]; static void *backtrace_pointers[N_POINTERS];
#endif #endif
static uint64_t engine_status_num_rows = 0;
// Function pointers are zero by default so asserts can be used by brt-layer tests without an environment. // Function pointers are zero by default so asserts can be used by brt-layer tests without an environment.
static int (*toku_maybe_get_engine_status_text_p)(char* buff, int buffsize) = 0; static int (*toku_maybe_get_engine_status_text_p)(char* buff, int buffsize) = 0;
static void (*toku_maybe_set_env_panic_p)(int code, char* msg) = 0; static void (*toku_maybe_set_env_panic_p)(int code, char* msg) = 0;
void toku_assert_set_fpointers(int (*toku_maybe_get_engine_status_text_pointer)(char*, int), void toku_assert_set_fpointers(int (*toku_maybe_get_engine_status_text_pointer)(char*, int),
void (*toku_maybe_set_env_panic_pointer)(int, char*)) { void (*toku_maybe_set_env_panic_pointer)(int, char*),
uint64_t num_rows) {
toku_maybe_get_engine_status_text_p = toku_maybe_get_engine_status_text_pointer; toku_maybe_get_engine_status_text_p = toku_maybe_get_engine_status_text_pointer;
toku_maybe_set_env_panic_p = toku_maybe_set_env_panic_pointer; toku_maybe_set_env_panic_p = toku_maybe_set_env_panic_pointer;
engine_status_num_rows = num_rows;
} }
void (*do_assert_hook)(void) = NULL; void (*do_assert_hook)(void) = NULL;
...@@ -44,10 +48,9 @@ toku_do_backtrace_abort(void) { ...@@ -44,10 +48,9 @@ toku_do_backtrace_abort(void) {
fflush(stderr); fflush(stderr);
if (toku_maybe_get_engine_status_text_p) { if (engine_status_num_rows && toku_maybe_get_engine_status_text_p) {
int buffsize = 1024 * 32; int buffsize = engine_status_num_rows * 128; // assume 128 characters per row (gross overestimate, should be safe)
char buff[buffsize]; char buff[buffsize];
toku_maybe_get_engine_status_text_p(buff, buffsize); toku_maybe_get_engine_status_text_p(buff, buffsize);
fprintf(stderr, "Engine status:\n%s\n", buff); fprintf(stderr, "Engine status:\n%s\n", buff);
} }
......
This diff is collapsed.
/* -*- mode: C; c-basic-offset: 4 -*- */ /* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#ifndef BRT_FLUSHER #ifndef BRT_FLUSHER
#define BRT_FLUSHER #define BRT_FLUSHER
#ident "$Id$" #ident "$Id$"
...@@ -11,37 +11,43 @@ ...@@ -11,37 +11,43 @@
C_BEGIN C_BEGIN
typedef struct brt_flusher_status { typedef enum {
uint64_t cleaner_total_nodes; // total number of nodes whose buffers are potentially flushed by cleaner thread BRT_FLUSHER_CLEANER_TOTAL_NODES = 0, // total number of nodes whose buffers are potentially flushed by cleaner thread
uint64_t cleaner_h1_nodes; // number of nodes of height one whose message buffers are flushed by cleaner thread BRT_FLUSHER_CLEANER_H1_NODES, // number of nodes of height one whose message buffers are flushed by cleaner thread
uint64_t cleaner_hgt1_nodes; // number of nodes of height > 1 whose message buffers are flushed by cleaner thread BRT_FLUSHER_CLEANER_HGT1_NODES, // number of nodes of height > 1 whose message buffers are flushed by cleaner thread
uint64_t cleaner_empty_nodes; // number of nodes that are selected by cleaner, but whose buffers are empty BRT_FLUSHER_CLEANER_EMPTY_NODES, // number of nodes that are selected by cleaner, but whose buffers are empty
uint64_t cleaner_nodes_dirtied; // number of nodes that are made dirty by the cleaner thread BRT_FLUSHER_CLEANER_NODES_DIRTIED, // number of nodes that are made dirty by the cleaner thread
uint64_t cleaner_max_buffer_size; // max number of bytes in message buffer flushed by cleaner thread BRT_FLUSHER_CLEANER_MAX_BUFFER_SIZE, // max number of bytes in message buffer flushed by cleaner thread
uint64_t cleaner_min_buffer_size; BRT_FLUSHER_CLEANER_MIN_BUFFER_SIZE,
uint64_t cleaner_total_buffer_size; BRT_FLUSHER_CLEANER_TOTAL_BUFFER_SIZE,
uint64_t cleaner_max_buffer_workdone; // max workdone value of any message buffer flushed by cleaner thread BRT_FLUSHER_CLEANER_MAX_BUFFER_WORKDONE, // max workdone value of any message buffer flushed by cleaner thread
uint64_t cleaner_min_buffer_workdone; BRT_FLUSHER_CLEANER_MIN_BUFFER_WORKDONE,
uint64_t cleaner_total_buffer_workdone; BRT_FLUSHER_CLEANER_TOTAL_BUFFER_WORKDONE,
uint64_t cleaner_num_leaf_merges_started; // number of times cleaner thread tries to merge a leaf BRT_FLUSHER_CLEANER_NUM_LEAF_MERGES_STARTED, // number of times cleaner thread tries to merge a leaf
uint64_t cleaner_num_leaf_merges_running; // number of cleaner thread leaf merges in progress BRT_FLUSHER_CLEANER_NUM_LEAF_MERGES_RUNNING, // number of cleaner thread leaf merges in progress
uint64_t cleaner_num_leaf_merges_completed; // number of times cleaner thread successfully merges a leaf BRT_FLUSHER_CLEANER_NUM_LEAF_MERGES_COMPLETED, // number of times cleaner thread successfully merges a leaf
uint64_t cleaner_num_dirtied_for_leaf_merge; // nodes dirtied by the "flush from root" process to merge a leaf node BRT_FLUSHER_CLEANER_NUM_DIRTIED_FOR_LEAF_MERGE, // nodes dirtied by the "flush from root" process to merge a leaf node
uint64_t flush_total; // total number of flushes done by flusher threads or cleaner threads BRT_FLUSHER_FLUSH_TOTAL, // total number of flushes done by flusher threads or cleaner threads
uint64_t flush_in_memory; // number of in memory flushes BRT_FLUSHER_FLUSH_IN_MEMORY, // number of in memory flushes
uint64_t flush_needed_io; // number of flushes that had to read a child (or part) off disk BRT_FLUSHER_FLUSH_NEEDED_IO, // number of flushes that had to read a child (or part) off disk
uint64_t flush_cascades; // number of flushes that triggered another flush in the child BRT_FLUSHER_FLUSH_CASCADES, // number of flushes that triggered another flush in the child
uint64_t flush_cascades_1; // number of flushes that triggered 1 cascading flush BRT_FLUSHER_FLUSH_CASCADES_1, // number of flushes that triggered 1 cascading flush
uint64_t flush_cascades_2; // number of flushes that triggered 2 cascading flushes BRT_FLUSHER_FLUSH_CASCADES_2, // number of flushes that triggered 2 cascading flushes
uint64_t flush_cascades_3; // number of flushes that triggered 3 cascading flushes BRT_FLUSHER_FLUSH_CASCADES_3, // number of flushes that triggered 3 cascading flushes
uint64_t flush_cascades_4; // number of flushes that triggered 4 cascading flushes BRT_FLUSHER_FLUSH_CASCADES_4, // number of flushes that triggered 4 cascading flushes
uint64_t flush_cascades_5; // number of flushes that triggered 5 cascading flushes BRT_FLUSHER_FLUSH_CASCADES_5, // number of flushes that triggered 5 cascading flushes
uint64_t flush_cascades_gt_5; // number of flushes that triggered more than 5 cascading flushes BRT_FLUSHER_FLUSH_CASCADES_GT_5, // number of flushes that triggered more than 5 cascading flushes
uint64_t split_leaf; // number of leaf nodes split BRT_FLUSHER_SPLIT_LEAF, // number of leaf nodes split
uint64_t split_nonleaf; // number of nonleaf nodes split BRT_FLUSHER_SPLIT_NONLEAF, // number of nonleaf nodes split
uint64_t merge_leaf; // number of times leaf nodes are merged BRT_FLUSHER_MERGE_LEAF, // number of times leaf nodes are merged
uint64_t merge_nonleaf; // number of times nonleaf nodes are merged BRT_FLUSHER_MERGE_NONLEAF, // number of times nonleaf nodes are merged
uint64_t balance_leaf; // number of times a leaf node is balanced inside brt BRT_FLUSHER_BALANCE_LEAF, // number of times a leaf node is balanced inside brt
BRT_FLUSHER_STATUS_NUM_ROWS
} brt_flusher_status_entry;
typedef struct {
bool initialized;
TOKU_ENGINE_STATUS_ROW_S status[BRT_FLUSHER_STATUS_NUM_ROWS];
} BRT_FLUSHER_STATUS_S, *BRT_FLUSHER_STATUS; } BRT_FLUSHER_STATUS_S, *BRT_FLUSHER_STATUS;
void toku_brt_flusher_status_init(void) __attribute__((__constructor__)); void toku_brt_flusher_status_init(void) __attribute__((__constructor__));
...@@ -132,11 +138,17 @@ brt_nonleaf_split( ...@@ -132,11 +138,17 @@ brt_nonleaf_split(
************************************************************************ ************************************************************************
*/ */
typedef struct brt_hot_status { typedef enum {
uint64_t num_started; // number of HOT operations that have begun BRT_HOT_NUM_STARTED = 0, // number of HOT operations that have begun
uint64_t num_completed; // number of HOT operations that have successfully completed BRT_HOT_NUM_COMPLETED, // number of HOT operations that have successfully completed
uint64_t num_aborted; // number of HOT operations that have been aborted BRT_HOT_NUM_ABORTED, // number of HOT operations that have been aborted
uint64_t max_root_flush_count; // max number of flushes from root ever required to optimize a tree BRT_HOT_MAX_ROOT_FLUSH_COUNT, // max number of flushes from root ever required to optimize a tree
BRT_HOT_STATUS_NUM_ROWS
} brt_hot_status_entry;
typedef struct {
bool initialized;
TOKU_ENGINE_STATUS_ROW_S status[BRT_HOT_STATUS_NUM_ROWS];
} BRT_HOT_STATUS_S, *BRT_HOT_STATUS; } BRT_HOT_STATUS_S, *BRT_HOT_STATUS;
void toku_brt_hot_status_init(void) __attribute__((__constructor__)); void toku_brt_hot_status_init(void) __attribute__((__constructor__));
......
/* -*- mode: C; c-basic-offset: 4 -*- */ /* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#ident "$Id$" #ident "$Id$"
#ident "Copyright (c) 2007-2011 Tokutek Inc. All rights reserved." #ident "Copyright (c) 2007-2011 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it." #ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
...@@ -28,16 +28,34 @@ struct hot_flusher_extra { ...@@ -28,16 +28,34 @@ struct hot_flusher_extra {
bool rightmost_leaf_seen; bool rightmost_leaf_seen;
}; };
static BRT_HOT_STATUS_S hot_status; static volatile BRT_HOT_STATUS_S hot_status;
#define STATUS_INIT(k,t,l) { \
hot_status.status[k].keyname = #k; \
hot_status.status[k].type = t; \
hot_status.status[k].legend = "hot: " l; \
}
#define STATUS_VALUE(x) hot_status.status[x].value.num
void void
toku_brt_hot_status_init(void) toku_brt_hot_status_init(void)
{ {
DRD_IGNORE_VAR(hot_status.max_root_flush_count); STATUS_INIT(BRT_HOT_NUM_STARTED, UINT64, "operations ever started");
STATUS_INIT(BRT_HOT_NUM_COMPLETED, UINT64, "operations successfully completed");
STATUS_INIT(BRT_HOT_NUM_ABORTED, UINT64, "operations aborted");
STATUS_INIT(BRT_HOT_MAX_ROOT_FLUSH_COUNT, UINT64, "max number of flushes from root ever required to optimize a tree");
DRD_IGNORE_VAR(STATUS_VALUE(BRT_HOT_MAX_ROOT_FLUSH_COUNT));
hot_status.initialized = true;
} }
#undef STATUS_INIT
void void
toku_brt_hot_get_status(BRT_HOT_STATUS s) { toku_brt_hot_get_status(BRT_HOT_STATUS s) {
if (!hot_status.initialized) {
toku_brt_hot_status_init();
}
*s = hot_status; *s = hot_status;
} }
...@@ -229,7 +247,7 @@ toku_brt_hot_optimize(BRT brt, ...@@ -229,7 +247,7 @@ toku_brt_hot_optimize(BRT brt,
uint64_t loop_count = 0; uint64_t loop_count = 0;
MSN msn_at_start_of_hot = ZERO_MSN; // capture msn from root at MSN msn_at_start_of_hot = ZERO_MSN; // capture msn from root at
// start of HOT operation // start of HOT operation
(void) __sync_fetch_and_add(&hot_status.num_started, 1); (void) __sync_fetch_and_add(&STATUS_VALUE(BRT_HOT_NUM_STARTED), 1);
{ {
toku_cachetable_call_ydb_lock(brt->h->cf); toku_cachetable_call_ydb_lock(brt->h->cf);
...@@ -273,8 +291,8 @@ toku_brt_hot_optimize(BRT brt, ...@@ -273,8 +291,8 @@ toku_brt_hot_optimize(BRT brt,
loop_count++; loop_count++;
if (loop_count > hot_status.max_root_flush_count) { if (loop_count > STATUS_VALUE(BRT_HOT_MAX_ROOT_FLUSH_COUNT)) {
hot_status.max_root_flush_count = loop_count; STATUS_VALUE(BRT_HOT_MAX_ROOT_FLUSH_COUNT) = loop_count;
} }
// Initialize the maximum current key. We need to do this for // Initialize the maximum current key. We need to do this for
...@@ -338,10 +356,12 @@ toku_brt_hot_optimize(BRT brt, ...@@ -338,10 +356,12 @@ toku_brt_hot_optimize(BRT brt,
} }
if (success) { if (success) {
(void) __sync_fetch_and_add(&hot_status.num_completed, 1); (void) __sync_fetch_and_add(&STATUS_VALUE(BRT_HOT_NUM_COMPLETED), 1);
} else { } else {
(void) __sync_fetch_and_add(&hot_status.num_aborted, 1); (void) __sync_fetch_and_add(&STATUS_VALUE(BRT_HOT_NUM_ABORTED), 1);
} }
} }
return r; return r;
} }
#undef STATUS_VALUE
This diff is collapsed.
...@@ -18,12 +18,41 @@ ...@@ -18,12 +18,41 @@
#define cilk_worker_count 1 #define cilk_worker_count 1
#endif #endif
static BRT_UPGRADE_STATUS_S upgrade_status; // accountability, used in backwards_x.c
void
toku_brt_get_upgrade_status (BRT_UPGRADE_STATUS s) { static BRT_UPGRADE_STATUS_S brt_upgrade_status;
*s = upgrade_status;
#define UPGRADE_STATUS_INIT(k,t,l) { \
brt_upgrade_status.status[k].keyname = #k; \
brt_upgrade_status.status[k].type = t; \
brt_upgrade_status.status[k].legend = "brt upgrade: " l; \
}
static void
status_init(void)
{
// Note, this function initializes the keyname, type, and legend fields.
// Value fields are initialized to zero by compiler.
UPGRADE_STATUS_INIT(BRT_UPGRADE_FOOTPRINT, UINT64, "footprint");
UPGRADE_STATUS_INIT(BRT_UPGRADE_HEADER_13, UINT64, "V13 headers");
UPGRADE_STATUS_INIT(BRT_UPGRADE_NONLEAF_13, UINT64, "V13 nonleaf nodes");
UPGRADE_STATUS_INIT(BRT_UPGRADE_LEAF_13, UINT64, "V13 leaf nodes");
UPGRADE_STATUS_INIT(BRT_UPGRADE_OPTIMIZED_FOR_UPGRADE, UINT64, "optimized for upgrade");
brt_upgrade_status.initialized = true;
} }
#undef UPGRADE_STATUS_INIT
#define UPGRADE_STATUS_VALUE(x) brt_upgrade_status.status[x].value.num
void
toku_brt_upgrade_get_status(BRT_UPGRADE_STATUS s) {
if (!brt_upgrade_status.initialized) {
status_init();
}
UPGRADE_STATUS_VALUE(BRT_UPGRADE_FOOTPRINT) = toku_log_upgrade_get_footprint();
*s = brt_upgrade_status;
}
// performance tracing // performance tracing
#define DO_TOKU_TRACE 0 #define DO_TOKU_TRACE 0
...@@ -1764,7 +1793,7 @@ toku_maybe_upgrade_brt(BRT t) { // possibly do some work to complete the version ...@@ -1764,7 +1793,7 @@ toku_maybe_upgrade_brt(BRT t) { // possibly do some work to complete the version
if (r == 0 && upgrade) { if (r == 0 && upgrade) {
r = toku_brt_optimize_for_upgrade(t); r = toku_brt_optimize_for_upgrade(t);
if (r==0) if (r==0)
__sync_fetch_and_add(&upgrade_status.optimized_for_upgrade, 1); __sync_fetch_and_add(&UPGRADE_STATUS_VALUE(BRT_UPGRADE_OPTIMIZED_FOR_UPGRADE), 1);
} }
if (r == 0) { if (r == 0) {
t->h->upgrade_brt_performed = TRUE; // no further upgrade necessary t->h->upgrade_brt_performed = TRUE; // no further upgrade necessary
...@@ -2228,7 +2257,7 @@ deserialize_brtheader_versioned (int fd, struct rbuf *rb, struct brt_header **br ...@@ -2228,7 +2257,7 @@ deserialize_brtheader_versioned (int fd, struct rbuf *rb, struct brt_header **br
h->flags &= ~TOKU_DB_VALCMP_BUILTIN_13; h->flags &= ~TOKU_DB_VALCMP_BUILTIN_13;
} }
h->layout_version++; h->layout_version++;
__sync_fetch_and_add(&upgrade_status.header_13, 1); // how many header nodes upgraded from v13 __sync_fetch_and_add(&UPGRADE_STATUS_VALUE(BRT_UPGRADE_HEADER_13), 1); // how many header nodes upgraded from v13
upgrade++; upgrade++;
//Fall through on purpose //Fall through on purpose
case BRT_LAYOUT_VERSION_14: case BRT_LAYOUT_VERSION_14:
...@@ -2871,3 +2900,4 @@ cleanup: ...@@ -2871,3 +2900,4 @@ cleanup:
} }
#undef UPGRADE_STATUS_VALUE
This diff is collapsed.
...@@ -19,7 +19,6 @@ ...@@ -19,7 +19,6 @@
extern "C" { extern "C" {
#endif #endif
typedef struct brt_status BRT_STATUS_S, *BRT_STATUS;
typedef struct brt *BRT; typedef struct brt *BRT;
typedef struct brtnode *BRTNODE; typedef struct brtnode *BRTNODE;
typedef struct brtnode_leaf_basement_node *BASEMENTNODE; typedef struct brtnode_leaf_basement_node *BASEMENTNODE;
......
...@@ -37,7 +37,15 @@ static void cachetable_partial_reader(WORKITEM); ...@@ -37,7 +37,15 @@ static void cachetable_partial_reader(WORKITEM);
#define WHEN_TRACE_CT(x) ((void)0) #define WHEN_TRACE_CT(x) ((void)0)
#endif #endif
// these should be in the cachetable object, but we make them file-wide so that gdb can get them easily ///////////////////////////////////////////////////////////////////////////////////
// Engine status
//
// Status is intended for display to humans to help understand system behavior.
// It does not need to be perfectly thread-safe.
// These should be in the cachetable object, but we make them file-wide so that gdb can get them easily.
// They were left here after engine status cleanup (#2949, rather than moved into the status struct)
// so they are still easily available to the debugger and to save lots of typing.
static u_int64_t cachetable_lock_taken = 0; static u_int64_t cachetable_lock_taken = 0;
static u_int64_t cachetable_lock_released = 0; static u_int64_t cachetable_lock_released = 0;
static u_int64_t cachetable_hit; static u_int64_t cachetable_hit;
...@@ -54,6 +62,54 @@ static u_int64_t cachetable_maybe_get_and_pin_hits; // how many times has get_a ...@@ -54,6 +62,54 @@ static u_int64_t cachetable_maybe_get_and_pin_hits; // how many times has get_a
static u_int64_t cachetable_evictions; static u_int64_t cachetable_evictions;
static u_int64_t cleaner_executions; // number of times the cleaner thread's loop has executed static u_int64_t cleaner_executions; // number of times the cleaner thread's loop has executed
static CACHETABLE_STATUS_S ct_status;
// Note, toku_cachetable_get_status() is below, after declaration of cachetable.
#define STATUS_INIT(k,t,l) { \
ct_status.status[k].keyname = #k; \
ct_status.status[k].type = t; \
ct_status.status[k].legend = "cachetable: " l; \
}
static void
status_init(void) {
// Note, this function initializes the keyname, type, and legend fields.
// Value fields are initialized to zero by compiler.
STATUS_INIT(CT_LOCK_TAKEN, UINT64, "lock taken");
STATUS_INIT(CT_LOCK_RELEASED, UINT64, "lock released");
STATUS_INIT(CT_HIT, UINT64, "hit");
STATUS_INIT(CT_MISS, UINT64, "miss");
STATUS_INIT(CT_MISSTIME, UINT64, "miss time");
STATUS_INIT(CT_WAITTIME, UINT64, "wait time");
STATUS_INIT(CT_WAIT_READING, UINT64, "wait reading");
STATUS_INIT(CT_WAIT_WRITING, UINT64, "wait writing");
STATUS_INIT(CT_WAIT_CHECKPOINT, UINT64, "wait checkpoint");
STATUS_INIT(CT_PUTS, UINT64, "puts (new nodes created)");
STATUS_INIT(CT_PREFETCHES, UINT64, "prefetches");
STATUS_INIT(CT_MAYBE_GET_AND_PINS, UINT64, "maybe_get_and_pin");
STATUS_INIT(CT_MAYBE_GET_AND_PIN_HITS, UINT64, "maybe_get_and_pin hits");
STATUS_INIT(CT_SIZE_CURRENT, UINT64, "size current");
STATUS_INIT(CT_SIZE_LIMIT, UINT64, "size limit");
STATUS_INIT(CT_SIZE_MAX, UINT64, "size max");
STATUS_INIT(CT_SIZE_WRITING, UINT64, "size writing");
STATUS_INIT(CT_SIZE_NONLEAF, UINT64, "size nonleaf");
STATUS_INIT(CT_SIZE_LEAF, UINT64, "size leaf");
STATUS_INIT(CT_SIZE_ROLLBACK, UINT64, "size rollback");
STATUS_INIT(CT_SIZE_CACHEPRESSURE, UINT64, "size cachepressure");
STATUS_INIT(CT_EVICTIONS, UINT64, "evictions");
STATUS_INIT(CT_CLEANER_EXECUTIONS, UINT64, "cleaner executions");
STATUS_INIT(CT_CLEANER_PERIOD, UINT64, "cleaner period");
STATUS_INIT(CT_CLEANER_ITERATIONS, UINT64, "cleaner iterations");
ct_status.initialized = true;
}
#undef STATUS_INIT
#define STATUS_VALUE(x) ct_status.status[x].value.num
enum ctpair_state { enum ctpair_state {
CTPAIR_IDLE = 1, // in memory CTPAIR_IDLE = 1, // in memory
...@@ -199,6 +255,42 @@ struct cachetable { ...@@ -199,6 +255,42 @@ struct cachetable {
int64_t size_cachepressure; int64_t size_cachepressure;
}; };
void
toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS statp) {
if (!ct_status.initialized)
status_init();
STATUS_VALUE(CT_LOCK_TAKEN) = cachetable_lock_taken;
STATUS_VALUE(CT_LOCK_RELEASED) = cachetable_lock_released;
STATUS_VALUE(CT_HIT) = cachetable_hit;
STATUS_VALUE(CT_MISS) = cachetable_miss;
STATUS_VALUE(CT_MISSTIME) = cachetable_misstime;
STATUS_VALUE(CT_WAITTIME) = cachetable_waittime;
STATUS_VALUE(CT_WAIT_READING) = cachetable_wait_reading;
STATUS_VALUE(CT_WAIT_WRITING) = cachetable_wait_writing;
STATUS_VALUE(CT_WAIT_CHECKPOINT) = cachetable_wait_checkpoint;
STATUS_VALUE(CT_PUTS) = cachetable_puts;
STATUS_VALUE(CT_PREFETCHES) = cachetable_prefetches;
STATUS_VALUE(CT_MAYBE_GET_AND_PINS) = cachetable_maybe_get_and_pins;
STATUS_VALUE(CT_MAYBE_GET_AND_PIN_HITS) = cachetable_maybe_get_and_pin_hits;
STATUS_VALUE(CT_SIZE_CURRENT) = ct->size_current;
STATUS_VALUE(CT_SIZE_LIMIT) = ct->size_limit;
STATUS_VALUE(CT_SIZE_MAX) = ct->size_max;
STATUS_VALUE(CT_SIZE_WRITING) = ct->size_evicting;
STATUS_VALUE(CT_SIZE_NONLEAF) = ct->size_nonleaf;
STATUS_VALUE(CT_SIZE_LEAF) = ct->size_leaf;
STATUS_VALUE(CT_SIZE_ROLLBACK) = ct->size_rollback;
STATUS_VALUE(CT_SIZE_CACHEPRESSURE) = ct->size_cachepressure;
STATUS_VALUE(CT_EVICTIONS) = cachetable_evictions;
STATUS_VALUE(CT_CLEANER_EXECUTIONS) = cleaner_executions;
STATUS_VALUE(CT_CLEANER_PERIOD) = toku_get_cleaner_period_unlocked(ct);
STATUS_VALUE(CT_CLEANER_ITERATIONS) = toku_get_cleaner_iterations_unlocked(ct);
*statp = ct_status;
}
// Code bracketed with {BEGIN_CRITICAL_REGION; ... END_CRITICAL_REGION;} macros // Code bracketed with {BEGIN_CRITICAL_REGION; ... END_CRITICAL_REGION;} macros
// are critical regions in which a checkpoint is not permitted to begin. // are critical regions in which a checkpoint is not permitted to begin.
// Must increment checkpoint_prohibited before testing checkpoint_is_beginning // Must increment checkpoint_prohibited before testing checkpoint_is_beginning
...@@ -3799,32 +3891,6 @@ toku_cachefile_size_in_memory(CACHEFILE cf) ...@@ -3799,32 +3891,6 @@ toku_cachefile_size_in_memory(CACHEFILE cf)
return result; return result;
} }
void toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS s) {
s->lock_taken = cachetable_lock_taken;
s->lock_released = cachetable_lock_released;
s->hit = cachetable_hit;
s->miss = cachetable_miss;
s->misstime = cachetable_misstime;
s->waittime = cachetable_waittime;
s->wait_reading = cachetable_wait_reading;
s->wait_writing = cachetable_wait_writing;
s->wait_checkpoint = cachetable_wait_checkpoint;
s->puts = cachetable_puts;
s->prefetches = cachetable_prefetches;
s->maybe_get_and_pins = cachetable_maybe_get_and_pins;
s->maybe_get_and_pin_hits = cachetable_maybe_get_and_pin_hits;
s->size_current = ct->size_current;
s->size_limit = ct->size_limit;
s->size_max = ct->size_max;
s->size_writing = ct->size_evicting;
s->size_nonleaf = ct->size_nonleaf;
s->size_leaf = ct->size_leaf;
s->size_rollback = ct->size_rollback;
s->size_cachepressure = ct->size_cachepressure;
s->evictions = cachetable_evictions;
s->cleaner_executions = cleaner_executions;
}
char * char *
toku_construct_full_name(int count, ...) { toku_construct_full_name(int count, ...) {
va_list ap; va_list ap;
...@@ -4005,7 +4071,9 @@ void __attribute__((__constructor__)) toku_cachetable_drd_ignore(void); ...@@ -4005,7 +4071,9 @@ void __attribute__((__constructor__)) toku_cachetable_drd_ignore(void);
void void
toku_cachetable_drd_ignore(void) { toku_cachetable_drd_ignore(void) {
// incremented only while lock is held, but read by engine status asynchronously. // incremented only while lock is held, but read by engine status asynchronously.
DRD_IGNORE_VAR(cachetable_lock_taken); DRD_IGNORE_VAR(STATUS_VALUE(CT_LOCK_TAKEN));
DRD_IGNORE_VAR(cachetable_lock_released); DRD_IGNORE_VAR(STATUS_VALUE(CT_LOCK_RELEASED));
DRD_IGNORE_VAR(cachetable_evictions); DRD_IGNORE_VAR(STATUS_VALUE(CT_EVICTIONS));
} }
#undef STATUS_VALUE
...@@ -485,31 +485,38 @@ void toku_cachetable_maybe_flush_some(CACHETABLE ct); ...@@ -485,31 +485,38 @@ void toku_cachetable_maybe_flush_some(CACHETABLE ct);
u_int64_t toku_cachefile_size_in_memory(CACHEFILE cf); u_int64_t toku_cachefile_size_in_memory(CACHEFILE cf);
typedef enum {
typedef struct cachetable_status { CT_LOCK_TAKEN = 0,
u_int64_t lock_taken; CT_LOCK_RELEASED,
u_int64_t lock_released; CT_HIT,
u_int64_t hit; CT_MISS,
u_int64_t miss; CT_MISSTIME, // how many usec spent waiting for disk read because of cache miss
u_int64_t misstime; /* how many usec spent waiting for disk read because of cache miss */ CT_WAITTIME, // how many usec spent waiting for another thread to release cache line
u_int64_t waittime; /* how many usec spent waiting for another thread to release cache line */ CT_WAIT_READING,
u_int64_t wait_reading; CT_WAIT_WRITING,
u_int64_t wait_writing; CT_WAIT_CHECKPOINT, // number of times get_and_pin waits for a node to be written for a checkpoint
u_int64_t wait_checkpoint; // number of times get_and_pin waits for a node to be written for a checkpoint CT_PUTS, // how many times has a newly created node been put into the cachetable?
u_int64_t puts; // how many times has a newly created node been put into the cachetable? CT_PREFETCHES, // how many times has a block been prefetched into the cachetable?
u_int64_t prefetches; // how many times has a block been prefetched into the cachetable? CT_MAYBE_GET_AND_PINS, // how many times has maybe_get_and_pin(_clean) been called?
u_int64_t maybe_get_and_pins; // how many times has maybe_get_and_pin(_clean) been called? CT_MAYBE_GET_AND_PIN_HITS, // how many times has maybe_get_and_pin(_clean) returned with a node?
u_int64_t maybe_get_and_pin_hits; // how many times has maybe_get_and_pin(_clean) returned with a node? CT_SIZE_CURRENT, // the sum of the sizes of the nodes represented in the cachetable
uint64_t size_current; // the sum of the sizes of the nodes represented in the cachetable CT_SIZE_LIMIT, // the limit to the sum of the node sizes
uint64_t size_limit; // the limit to the sum of the node sizes CT_SIZE_MAX, // high water mark of size_current (max value size_current ever had)
uint64_t size_max; // high water mark of size_current (max value size_current ever had) CT_SIZE_WRITING, // the sum of the sizes of the nodes being written
uint64_t size_writing; // the sum of the sizes of the nodes being written CT_SIZE_NONLEAF, // number of bytes in cachetable belonging to nonleaf nodes
uint64_t size_nonleaf; // number of bytes in cachetable belonging to nonleaf nodes CT_SIZE_LEAF, // number of bytes in cachetable belonging to leaf nodes
uint64_t size_leaf; // number of bytes in cachetable belonging to leaf nodes CT_SIZE_ROLLBACK, // number of bytes in cachetable belonging to rollback nodes
uint64_t size_rollback; // number of bytes in cachetable belonging to rollback nodes CT_SIZE_CACHEPRESSURE, // number of bytes causing cache pressure (sum of buffers and workdone counters)
uint64_t size_cachepressure; // number of bytes causing cache pressure (sum of buffers and workdone counters) CT_EVICTIONS,
u_int64_t evictions; CT_CLEANER_EXECUTIONS, // number of times the cleaner thread's loop has executed
u_int64_t cleaner_executions; // number of times the cleaner thread's loop has executed CT_CLEANER_PERIOD,
CT_CLEANER_ITERATIONS, // number of times the cleaner thread runs the cleaner per period
CT_STATUS_NUM_ROWS
} ct_status_entry;
typedef struct {
BOOL initialized;
TOKU_ENGINE_STATUS_ROW_S status[CT_STATUS_NUM_ROWS];
} CACHETABLE_STATUS_S, *CACHETABLE_STATUS; } CACHETABLE_STATUS_S, *CACHETABLE_STATUS;
void toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS s); void toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS s);
......
...@@ -57,7 +57,61 @@ ...@@ -57,7 +57,61 @@
#include "logger.h" #include "logger.h"
#include "checkpoint.h" #include "checkpoint.h"
///////////////////////////////////////////////////////////////////////////////////
// Engine status
//
// Status is intended for display to humans to help understand system behavior.
// It does not need to be perfectly thread-safe.
static CHECKPOINT_STATUS_S cp_status; static CHECKPOINT_STATUS_S cp_status;
#define STATUS_INIT(k,t,l) { \
cp_status.status[k].keyname = #k; \
cp_status.status[k].type = t; \
cp_status.status[k].legend = "checkpoint: " l; \
}
static void
status_init(void) {
// Note, this function initializes the keyname, type, and legend fields.
// Value fields are initialized to zero by compiler.
STATUS_INIT(CP_PERIOD, UINT64, "period");
STATUS_INIT(CP_FOOTPRINT, UINT64, "footprint");
STATUS_INIT(CP_TIME_LAST_CHECKPOINT_BEGIN, UNIXTIME, "last checkpoint began ");
STATUS_INIT(CP_TIME_LAST_CHECKPOINT_BEGIN_COMPLETE, UNIXTIME, "last complete checkpoint began ");
STATUS_INIT(CP_TIME_LAST_CHECKPOINT_END, UNIXTIME, "last complete checkpoint ended");
STATUS_INIT(CP_LAST_LSN, UINT64, "last complete checkpoint LSN");
STATUS_INIT(CP_CHECKPOINT_COUNT, UINT64, "checkpoints taken ");
STATUS_INIT(CP_CHECKPOINT_COUNT_FAIL, UINT64, "checkpoints failed");
STATUS_INIT(CP_WAITERS_NOW, UINT64, "waiters now");
STATUS_INIT(CP_WAITERS_MAX, UINT64, "waiters max");
STATUS_INIT(CP_CLIENT_WAIT_ON_MO, UINT64, "non-checkpoint client wait on mo lock");
STATUS_INIT(CP_CLIENT_WAIT_ON_CS, UINT64, "non-checkpoint client wait on cs lock");
STATUS_INIT(CP_WAIT_SCHED_CS, UINT64, "sched wait on cs lock");
STATUS_INIT(CP_WAIT_CLIENT_CS, UINT64, "client wait on cs lock");
STATUS_INIT(CP_WAIT_TXN_CS, UINT64, "txn wait on cs lock");
STATUS_INIT(CP_WAIT_OTHER_CS, UINT64, "other wait on cs lock");
STATUS_INIT(CP_WAIT_SCHED_MO, UINT64, "sched wait on mo lock");
STATUS_INIT(CP_WAIT_CLIENT_MO, UINT64, "client wait on mo lock");
STATUS_INIT(CP_WAIT_TXN_MO, UINT64, "txn wait on mo lock");
STATUS_INIT(CP_WAIT_OTHER_MO, UINT64, "other wait on mo lock");
cp_status.initialized = true;
}
#undef STATUS_INIT
#define STATUS_VALUE(x) cp_status.status[x].value.num
void
toku_checkpoint_get_status(CACHETABLE ct, CHECKPOINT_STATUS statp) {
if (!cp_status.initialized)
status_init();
STATUS_VALUE(CP_PERIOD) = toku_get_checkpoint_period_unlocked(ct);
*statp = cp_status;
}
static LSN last_completed_checkpoint_lsn; static LSN last_completed_checkpoint_lsn;
static toku_pthread_rwlock_t checkpoint_safe_lock; static toku_pthread_rwlock_t checkpoint_safe_lock;
...@@ -145,7 +199,7 @@ checkpoint_safe_checkpoint_unlock(void) { ...@@ -145,7 +199,7 @@ checkpoint_safe_checkpoint_unlock(void) {
void void
toku_multi_operation_client_lock(void) { toku_multi_operation_client_lock(void) {
if (locked_mo) if (locked_mo)
(void) __sync_fetch_and_add(&cp_status.client_wait_on_mo, 1); (void) __sync_fetch_and_add(&STATUS_VALUE(CP_CLIENT_WAIT_ON_MO), 1);
int r = toku_pthread_rwlock_rdlock(&multi_operation_lock); int r = toku_pthread_rwlock_rdlock(&multi_operation_lock);
assert(r == 0); assert(r == 0);
} }
...@@ -159,7 +213,7 @@ toku_multi_operation_client_unlock(void) { ...@@ -159,7 +213,7 @@ toku_multi_operation_client_unlock(void) {
void void
toku_checkpoint_safe_client_lock(void) { toku_checkpoint_safe_client_lock(void) {
if (locked_cs) if (locked_cs)
(void) __sync_fetch_and_add(&cp_status.client_wait_on_cs, 1); (void) __sync_fetch_and_add(&STATUS_VALUE(CP_CLIENT_WAIT_ON_CS), 1);
int r = toku_pthread_rwlock_rdlock(&checkpoint_safe_lock); int r = toku_pthread_rwlock_rdlock(&checkpoint_safe_lock);
assert(r == 0); assert(r == 0);
toku_multi_operation_client_lock(); toku_multi_operation_client_lock();
...@@ -173,12 +227,6 @@ toku_checkpoint_safe_client_unlock(void) { ...@@ -173,12 +227,6 @@ toku_checkpoint_safe_client_unlock(void) {
} }
void
toku_checkpoint_get_status(CHECKPOINT_STATUS s) {
*s = cp_status;
}
// Initialize the checkpoint mechanism, must be called before any client operations. // Initialize the checkpoint mechanism, must be called before any client operations.
int int
...@@ -206,7 +254,7 @@ toku_checkpoint_destroy(void) { ...@@ -206,7 +254,7 @@ toku_checkpoint_destroy(void) {
return r; return r;
} }
#define SET_CHECKPOINT_FOOTPRINT(x) cp_status.footprint = footprint_offset + x #define SET_CHECKPOINT_FOOTPRINT(x) STATUS_VALUE(CP_FOOTPRINT) = footprint_offset + x
// Take a checkpoint of all currently open dictionaries // Take a checkpoint of all currently open dictionaries
...@@ -222,39 +270,39 @@ toku_checkpoint(CACHETABLE ct, TOKULOGGER logger, ...@@ -222,39 +270,39 @@ toku_checkpoint(CACHETABLE ct, TOKULOGGER logger,
if (locked_cs) { if (locked_cs) {
if (caller_id == SCHEDULED_CHECKPOINT) if (caller_id == SCHEDULED_CHECKPOINT)
(void) __sync_fetch_and_add(&cp_status.cp_wait_sched_cs, 1); (void) __sync_fetch_and_add(&STATUS_VALUE(CP_WAIT_SCHED_CS), 1);
else if (caller_id == CLIENT_CHECKPOINT) else if (caller_id == CLIENT_CHECKPOINT)
(void) __sync_fetch_and_add(&cp_status.cp_wait_client_cs, 1); (void) __sync_fetch_and_add(&STATUS_VALUE(CP_WAIT_CLIENT_CS), 1);
else if (caller_id == TXN_COMMIT_CHECKPOINT) else if (caller_id == TXN_COMMIT_CHECKPOINT)
(void) __sync_fetch_and_add(&cp_status.cp_wait_txn_cs, 1); (void) __sync_fetch_and_add(&STATUS_VALUE(CP_WAIT_TXN_CS), 1);
else else
(void) __sync_fetch_and_add(&cp_status.cp_wait_other_cs, 1); (void) __sync_fetch_and_add(&STATUS_VALUE(CP_WAIT_OTHER_CS), 1);
} }
(void) __sync_fetch_and_add(&cp_status.waiters_now, 1); (void) __sync_fetch_and_add(&STATUS_VALUE(CP_WAITERS_NOW), 1);
checkpoint_safe_checkpoint_lock(); checkpoint_safe_checkpoint_lock();
(void) __sync_fetch_and_sub(&cp_status.waiters_now, 1); (void) __sync_fetch_and_sub(&STATUS_VALUE(CP_WAITERS_NOW), 1);
if (cp_status.waiters_now > cp_status.waiters_max) if (STATUS_VALUE(CP_WAITERS_NOW) > STATUS_VALUE(CP_WAITERS_MAX))
cp_status.waiters_max = cp_status.waiters_now; // threadsafe, within checkpoint_safe lock STATUS_VALUE(CP_WAITERS_MAX) = STATUS_VALUE(CP_WAITERS_NOW); // threadsafe, within checkpoint_safe lock
SET_CHECKPOINT_FOOTPRINT(10); SET_CHECKPOINT_FOOTPRINT(10);
if (locked_mo) { if (locked_mo) {
if (caller_id == SCHEDULED_CHECKPOINT) if (caller_id == SCHEDULED_CHECKPOINT)
cp_status.cp_wait_sched_mo++; // threadsafe, within checkpoint_safe lock STATUS_VALUE(CP_WAIT_SCHED_MO)++; // threadsafe, within checkpoint_safe lock
else if (caller_id == CLIENT_CHECKPOINT) else if (caller_id == CLIENT_CHECKPOINT)
cp_status.cp_wait_client_mo++; STATUS_VALUE(CP_WAIT_CLIENT_MO)++;
else if (caller_id == TXN_COMMIT_CHECKPOINT) else if (caller_id == TXN_COMMIT_CHECKPOINT)
cp_status.cp_wait_txn_mo++; STATUS_VALUE(CP_WAIT_TXN_MO)++;
else else
cp_status.cp_wait_other_mo++; STATUS_VALUE(CP_WAIT_OTHER_MO)++;
} }
multi_operation_checkpoint_lock(); multi_operation_checkpoint_lock();
SET_CHECKPOINT_FOOTPRINT(20); SET_CHECKPOINT_FOOTPRINT(20);
ydb_lock(); ydb_lock();
SET_CHECKPOINT_FOOTPRINT(30); SET_CHECKPOINT_FOOTPRINT(30);
cp_status.time_last_checkpoint_begin = time(NULL); STATUS_VALUE(CP_TIME_LAST_CHECKPOINT_BEGIN) = time(NULL);
r = toku_cachetable_begin_checkpoint(ct, logger); r = toku_cachetable_begin_checkpoint(ct, logger);
multi_operation_checkpoint_unlock(); multi_operation_checkpoint_unlock();
...@@ -270,26 +318,23 @@ toku_checkpoint(CACHETABLE ct, TOKULOGGER logger, ...@@ -270,26 +318,23 @@ toku_checkpoint(CACHETABLE ct, TOKULOGGER logger,
if (r==0 && logger) { if (r==0 && logger) {
last_completed_checkpoint_lsn = logger->last_completed_checkpoint_lsn; last_completed_checkpoint_lsn = logger->last_completed_checkpoint_lsn;
r = toku_logger_maybe_trim_log(logger, last_completed_checkpoint_lsn); r = toku_logger_maybe_trim_log(logger, last_completed_checkpoint_lsn);
cp_status.last_lsn = last_completed_checkpoint_lsn.lsn; STATUS_VALUE(CP_LAST_LSN) = last_completed_checkpoint_lsn.lsn;
} }
SET_CHECKPOINT_FOOTPRINT(60); SET_CHECKPOINT_FOOTPRINT(60);
cp_status.time_last_checkpoint_end = time(NULL); STATUS_VALUE(CP_TIME_LAST_CHECKPOINT_END) = time(NULL);
cp_status.time_last_checkpoint_begin_complete = cp_status.time_last_checkpoint_begin; STATUS_VALUE(CP_TIME_LAST_CHECKPOINT_BEGIN_COMPLETE) = STATUS_VALUE(CP_TIME_LAST_CHECKPOINT_BEGIN);
if (r == 0) if (r == 0)
cp_status.checkpoint_count++; STATUS_VALUE(CP_CHECKPOINT_COUNT)++;
else else
cp_status.checkpoint_count_fail++; STATUS_VALUE(CP_CHECKPOINT_COUNT_FAIL)++;
cp_status.footprint = 0; STATUS_VALUE(CP_FOOTPRINT) = 0;
checkpoint_safe_checkpoint_unlock(); checkpoint_safe_checkpoint_unlock();
return r; return r;
} }
#undef SET_CHECKPOINT_FOOTPRINT
#include <valgrind/drd.h> #include <valgrind/drd.h>
void __attribute__((__constructor__)) toku_checkpoint_drd_ignore(void); void __attribute__((__constructor__)) toku_checkpoint_drd_ignore(void);
void void
...@@ -298,3 +343,6 @@ toku_checkpoint_drd_ignore(void) { ...@@ -298,3 +343,6 @@ toku_checkpoint_drd_ignore(void) {
DRD_IGNORE_VAR(locked_mo); DRD_IGNORE_VAR(locked_mo);
DRD_IGNORE_VAR(locked_cs); DRD_IGNORE_VAR(locked_cs);
} }
#undef SET_CHECKPOINT_FOOTPRINT
#undef STATUS_VALUE
...@@ -80,35 +80,42 @@ int toku_checkpoint(CACHETABLE ct, TOKULOGGER logger, ...@@ -80,35 +80,42 @@ int toku_checkpoint(CACHETABLE ct, TOKULOGGER logger,
/****** /******
* These functions are called from the ydb level. * These functions are called from the ydb level.
* They return status information and have no side effects. * They return status information and have no side effects.
* Some status information may be incorrect because no locks are taken to collect status. * Some status information may be incorrect because no locks are taken to collect status.
* (If checkpoint is in progress, it may overwrite status info while it is being read.) * (If checkpoint is in progress, it may overwrite status info while it is being read.)
*****/ *****/
typedef enum {
CP_PERIOD,
CP_FOOTPRINT,
CP_TIME_LAST_CHECKPOINT_BEGIN,
CP_TIME_LAST_CHECKPOINT_BEGIN_COMPLETE,
CP_TIME_LAST_CHECKPOINT_END,
CP_LAST_LSN,
CP_CHECKPOINT_COUNT,
CP_CHECKPOINT_COUNT_FAIL,
CP_WAITERS_NOW, // how many threads are currently waiting for the checkpoint_safe lock to perform a checkpoint
CP_WAITERS_MAX, // max threads ever simultaneously waiting for the checkpoint_safe lock to perform a checkpoint
CP_CLIENT_WAIT_ON_MO, // how many times a client thread waited to take the multi_operation lock, not for checkpoint
CP_CLIENT_WAIT_ON_CS, // how many times a client thread waited for the checkpoint_safe lock, not for checkpoint
CP_WAIT_SCHED_CS, // how many times a scheduled checkpoint waited for the checkpoint_safe lock
CP_WAIT_CLIENT_CS, // how many times a client checkpoint waited for the checkpoint_safe lock
CP_WAIT_TXN_CS, // how many times a txn_commit checkpoint waited for the checkpoint_safe lock
CP_WAIT_OTHER_CS, // how many times a checkpoint for another purpose waited for the checkpoint_safe lock
CP_WAIT_SCHED_MO, // how many times a scheduled checkpoint waited for the multi_operation lock
CP_WAIT_CLIENT_MO, // how many times a client checkpoint waited for the multi_operation lock
CP_WAIT_TXN_MO, // how many times a txn_commit checkpoint waited for the multi_operation lock
CP_WAIT_OTHER_MO, // how many times a checkpoint for another purpose waited for the multi_operation lock
CP_STATUS_NUM_ROWS // number of rows in this status array
} cp_status_entry;
typedef struct { typedef struct {
uint64_t footprint; BOOL initialized;
time_t time_last_checkpoint_begin_complete; TOKU_ENGINE_STATUS_ROW_S status[CP_STATUS_NUM_ROWS];
time_t time_last_checkpoint_begin;
time_t time_last_checkpoint_end;
uint64_t last_lsn;
uint64_t checkpoint_count;
uint64_t checkpoint_count_fail;
uint64_t waiters_now; // how many threads are currently waiting for the checkpoint_safe lock to perform a checkpoint
uint64_t waiters_max; // max threads ever simultaneously waiting for the checkpoint_safe lock to perform a checkpoint
uint64_t client_wait_on_mo; // how many times a client thread waited for the multi_operation lock
uint64_t client_wait_on_cs; // how many times a client thread waited for the checkpoint_safe lock
uint64_t cp_wait_sched_cs; // how many times a scheduled checkpoint waited for the checkpoint_safe lock
uint64_t cp_wait_client_cs; // how many times a client checkpoint waited for the checkpoint_safe lock
uint64_t cp_wait_txn_cs; // how many times a txn_commit checkpoint waited for the checkpoint_safe lock
uint64_t cp_wait_other_cs; // how many times a checkpoint for another purpose waited for the checkpoint_safe lock
uint64_t cp_wait_sched_mo; // how many times a scheduled checkpoint waited for the multi_operation lock
uint64_t cp_wait_client_mo; // how many times a client checkpoint waited for the multi_operation lock
uint64_t cp_wait_txn_mo; // how many times a txn_commit checkpoint waited for the multi_operation lock
uint64_t cp_wait_other_mo; // how many times a checkpoint for another purpose waited for the multi_operation lock
} CHECKPOINT_STATUS_S, *CHECKPOINT_STATUS; } CHECKPOINT_STATUS_S, *CHECKPOINT_STATUS;
void toku_checkpoint_get_status(CHECKPOINT_STATUS stat); void toku_checkpoint_get_status(CACHETABLE ct, CHECKPOINT_STATUS stat);
#if defined(__cplusplus) || defined(__cilkplusplus) #if defined(__cplusplus) || defined(__cilkplusplus)
}; };
......
/* -*- mode: C; c-basic-offset: 4 -*- */ /* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#ident "$Id$" #ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved." #ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it." #ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
...@@ -1350,25 +1350,54 @@ toku_logger_call_remove_finalize_callback(TOKULOGGER logger, DICTIONARY_ID dict_ ...@@ -1350,25 +1350,54 @@ toku_logger_call_remove_finalize_callback(TOKULOGGER logger, DICTIONARY_ID dict_
logger->remove_finalize_callback(dict_id, logger->remove_finalize_callback_extra); logger->remove_finalize_callback(dict_id, logger->remove_finalize_callback_extra);
} }
///////////////////////////////////////////////////////////////////////////////////
// Engine status
//
// Status is intended for display to humans to help understand system behavior.
// It does not need to be perfectly thread-safe.
void static LOGGER_STATUS_S logger_status;
toku_logger_get_status(TOKULOGGER logger, LOGGER_STATUS s) {
if (logger) { #define STATUS_INIT(k,t,l) { \
s->ilock_ctr = logger->input_lock_ctr; logger_status.status[k].keyname = #k; \
s->olock_ctr = logger->output_condition_lock_ctr; logger_status.status[k].type = t; \
s->swap_ctr = logger->swap_ctr; logger_status.status[k].legend = "logger: " l; \
s->panicked = logger->is_panicked;
s->panic_errno = logger->panic_errno;
} }
else {
s->ilock_ctr = 0; static void
s->olock_ctr = 0; status_init(void) {
s->swap_ctr = 0; // Note, this function initializes the keyname, type, and legend fields.
s->panicked = 0; // Value fields are initialized to zero by compiler.
s->panic_errno = 0; STATUS_INIT(LOGGER_NEXT_LSN, UINT64, "next LSN");
STATUS_INIT(LOGGER_ILOCK_CTR, UINT64, "ilock count");
STATUS_INIT(LOGGER_OLOCK_CTR, UINT64, "olock count");
STATUS_INIT(LOGGER_SWAP_CTR, UINT64, "swap count");
STATUS_INIT(LOGGER_PANICKED, UINT64, "panic");
STATUS_INIT(LOGGER_PANIC_ERRNO, UINT64, "panic errno");
logger_status.initialized = true;
}
#undef STATUS_INIT
#define STATUS_VALUE(x) logger_status.status[x].value.num
void
toku_logger_get_status(TOKULOGGER logger, LOGGER_STATUS statp) {
if (!logger_status.initialized)
status_init();
if (logger) {
STATUS_VALUE(LOGGER_NEXT_LSN) = logger->lsn.lsn;
STATUS_VALUE(LOGGER_ILOCK_CTR) = logger->input_lock_ctr;
STATUS_VALUE(LOGGER_OLOCK_CTR) = logger->output_condition_lock_ctr;
STATUS_VALUE(LOGGER_SWAP_CTR) = logger->swap_ctr;
STATUS_VALUE(LOGGER_PANICKED) = logger->is_panicked;
STATUS_VALUE(LOGGER_PANIC_ERRNO) = logger->panic_errno;
} }
*statp = logger_status;
} }
//////////////////////////////////////////////////////////////////////////////////////////////////////
// Used for upgrade: // Used for upgrade:
// if any valid log files exist in log_dir, then // if any valid log files exist in log_dir, then
// set *found_any_logs to TRUE and set *version_found to version number of latest log // set *found_any_logs to TRUE and set *version_found to version number of latest log
...@@ -1411,3 +1440,4 @@ toku_get_version_of_logs_on_disk(const char *log_dir, BOOL *found_any_logs, uint ...@@ -1411,3 +1440,4 @@ toku_get_version_of_logs_on_disk(const char *log_dir, BOOL *found_any_logs, uint
return r; return r;
} }
#undef STATUS_VALUE
...@@ -164,14 +164,23 @@ toku_logger_maybe_fsync (TOKULOGGER logger, LSN lsn, int do_fsync); ...@@ -164,14 +164,23 @@ toku_logger_maybe_fsync (TOKULOGGER logger, LSN lsn, int do_fsync);
// fsync // fsync
// release the outlock // release the outlock
typedef struct logger_status {
u_int64_t ilock_ctr; typedef enum {
u_int64_t olock_ctr; LOGGER_NEXT_LSN = 0,
u_int64_t swap_ctr; LOGGER_ILOCK_CTR,
u_int64_t panicked; LOGGER_OLOCK_CTR,
u_int64_t panic_errno; LOGGER_SWAP_CTR,
LOGGER_PANICKED,
LOGGER_PANIC_ERRNO,
LOGGER_STATUS_NUM_ROWS
} logger_status_entry;
typedef struct {
BOOL initialized;
TOKU_ENGINE_STATUS_ROW_S status[LOGGER_STATUS_NUM_ROWS];
} LOGGER_STATUS_S, *LOGGER_STATUS; } LOGGER_STATUS_S, *LOGGER_STATUS;
void toku_logger_get_status(TOKULOGGER logger, LOGGER_STATUS s); void toku_logger_get_status(TOKULOGGER logger, LOGGER_STATUS s);
int toku_get_version_of_logs_on_disk(const char *log_dir, BOOL *found_any_logs, uint32_t *version_found); int toku_get_version_of_logs_on_disk(const char *log_dir, BOOL *found_any_logs, uint32_t *version_found);
......
...@@ -10,6 +10,8 @@ ...@@ -10,6 +10,8 @@
toku_pthread_mutex_t attr_mutex; toku_pthread_mutex_t attr_mutex;
// used to access engine status variables
#define STATUS_VALUE(x) ct_status.status[x].value.num
const PAIR_ATTR attrs[] = { const PAIR_ATTR attrs[] = {
{ .size = 20, .nonleaf_size = 13, .leaf_size = 900, .rollback_size = 123, .cache_pressure_size = 403 }, { .size = 20, .nonleaf_size = 13, .leaf_size = 900, .rollback_size = 123, .cache_pressure_size = 403 },
...@@ -62,12 +64,12 @@ run_test (void) { ...@@ -62,12 +64,12 @@ run_test (void) {
CACHEFILE f1; CACHEFILE f1;
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0); r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
CACHETABLE_STATUS_S ct_stat; CACHETABLE_STATUS_S ct_status;
toku_cachetable_get_status(ct, &ct_stat); toku_cachetable_get_status(ct, &ct_status);
assert(ct_stat.size_nonleaf == 0); assert(STATUS_VALUE(CT_SIZE_NONLEAF) == 0);
assert(ct_stat.size_leaf == 0); assert(STATUS_VALUE(CT_SIZE_LEAF) == 0);
assert(ct_stat.size_rollback == 0); assert(STATUS_VALUE(CT_SIZE_ROLLBACK) == 0);
assert(ct_stat.size_cachepressure == 0); assert(STATUS_VALUE(CT_SIZE_CACHEPRESSURE) == 0);
void* vs[n_pairs]; void* vs[n_pairs];
//void* v2; //void* v2;
...@@ -94,11 +96,11 @@ run_test (void) { ...@@ -94,11 +96,11 @@ run_test (void) {
expect.cache_pressure_size += attrs[i].cache_pressure_size; expect.cache_pressure_size += attrs[i].cache_pressure_size;
} }
toku_cachetable_get_status(ct, &ct_stat); toku_cachetable_get_status(ct, &ct_status);
assert(ct_stat.size_nonleaf == (uint64_t) expect.nonleaf_size); assert(STATUS_VALUE(CT_SIZE_NONLEAF ) == (uint64_t) expect.nonleaf_size);
assert(ct_stat.size_leaf == (uint64_t) expect.leaf_size); assert(STATUS_VALUE(CT_SIZE_LEAF ) == (uint64_t) expect.leaf_size);
assert(ct_stat.size_rollback == (uint64_t) expect.rollback_size); assert(STATUS_VALUE(CT_SIZE_ROLLBACK ) == (uint64_t) expect.rollback_size);
assert(ct_stat.size_cachepressure == (uint64_t) expect.cache_pressure_size); assert(STATUS_VALUE(CT_SIZE_CACHEPRESSURE) == (uint64_t) expect.cache_pressure_size);
void *big_v; void *big_v;
long big_s; long big_s;
...@@ -116,11 +118,11 @@ run_test (void) { ...@@ -116,11 +118,11 @@ run_test (void) {
usleep(2*1024*1024); usleep(2*1024*1024);
toku_cachetable_get_status(ct, &ct_stat); toku_cachetable_get_status(ct, &ct_status);
assert(ct_stat.size_nonleaf == (uint64_t) expect.nonleaf_size); assert(STATUS_VALUE(CT_SIZE_NONLEAF ) == (uint64_t) expect.nonleaf_size);
assert(ct_stat.size_leaf == (uint64_t) expect.leaf_size); assert(STATUS_VALUE(CT_SIZE_LEAF ) == (uint64_t) expect.leaf_size);
assert(ct_stat.size_rollback == (uint64_t) expect.rollback_size); assert(STATUS_VALUE(CT_SIZE_ROLLBACK ) == (uint64_t) expect.rollback_size);
assert(ct_stat.size_cachepressure == (uint64_t) expect.cache_pressure_size); assert(STATUS_VALUE(CT_SIZE_CACHEPRESSURE) == (uint64_t) expect.cache_pressure_size);
toku_cachetable_verify(ct); toku_cachetable_verify(ct);
r = toku_cachefile_close(&f1, 0, FALSE, ZERO_LSN); assert(r == 0 && f1 == 0); r = toku_cachefile_close(&f1, 0, FALSE, ZERO_LSN); assert(r == 0 && f1 == 0);
...@@ -133,3 +135,5 @@ test_main(int argc, const char *argv[]) { ...@@ -133,3 +135,5 @@ test_main(int argc, const char *argv[]) {
run_test(); run_test();
return 0; return 0;
} }
#undef STATUS_VALUE
/* -*- mode: C; c-basic-offset: 4 -*- */ /* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#ident "$Id$" #ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved." #ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it." #ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
...@@ -14,13 +14,48 @@ BOOL garbage_collection_debug = FALSE; ...@@ -14,13 +14,48 @@ BOOL garbage_collection_debug = FALSE;
static void verify_snapshot_system(TOKULOGGER logger); static void verify_snapshot_system(TOKULOGGER logger);
// accountability ///////////////////////////////////////////////////////////////////////////////////
static TXN_STATUS_S status; // Engine status
//
// Status is intended for display to humans to help understand system behavior.
// It does not need to be perfectly thread-safe.
static TXN_STATUS_S txn_status;
#define STATUS_INIT(k,t,l) { \
txn_status.status[k].keyname = #k; \
txn_status.status[k].type = t; \
txn_status.status[k].legend = "txn: " l; \
}
static void
status_init(void) {
// Note, this function initializes the keyname, type, and legend fields.
// Value fields are initialized to zero by compiler.
STATUS_INIT(TXN_BEGIN, UINT64, "begin");
STATUS_INIT(TXN_COMMIT, UINT64, "successful commits");
STATUS_INIT(TXN_ABORT, UINT64, "aborts");
STATUS_INIT(TXN_CLOSE, UINT64, "close (should be sum of aborts and commits)");
STATUS_INIT(TXN_NUM_OPEN, UINT64, "number currently open (should be begin - close)");
STATUS_INIT(TXN_MAX_OPEN, UINT64, "max number open simultaneously");
STATUS_INIT(TXN_OLDEST_LIVE, UINT64, "xid of oldest live transaction");
STATUS_INIT(TXN_OLDEST_STARTTIME, UNIXTIME, "start time of oldest live transaction");
txn_status.initialized = true;
}
#undef STATUS_INIT
#define STATUS_VALUE(x) txn_status.status[x].value.num
void void
toku_txn_get_status(TXN_STATUS s) { toku_txn_get_status(TOKULOGGER logger, TXN_STATUS s) {
*s = status; if (!txn_status.initialized)
status_init();
{
time_t oldest_starttime;
STATUS_VALUE(TXN_OLDEST_LIVE) = toku_logger_get_oldest_living_xid(logger, &oldest_starttime);
STATUS_VALUE(TXN_OLDEST_STARTTIME) = (uint64_t) oldest_starttime;
}
*s = txn_status;
} }
...@@ -274,10 +309,10 @@ int toku_txn_begin_with_xid ( ...@@ -274,10 +309,10 @@ int toku_txn_begin_with_xid (
if (r != 0) goto died; if (r != 0) goto died;
*tokutxn = result; *tokutxn = result;
status.begin++; STATUS_VALUE(TXN_BEGIN)++;
status.num_open++; STATUS_VALUE(TXN_NUM_OPEN)++;
if (status.num_open > status.max_open) if (STATUS_VALUE(TXN_NUM_OPEN) > STATUS_VALUE(TXN_MAX_OPEN))
status.max_open = status.num_open; STATUS_VALUE(TXN_MAX_OPEN) = STATUS_VALUE(TXN_NUM_OPEN);
if (garbage_collection_debug) { if (garbage_collection_debug) {
verify_snapshot_system(logger); verify_snapshot_system(logger);
} }
...@@ -383,7 +418,7 @@ int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv ...@@ -383,7 +418,7 @@ int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv
} }
if (r==0) { if (r==0) {
r = toku_rollback_commit(txn, yield, yieldv, oplsn); r = toku_rollback_commit(txn, yield, yieldv, oplsn);
status.commit++; STATUS_VALUE(TXN_COMMIT)++;
} }
// Make sure we release that lock (even if there was an error) // Make sure we release that lock (even if there was an error)
if (release_multi_operation_client_lock) toku_multi_operation_client_unlock(); if (release_multi_operation_client_lock) toku_multi_operation_client_unlock();
...@@ -420,7 +455,7 @@ int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn, ...@@ -420,7 +455,7 @@ int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn,
r = toku_log_xabort(txn->logger, &txn->do_fsync_lsn, 0, txn->txnid64); r = toku_log_xabort(txn->logger, &txn->do_fsync_lsn, 0, txn->txnid64);
if (r==0) { if (r==0) {
r = toku_rollback_abort(txn, yield, yieldv, oplsn); r = toku_rollback_abort(txn, yield, yieldv, oplsn);
status.abort++; STATUS_VALUE(TXN_ABORT)++;
} }
// Make sure we multi_operation_client_unlock release will happen even if there is an error // Make sure we multi_operation_client_unlock release will happen even if there is an error
if (release_multi_operation_client_lock) toku_multi_operation_client_unlock(); if (release_multi_operation_client_lock) toku_multi_operation_client_unlock();
...@@ -463,8 +498,8 @@ void toku_txn_close_txn(TOKUTXN txn) { ...@@ -463,8 +498,8 @@ void toku_txn_close_txn(TOKUTXN txn) {
if (garbage_collection_debug) if (garbage_collection_debug)
verify_snapshot_system(logger); verify_snapshot_system(logger);
status.close++; STATUS_VALUE(TXN_CLOSE)++;
status.num_open--; STATUS_VALUE(TXN_NUM_OPEN)--;
return; return;
} }
...@@ -741,3 +776,5 @@ TOKUTXN_STATE ...@@ -741,3 +776,5 @@ TOKUTXN_STATE
toku_txn_get_state(TOKUTXN txn) { toku_txn_get_state(TOKUTXN txn) {
return txn->state; return txn->state;
} }
#undef STATUS_VALUE
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#ifndef TOKUTXN_H #ifndef TOKUTXN_H
#define TOKUTXN_H #define TOKUTXN_H
...@@ -65,16 +66,24 @@ BOOL toku_txnid_newer(TXNID a, TXNID b); ...@@ -65,16 +66,24 @@ BOOL toku_txnid_newer(TXNID a, TXNID b);
void toku_txn_force_fsync_on_commit(TOKUTXN txn); void toku_txn_force_fsync_on_commit(TOKUTXN txn);
typedef struct txn_status { typedef enum {
u_int64_t begin; // total number of transactions begun (does not include recovered txns) TXN_BEGIN, // total number of transactions begun (does not include recovered txns)
u_int64_t commit; // successful commits TXN_COMMIT, // successful commits
u_int64_t abort; TXN_ABORT,
u_int64_t close; // should be sum of aborts and commits TXN_CLOSE, // should be sum of aborts and commits
u_int64_t num_open; // should be begin - close TXN_NUM_OPEN, // should be begin - close
u_int64_t max_open; // max value of num_open TXN_MAX_OPEN, // max value of num_open
TXN_OLDEST_LIVE, // xid of oldest live transaction
TXN_OLDEST_STARTTIME, // start time of oldest live txn
TXN_STATUS_NUM_ROWS
} txn_status_entry;
typedef struct {
BOOL initialized;
TOKU_ENGINE_STATUS_ROW_S status[TXN_STATUS_NUM_ROWS];
} TXN_STATUS_S, *TXN_STATUS; } TXN_STATUS_S, *TXN_STATUS;
void toku_txn_get_status(TXN_STATUS s); void toku_txn_get_status(TOKULOGGER logger, TXN_STATUS s);
BOOL toku_is_txn_in_live_root_txn_list(TOKUTXN txn, TXNID xid); BOOL toku_is_txn_in_live_root_txn_list(TOKUTXN txn, TXNID xid);
......
...@@ -31,11 +31,45 @@ ...@@ -31,11 +31,45 @@
#define ULE_DEBUG 0 #define ULE_DEBUG 0
static LE_STATUS_S status;
static uint32_t ule_get_innermost_numbytes(ULE ule); static uint32_t ule_get_innermost_numbytes(ULE ule);
///////////////////////////////////////////////////////////////////////////////////
// Engine status
//
// Status is intended for display to humans to help understand system behavior.
// It does not need to be perfectly thread-safe.
static LE_STATUS_S le_status;
#define STATUS_INIT(k,t,l) { \
le_status.status[k].keyname = #k; \
le_status.status[k].type = t; \
le_status.status[k].legend = "le: " l; \
}
static void
status_init(void) {
// Note, this function initializes the keyname, type, and legend fields.
// Value fields are initialized to zero by compiler.
STATUS_INIT(LE_MAX_COMMITTED_XR, UINT64, "max committed xr");
STATUS_INIT(LE_MAX_PROVISIONAL_XR, UINT64, "max provisional xr");
STATUS_INIT(LE_EXPANDED, UINT64, "expanded");
STATUS_INIT(LE_MAX_MEMSIZE, UINT64, "max memsize");
le_status.initialized = true;
}
#undef STATUS_INIT
void
toku_le_get_status(LE_STATUS statp) {
if (!le_status.initialized)
status_init();
*statp = le_status;
}
#define STATUS_VALUE(x) le_status.status[x].value.num
/////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////
// Accessor functions used by outside world (e.g. indexer) // Accessor functions used by outside world (e.g. indexer)
// //
...@@ -52,10 +86,6 @@ void toku_ule_free(ULEHANDLE ule_p) { ...@@ -52,10 +86,6 @@ void toku_ule_free(ULEHANDLE ule_p) {
toku_free(ule_p); toku_free(ule_p);
} }
void
toku_le_get_status(LE_STATUS s) {
*s = status;
}
/////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////
...@@ -625,15 +655,15 @@ uxr_unpack_data(UXR uxr, uint8_t *p) { ...@@ -625,15 +655,15 @@ uxr_unpack_data(UXR uxr, uint8_t *p) {
// executed too often to be worth making threadsafe // executed too often to be worth making threadsafe
static inline void static inline void
update_le_status(ULE ule, size_t memsize, LE_STATUS s) { update_le_status(ULE ule, size_t memsize) {
if (ule->num_cuxrs > s->max_committed_xr) if (ule->num_cuxrs > STATUS_VALUE(LE_MAX_COMMITTED_XR))
s->max_committed_xr = ule->num_cuxrs; STATUS_VALUE(LE_MAX_COMMITTED_XR) = ule->num_cuxrs;
if (ule->num_puxrs > s->max_provisional_xr) if (ule->num_puxrs > STATUS_VALUE(LE_MAX_PROVISIONAL_XR))
s->max_provisional_xr = ule->num_puxrs; STATUS_VALUE(LE_MAX_PROVISIONAL_XR) = ule->num_puxrs;
if (ule->num_cuxrs > MAX_TRANSACTION_RECORDS) if (ule->num_cuxrs > MAX_TRANSACTION_RECORDS)
s->expanded++; STATUS_VALUE(LE_EXPANDED)++;
if (memsize > s->max_memsize) if (memsize > STATUS_VALUE(LE_MAX_MEMSIZE))
s->max_memsize = memsize; STATUS_VALUE(LE_MAX_MEMSIZE) = memsize;
} }
// Purpose is to return a newly allocated leaf entry in packed format, or // Purpose is to return a newly allocated leaf entry in packed format, or
...@@ -801,7 +831,7 @@ found_insert:; ...@@ -801,7 +831,7 @@ found_insert:;
*new_leafentry_memorysize = memsize; *new_leafentry_memorysize = memsize;
rval = 0; rval = 0;
cleanup: cleanup:
update_le_status(ule, memsize, &status); update_le_status(ule, memsize);
return rval; return rval;
} }
...@@ -2260,5 +2290,7 @@ toku_le_upgrade_13_14(LEAFENTRY_13 old_leafentry, ...@@ -2260,5 +2290,7 @@ toku_le_upgrade_13_14(LEAFENTRY_13 old_leafentry,
void __attribute__((__constructor__)) toku_ule_drd_ignore(void); void __attribute__((__constructor__)) toku_ule_drd_ignore(void);
void void
toku_ule_drd_ignore(void) { toku_ule_drd_ignore(void) {
DRD_IGNORE_VAR(status); DRD_IGNORE_VAR(le_status);
} }
#undef STATUS_VALUE
...@@ -25,35 +25,53 @@ struct ydb_big_lock { ...@@ -25,35 +25,53 @@ struct ydb_big_lock {
}; };
static struct ydb_big_lock ydb_big_lock; static struct ydb_big_lock ydb_big_lock;
// status is intended for display to humans to help understand system behavior.
// It does not need to be perfectly thread-safe.
static SCHEDULE_STATUS_S status;
static inline u_int64_t u64max(u_int64_t a, u_int64_t b) {return a > b ? a : b; } static inline u_int64_t u64max(u_int64_t a, u_int64_t b) {return a > b ? a : b; }
static void /* Status is intended for display to humans to help understand system behavior.
init_status(void) { * It does not need to be perfectly thread-safe.
status.ydb_lock_ctr = 0; */
status.num_waiters_now = 0; static volatile YDB_LOCK_STATUS_S ydb_lock_status;
status.max_waiters = 0;
status.total_sleep_time = 0; #define STATUS_INIT(k,t,l) { \
status.max_time_ydb_lock_held = 0; ydb_lock_status.status[k].keyname = #k; \
status.total_time_ydb_lock_held = 0; ydb_lock_status.status[k].type = t; \
status.total_time_since_start = 0; ydb_lock_status.status[k].legend = "ydb lock: " l; \
}
static void
status_init(void) {
// Note, this function initializes the keyname, type, and legend fields.
// Value fields are initialized to zero by compiler.
STATUS_INIT(YDB_LOCK_TAKEN, UINT64, "taken");
STATUS_INIT(YDB_LOCK_RELEASED, UINT64, "released");
STATUS_INIT(YDB_NUM_WAITERS_NOW, UINT64, "num waiters now");
STATUS_INIT(YDB_MAX_WAITERS, UINT64, "max waiters");
STATUS_INIT(YDB_TOTAL_SLEEP_TIME, UINT64, "total sleep time (usec)");
STATUS_INIT(YDB_MAX_TIME_YDB_LOCK_HELD, TOKUTIME, "max time held (sec)");
STATUS_INIT(YDB_TOTAL_TIME_YDB_LOCK_HELD, TOKUTIME, "total time held (sec)");
STATUS_INIT(YDB_TOTAL_TIME_SINCE_START, TOKUTIME, "total time since start (sec)");
ydb_lock_status.initialized = true;
} }
#undef STATUS_INIT
void void
toku_ydb_lock_get_status(SCHEDULE_STATUS statp) { toku_ydb_lock_get_status(YDB_LOCK_STATUS statp) {
*statp = status; if (!ydb_lock_status.initialized)
status_init();
*statp = ydb_lock_status;
} }
#define STATUS_VALUE(x) ydb_lock_status.status[x].value.num
/* End of status section.
*/
int int
toku_ydb_lock_init(void) { toku_ydb_lock_init(void) {
int r; int r;
r = toku_pthread_mutex_init(&ydb_big_lock.lock, NULL); resource_assert_zero(r); r = toku_pthread_mutex_init(&ydb_big_lock.lock, NULL); resource_assert_zero(r);
ydb_big_lock.starttime = get_tokutime(); ydb_big_lock.starttime = get_tokutime();
ydb_big_lock.acquired_time = 0; ydb_big_lock.acquired_time = 0;
init_status();
return r; return r;
} }
...@@ -66,7 +84,7 @@ toku_ydb_lock_destroy(void) { ...@@ -66,7 +84,7 @@ toku_ydb_lock_destroy(void) {
void void
toku_ydb_lock(void) { toku_ydb_lock(void) {
u_int32_t new_num_waiters = __sync_add_and_fetch(&status.num_waiters_now, 1); u_int32_t new_num_waiters = __sync_add_and_fetch(&STATUS_VALUE(YDB_NUM_WAITERS_NOW), 1);
int r = toku_pthread_mutex_lock(&ydb_big_lock.lock); resource_assert_zero(r); int r = toku_pthread_mutex_lock(&ydb_big_lock.lock); resource_assert_zero(r);
...@@ -76,30 +94,29 @@ toku_ydb_lock(void) { ...@@ -76,30 +94,29 @@ toku_ydb_lock(void) {
ydb_big_lock.acquired_time = now; ydb_big_lock.acquired_time = now;
// Update status // Update status
status.ydb_lock_ctr++; STATUS_VALUE(YDB_LOCK_TAKEN)++;
if (new_num_waiters > status.max_waiters) status.max_waiters = new_num_waiters; if (new_num_waiters > STATUS_VALUE(YDB_MAX_WAITERS))
status.total_time_since_start = now - ydb_big_lock.starttime; STATUS_VALUE(YDB_MAX_WAITERS) = new_num_waiters;
STATUS_VALUE(YDB_TOTAL_TIME_SINCE_START) = now - ydb_big_lock.starttime;
// invariant((status.ydb_lock_ctr & 0x01) == 1);
} }
static void static void
ydb_unlock_internal(unsigned long useconds) { ydb_unlock_internal(unsigned long useconds) {
status.ydb_lock_ctr++; STATUS_VALUE(YDB_LOCK_RELEASED)++;
// invariant((status.ydb_lock_ctr & 0x01) == 0);
tokutime_t now = get_tokutime(); tokutime_t now = get_tokutime();
tokutime_t time_held = now - ydb_big_lock.acquired_time; tokutime_t time_held = now - ydb_big_lock.acquired_time;
status.total_time_ydb_lock_held += time_held; STATUS_VALUE(YDB_TOTAL_TIME_YDB_LOCK_HELD) += time_held;
if (time_held > status.max_time_ydb_lock_held) status.max_time_ydb_lock_held = time_held; if (time_held > STATUS_VALUE(YDB_MAX_TIME_YDB_LOCK_HELD))
status.total_time_since_start = now - ydb_big_lock.starttime; STATUS_VALUE(YDB_MAX_TIME_YDB_LOCK_HELD) = time_held;
STATUS_VALUE(YDB_TOTAL_TIME_SINCE_START) = now - ydb_big_lock.starttime;
int r = toku_pthread_mutex_unlock(&ydb_big_lock.lock); resource_assert_zero(r); int r = toku_pthread_mutex_unlock(&ydb_big_lock.lock); resource_assert_zero(r);
int new_num_waiters = __sync_add_and_fetch(&status.num_waiters_now, -1); int new_num_waiters = __sync_add_and_fetch(&STATUS_VALUE(YDB_NUM_WAITERS_NOW), -1);
if (new_num_waiters > 0 && useconds > 0) { if (new_num_waiters > 0 && useconds > 0) {
__sync_add_and_fetch(&status.total_sleep_time, useconds); __sync_add_and_fetch(&STATUS_VALUE(YDB_TOTAL_SLEEP_TIME), useconds);
usleep(useconds); usleep(useconds);
} }
} }
...@@ -118,3 +135,5 @@ toku_pthread_mutex_t * ...@@ -118,3 +135,5 @@ toku_pthread_mutex_t *
toku_ydb_mutex(void) { toku_ydb_mutex(void) {
return &ydb_big_lock.lock; return &ydb_big_lock.lock;
} }
#undef STATUS_VALUE
...@@ -24,8 +24,49 @@ ...@@ -24,8 +24,49 @@
#include "xids.h" #include "xids.h"
#include "log-internal.h" #include "log-internal.h"
// for now ///////////////////////////////////////////////////////////////////////////////////
static INDEXER_STATUS_S status; // Engine status
//
// Status is intended for display to humans to help understand system behavior.
// It does not need to be perfectly thread-safe.
static INDEXER_STATUS_S indexer_status;
#define STATUS_INIT(k,t,l) { \
indexer_status.status[k].keyname = #k; \
indexer_status.status[k].type = t; \
indexer_status.status[k].legend = "indexer: " l; \
}
static void
status_init(void) {
// Note, this function initializes the keyname, type, and legend fields.
// Value fields are initialized to zero by compiler.
STATUS_INIT(INDEXER_CREATE, UINT64, "number of indexers successfully created");
STATUS_INIT(INDEXER_CREATE_FAIL, UINT64, "number of calls to toku_indexer_create_indexer() that failed");
STATUS_INIT(INDEXER_BUILD, UINT64, "number of calls to indexer->build() succeeded");
STATUS_INIT(INDEXER_BUILD_FAIL, UINT64, "number of calls to indexer->build() failed");
STATUS_INIT(INDEXER_CLOSE, UINT64, "number of calls to indexer->close() that succeeded");
STATUS_INIT(INDEXER_CLOSE_FAIL, UINT64, "number of calls to indexer->close() that failed");
STATUS_INIT(INDEXER_ABORT, UINT64, "number of calls to indexer->abort()");
STATUS_INIT(INDEXER_CURRENT, UINT64, "number of indexers currently in existence");
STATUS_INIT(INDEXER_MAX, UINT64, "max number of indexers that ever existed simultaneously");
indexer_status.initialized = true;
}
#undef STATUS_INIT
void
toku_indexer_get_status(INDEXER_STATUS statp) {
if (!indexer_status.initialized)
status_init();
*statp = indexer_status;
}
#define STATUS_VALUE(x) indexer_status.status[x].value.num
#include "indexer-internal.h" #include "indexer-internal.h"
...@@ -175,13 +216,13 @@ create_exit: ...@@ -175,13 +216,13 @@ create_exit:
*indexerp = indexer; *indexerp = indexer;
(void) __sync_fetch_and_add(&status.create, 1); (void) __sync_fetch_and_add(&STATUS_VALUE(INDEXER_CREATE), 1);
(void) __sync_fetch_and_add(&status.current, 1); (void) __sync_fetch_and_add(&STATUS_VALUE(INDEXER_CURRENT), 1);
if ( status.current > status.max ) if ( STATUS_VALUE(INDEXER_CURRENT) > STATUS_VALUE(INDEXER_MAX) )
status.max = status.current; // not worth a lock to make threadsafe, may be inaccurate STATUS_VALUE(INDEXER_MAX) = STATUS_VALUE(INDEXER_CURRENT); // NOT WORTH A LOCK TO MAKE THREADSAFE), may be inaccurate
} else { } else {
(void) __sync_fetch_and_add(&status.create_fail, 1); (void) __sync_fetch_and_add(&STATUS_VALUE(INDEXER_CREATE_FAIL), 1);
free_indexer(indexer); free_indexer(indexer);
} }
...@@ -270,9 +311,9 @@ build_index(DB_INDEXER *indexer) { ...@@ -270,9 +311,9 @@ build_index(DB_INDEXER *indexer) {
// - unique checks? // - unique checks?
if ( result == 0 ) { if ( result == 0 ) {
(void) __sync_fetch_and_add(&status.build, 1); (void) __sync_fetch_and_add(&STATUS_VALUE(INDEXER_BUILD), 1);
} else { } else {
(void) __sync_fetch_and_add(&status.build_fail, 1); (void) __sync_fetch_and_add(&STATUS_VALUE(INDEXER_BUILD_FAIL), 1);
} }
...@@ -282,7 +323,7 @@ build_index(DB_INDEXER *indexer) { ...@@ -282,7 +323,7 @@ build_index(DB_INDEXER *indexer) {
static int static int
close_indexer(DB_INDEXER *indexer) { close_indexer(DB_INDEXER *indexer) {
int r = 0; int r = 0;
(void) __sync_fetch_and_sub(&status.current, 1); (void) __sync_fetch_and_sub(&STATUS_VALUE(INDEXER_CURRENT), 1);
toku_ydb_lock(); toku_ydb_lock();
{ {
...@@ -307,17 +348,17 @@ close_indexer(DB_INDEXER *indexer) { ...@@ -307,17 +348,17 @@ close_indexer(DB_INDEXER *indexer) {
toku_ydb_unlock(); toku_ydb_unlock();
if ( r == 0 ) { if ( r == 0 ) {
(void) __sync_fetch_and_add(&status.close, 1); (void) __sync_fetch_and_add(&STATUS_VALUE(INDEXER_CLOSE), 1);
} else { } else {
(void) __sync_fetch_and_add(&status.close_fail, 1); (void) __sync_fetch_and_add(&STATUS_VALUE(INDEXER_CLOSE_FAIL), 1);
} }
return r; return r;
} }
static int static int
abort_indexer(DB_INDEXER *indexer) { abort_indexer(DB_INDEXER *indexer) {
(void) __sync_fetch_and_sub(&status.current, 1); (void) __sync_fetch_and_sub(&STATUS_VALUE(INDEXER_CURRENT), 1);
(void) __sync_fetch_and_add(&status.abort, 1); (void) __sync_fetch_and_add(&STATUS_VALUE(INDEXER_ABORT), 1);
toku_ydb_lock(); toku_ydb_lock();
{ {
...@@ -391,10 +432,6 @@ maybe_call_poll_func(DB_INDEXER *indexer, uint64_t loop_count) { ...@@ -391,10 +432,6 @@ maybe_call_poll_func(DB_INDEXER *indexer, uint64_t loop_count) {
return result; return result;
} }
void
toku_indexer_get_status(INDEXER_STATUS s) {
*s = status;
}
// this allows us to force errors under test. Flags are defined in indexer.h // this allows us to force errors under test. Flags are defined in indexer.h
void void
...@@ -407,3 +444,7 @@ DB * ...@@ -407,3 +444,7 @@ DB *
toku_indexer_get_src_db(DB_INDEXER *indexer) { toku_indexer_get_src_db(DB_INDEXER *indexer) {
return indexer->i->src_db; return indexer->i->src_db;
} }
#undef STATUS_VALUE
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#ifndef TOKU_INDEXER_H #ifndef TOKU_INDEXER_H
#define TOKU_INDEXER_H #define TOKU_INDEXER_H
...@@ -70,16 +71,22 @@ void toku_indexer_set_test_only_flags(DB_INDEXER *indexer, int flags) __attribut ...@@ -70,16 +71,22 @@ void toku_indexer_set_test_only_flags(DB_INDEXER *indexer, int flags) __attribut
#define INDEXER_TEST_ONLY_ERROR_CALLBACK 1 #define INDEXER_TEST_ONLY_ERROR_CALLBACK 1
typedef struct indexer_status { typedef enum {
uint64_t create; // number of indexers successfully created INDEXER_CREATE = 0, // number of indexers successfully created
uint64_t create_fail; // number of calls to toku_indexer_create_indexer() that failed INDEXER_CREATE_FAIL, // number of calls to toku_indexer_create_indexer() that failed
uint64_t build; // number of calls to indexer->build() succeeded INDEXER_BUILD, // number of calls to indexer->build() succeeded
uint64_t build_fail; // number of calls to indexer->build() failed INDEXER_BUILD_FAIL, // number of calls to indexer->build() failed
uint64_t close; // number of calls to indexer->close() that succeeded INDEXER_CLOSE, // number of calls to indexer->close() that succeeded
uint64_t close_fail; // number of calls to indexer->close() that failed INDEXER_CLOSE_FAIL, // number of calls to indexer->close() that failed
uint64_t abort; // number of calls to indexer->abort() INDEXER_ABORT, // number of calls to indexer->abort()
uint32_t current; // number of indexers currently in existence INDEXER_CURRENT, // number of indexers currently in existence
uint32_t max; // max number of indexers that ever existed simultaneously INDEXER_MAX, // max number of indexers that ever existed simultaneously
INDEXER_STATUS_NUM_ROWS
} indexer_status_entry;
typedef struct {
BOOL initialized;
TOKU_ENGINE_STATUS_ROW_S status[INDEXER_STATUS_NUM_ROWS];
} INDEXER_STATUS_S, *INDEXER_STATUS; } INDEXER_STATUS_S, *INDEXER_STATUS;
void toku_indexer_get_status(INDEXER_STATUS s); void toku_indexer_get_status(INDEXER_STATUS s);
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
...@@ -19,11 +19,15 @@ static uint64_t htonl64(uint64_t x) { ...@@ -19,11 +19,15 @@ static uint64_t htonl64(uint64_t x) {
struct my_ltm_status { struct my_ltm_status {
uint32_t max_locks, curr_locks; uint32_t max_locks, curr_locks;
uint64_t max_lock_memory, curr_lock_memory; uint64_t max_lock_memory, curr_lock_memory;
LTM_STATUS_S status;
}; };
static void my_ltm_get_status(toku_ltm *ltm, struct my_ltm_status *my_status) { static void my_ltm_get_status(toku_ltm *ltm, struct my_ltm_status *my_status) {
toku_ltm_get_status(ltm, &my_status->max_locks, &my_status->curr_locks, &my_status->max_lock_memory, &my_status->curr_lock_memory, &my_status->status); LTM_STATUS_S status;
toku_ltm_get_status(ltm, &status);
my_status->max_locks = status.status[LTM_LOCKS_LIMIT].value;
my_status->curr_locks = status.status[LTM_LOCKS_CURR].value;
my_status->max_lock_memory = status.status[LTM_LOCK_MEMORY_LIMIT].value;
my_status->curr_lock_memory = status.status[LTM_LOCK_MEMORY_CURR].value;
} }
static void *my_malloc(size_t s) { static void *my_malloc(size_t s) {
......
This diff is collapsed.
...@@ -94,10 +94,9 @@ run_test(void) { ...@@ -94,10 +94,9 @@ run_test(void) {
r = db->put(db, txn, &k, &v, 0); CKERR(r); r = db->put(db, txn, &k, &v, 0); CKERR(r);
} }
r = txn->commit(txn, 0); CKERR(r); r = txn->commit(txn, 0); CKERR(r);
ENGINE_STATUS es;
r = env->get_engine_status(env, &es, NULL, 0); uint64_t merge_leaf = get_engine_status_val(env, "BRT_FLUSHER_MERGE_LEAF");
CKERR(r); if (merge_leaf > 0) {
if (es.merge_leaf > 0) {
if (verbose) printf("t=%d\n", t); if (verbose) printf("t=%d\n", t);
break; break;
} }
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment