Commit 6f6974aa authored by Rich Prohaska, committed by Yoni Fogel

port fairsched from 2.2.0 to main (for linux only) refs[t:2083]

git-svn-id: file:///svn/toku/tokudb@15573 c7de825b-a66e-492c-adef-691d508d4ae1
parent fbce5906
......@@ -52,117 +52,117 @@ toku_pthread_rwlock_wrunlock(toku_pthread_rwlock_t *rwlock) {
return pthread_rwlock_unlock(rwlock);
}
int toku_pthread_yield(void) __attribute__((__visibility__("default")));
int
toku_pthread_yield(void) __attribute__((__visibility__("default")));
static inline
int toku_pthread_attr_init(toku_pthread_attr_t *attr) {
static inline int toku_pthread_attr_init(toku_pthread_attr_t *attr) {
return pthread_attr_init(attr);
}
static inline
int toku_pthread_attr_destroy(toku_pthread_attr_t *attr) {
static inline int
toku_pthread_attr_destroy(toku_pthread_attr_t *attr) {
return pthread_attr_destroy(attr);
}
static inline
int toku_pthread_attr_getstacksize(toku_pthread_attr_t *attr, size_t *stacksize) {
static inline int
toku_pthread_attr_getstacksize(toku_pthread_attr_t *attr, size_t *stacksize) {
return pthread_attr_getstacksize(attr, stacksize);
}
static inline
int toku_pthread_attr_setstacksize(toku_pthread_attr_t *attr, size_t stacksize) {
static inline int
toku_pthread_attr_setstacksize(toku_pthread_attr_t *attr, size_t stacksize) {
return pthread_attr_setstacksize(attr, stacksize);
}
static inline
int toku_pthread_create(toku_pthread_t *thread, const toku_pthread_attr_t *attr, void *(*start_function)(void *), void *arg) {
static inline int
toku_pthread_create(toku_pthread_t *thread, const toku_pthread_attr_t *attr, void *(*start_function)(void *), void *arg) {
return pthread_create(thread, attr, start_function, arg);
}
static inline
int toku_pthread_join(toku_pthread_t thread, void **value_ptr) {
static inline int
toku_pthread_join(toku_pthread_t thread, void **value_ptr) {
return pthread_join(thread, value_ptr);
}
static inline
toku_pthread_t toku_pthread_self(void) {
static inline toku_pthread_t
toku_pthread_self(void) {
return pthread_self();
}
#define TOKU_PTHREAD_MUTEX_INITIALIZER PTHREAD_MUTEX_INITIALIZER
static inline
int toku_pthread_mutex_init(toku_pthread_mutex_t *mutex, const toku_pthread_mutexattr_t *attr) {
static inline int
toku_pthread_mutex_init(toku_pthread_mutex_t *mutex, const toku_pthread_mutexattr_t *attr) {
return pthread_mutex_init(mutex, attr);
}
static inline
int toku_pthread_mutex_destroy(toku_pthread_mutex_t *mutex) {
static inline int
toku_pthread_mutex_destroy(toku_pthread_mutex_t *mutex) {
return pthread_mutex_destroy(mutex);
}
static inline
int toku_pthread_mutex_lock(toku_pthread_mutex_t *mutex) {
static inline int
toku_pthread_mutex_lock(toku_pthread_mutex_t *mutex) {
return pthread_mutex_lock(mutex);
}
static inline
int toku_pthread_mutex_trylock(toku_pthread_mutex_t *mutex) {
static inline int
toku_pthread_mutex_trylock(toku_pthread_mutex_t *mutex) {
return pthread_mutex_trylock(mutex);
}
static inline
int toku_pthread_mutex_unlock(toku_pthread_mutex_t *mutex) {
static inline int
toku_pthread_mutex_unlock(toku_pthread_mutex_t *mutex) {
return pthread_mutex_unlock(mutex);
}
static inline
int toku_pthread_cond_init(toku_pthread_cond_t *cond, const toku_pthread_condattr_t *attr) {
static inline int
toku_pthread_cond_init(toku_pthread_cond_t *cond, const toku_pthread_condattr_t *attr) {
return pthread_cond_init(cond, attr);
}
static inline
int toku_pthread_cond_destroy(toku_pthread_cond_t *cond) {
static inline int
toku_pthread_cond_destroy(toku_pthread_cond_t *cond) {
return pthread_cond_destroy(cond);
}
static inline
int toku_pthread_cond_wait(toku_pthread_cond_t *cond, toku_pthread_mutex_t *mutex) {
static inline int
toku_pthread_cond_wait(toku_pthread_cond_t *cond, toku_pthread_mutex_t *mutex) {
return pthread_cond_wait(cond, mutex);
}
static inline
int toku_pthread_cond_timedwait(toku_pthread_cond_t *cond, toku_pthread_mutex_t *mutex, toku_timespec_t *wakeup_at) {
static inline int
toku_pthread_cond_timedwait(toku_pthread_cond_t *cond, toku_pthread_mutex_t *mutex, toku_timespec_t *wakeup_at) {
return pthread_cond_timedwait(cond, mutex, wakeup_at);
}
static inline
int toku_pthread_cond_signal(toku_pthread_cond_t *cond) {
static inline int
toku_pthread_cond_signal(toku_pthread_cond_t *cond) {
return pthread_cond_signal(cond);
}
static inline
int toku_pthread_cond_broadcast(toku_pthread_cond_t *cond) {
static inline int
toku_pthread_cond_broadcast(toku_pthread_cond_t *cond) {
return pthread_cond_broadcast(cond);
}
static inline
int toku_pthread_key_create(toku_pthread_key_t *key, void (*destroyf)(void *)) {
static inline int
toku_pthread_key_create(toku_pthread_key_t *key, void (*destroyf)(void *)) {
return pthread_key_create(key, destroyf);
}
static inline
int toku_pthread_key_delete(toku_pthread_key_t key) {
static inline int
toku_pthread_key_delete(toku_pthread_key_t key) {
return pthread_key_delete(key);
}
static inline
void *toku_pthread_getspecific(toku_pthread_key_t key) {
static inline void *
toku_pthread_getspecific(toku_pthread_key_t key) {
return pthread_getspecific(key);
}
static inline
int toku_pthread_setspecific(toku_pthread_key_t key, void *data) {
static inline int
toku_pthread_setspecific(toku_pthread_key_t key, void *data) {
return pthread_setspecific(key, data);
}
......
......@@ -45,7 +45,7 @@ static void cachetable_reader(WORKITEM);
#define WHEN_TRACE_CT(x) ((void)0)
#endif
#define TOKU_DO_WAIT_TIME 0
#define TOKU_DO_WAIT_TIME 1
// these should be in the cachetable object, but we make them file-wide so that gdb can get them easily
static u_int64_t cachetable_hit;
......@@ -57,8 +57,8 @@ static u_int64_t cachetable_prefetches; // how many times has a block been pr
static u_int64_t cachetable_maybe_get_and_pins; // how many times has get_and_pin been called?
static u_int64_t cachetable_maybe_get_and_pin_hits; // how many times has get_and_pin() returned with a node?
#if TOKU_DO_WAIT_TIME
static u_int64_t cachetable_miss_time;
static u_int64_t cachetable_wait_time;
static u_int64_t cachetable_misstime;
static u_int64_t cachetable_waittime;
#endif
static u_int32_t cachetable_lock_ctr = 0;
......@@ -1074,8 +1074,10 @@ int toku_cachetable_put(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, v
}
#if TOKU_DO_WAIT_TIME
static u_int64_t tdelta(struct timeval *tnew, struct timeval *told) {
return (tnew->tv_sec * 1000000ULL + tnew->tv_usec) - (told->tv_sec * 1000000ULL + told->tv_usec);
static uint64_t get_tnow(void) {
struct timeval tv;
int r = gettimeofday(&tv, NULL); assert(r == 0);
return tv.tv_sec * 1000000ULL + tv.tv_usec;
}
#endif
......@@ -1142,7 +1144,7 @@ int toku_cachetable_get_and_pin(CACHEFILE cachefile, CACHEKEY key, u_int32_t ful
count++;
if (p->key.b==key.b && p->cachefile==cachefile) {
#if TOKU_DO_WAIT_TIME
struct timeval t0;
uint64_t t0 = 0;
int do_wait_time = 0;
#endif
if (p->rwlock.writer || p->rwlock.want_write) {
......@@ -1152,7 +1154,7 @@ int toku_cachetable_get_and_pin(CACHEFILE cachefile, CACHEKEY key, u_int32_t ful
cachetable_wait_writing++;
#if TOKU_DO_WAIT_TIME
do_wait_time = 1;
gettimeofday(&t0, NULL);
t0 = get_tnow();
#endif
}
if (p->checkpoint_pending) {
......@@ -1180,11 +1182,8 @@ int toku_cachetable_get_and_pin(CACHEFILE cachefile, CACHEKEY key, u_int32_t ful
rwlock_read_lock(&p->rwlock, ct->mutex);
}
#if TOKU_DO_WAIT_TIME
if (do_wait_time) {
struct timeval tnow;
gettimeofday(&tnow, NULL);
cachetable_wait_time += tdelta(&tnow, &t0);
}
if (do_wait_time)
cachetable_waittime += get_tnow() - t0;
#endif
get_and_pin_footprint = 8;
if (p->state == CTPAIR_INVALID) {
......@@ -1217,8 +1216,7 @@ int toku_cachetable_get_and_pin(CACHEFILE cachefile, CACHEKEY key, u_int32_t ful
get_and_pin_footprint = 10;
rwlock_write_lock(&p->rwlock, ct->mutex);
#if TOKU_DO_WAIT_TIME
struct timeval t0;
gettimeofday(&t0, NULL);
uint64_t t0 = get_tnow();
#endif
r = cachetable_fetch_pair(ct, cachefile, p);
if (r) {
......@@ -1228,9 +1226,7 @@ int toku_cachetable_get_and_pin(CACHEFILE cachefile, CACHEKEY key, u_int32_t ful
}
cachetable_miss++;
#if TOKU_DO_WAIT_TIME
struct timeval tnow;
gettimeofday(&tnow, NULL);
cachetable_miss_time += tdelta(&tnow, &t0);
cachetable_misstime += get_tnow() - t0;
#endif
get_and_pin_footprint = 11;
rwlock_read_lock(&p->rwlock, ct->mutex);
......@@ -1611,6 +1607,15 @@ toku_cachetable_close (CACHETABLE *ctp) {
return 0;
}
// Report the file-wide cachetable miss statistics.
//   misscount: if non-NULL, receives the number of cachetable misses so far.
//   misstime:  if non-NULL, receives the accumulated miss time, or 0 when
//              TOKU_DO_WAIT_TIME is compiled out.
// The cachetable argument is unused because the counters are file-wide.
void toku_cachetable_get_miss_times(CACHETABLE UU(ct), uint64_t *misscount, uint64_t *misstime) {
#if TOKU_DO_WAIT_TIME
    const uint64_t total_misstime = cachetable_misstime;
#else
    const uint64_t total_misstime = 0;
#endif
    if (misscount != NULL) {
        *misscount = cachetable_miss;
    }
    if (misstime != NULL) {
        *misstime = total_misstime;
    }
}
int toku_cachetable_unpin_and_remove (CACHEFILE cachefile, CACHEKEY key) {
int r = ENOENT;
// Removing something already present is OK.
......
......@@ -60,10 +60,12 @@ int toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger, char **erro
void toku_cachetable_minicron_shutdown(CACHETABLE ct);
// Close the cachetable.
// Effects: All of the memory objects are flushed to disk, and the cachetable is
// destroyed.
// Effects: All of the memory objects are flushed to disk, and the cachetable is destroyed.
int toku_cachetable_close (CACHETABLE*); /* Flushes everything to disk, and destroys the cachetable. */
// Get the number of cachetable misses (in misscount) and the accumulated time waiting for reads (in misstime, units of microseconds)
void toku_cachetable_get_miss_times(CACHETABLE ct, uint64_t *misscount, uint64_t *misstime);
// Open a file and bind the file to a new cachefile object.
int toku_cachetable_openf (CACHEFILE *,CACHETABLE, const char */*fname*/, const char */*fname_relative_to_env*/,int flags, mode_t mode);
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2009 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
......@@ -13,38 +13,209 @@
#include <toku_portability.h>
#include "ydb-internal.h"
#include <string.h>
#include <assert.h>
#include <toku_pthread.h>
#include <sys/types.h>
static toku_pthread_mutex_t ydb_big_lock = TOKU_PTHREAD_MUTEX_INITIALIZER;
static u_int32_t ydb_lock_ctr = 0; // useful for debug at a live installation
// Select the ydb lock strategy at compile time.
//   YDB_LOCK_MISS_TIME: pace lock acquisition using cachetable miss times (Linux).
//   YDB_LOCK_FIFO:      plain mutex lock/unlock.
// NOTE(review): on non-Linux builds YDB_LOCK_FIFO is defined as 0 and
// YDB_LOCK_MISS_TIME is undefined, so neither strategy is compiled in and
// toku_ydb_lock()/toku_ydb_unlock() only update status.ydb_lock_ctr without
// touching the mutex -- confirm this is intended before building elsewhere.
#if __linux__
#define YDB_LOCK_MISS_TIME 1
#else
#define YDB_LOCK_FIFO 0
#endif
int
toku_ydb_lock_ctr(void) {
return ydb_lock_ctr;
// The big ydb lock: a mutex plus, in the miss-time build, the bookkeeping
// used to pace threads that want to reacquire it.
struct ydb_big_lock {
    toku_pthread_mutex_t lock;      // the mutex itself
#if YDB_LOCK_MISS_TIME
    int32_t waiters;                // threads currently blocked on the mutex in toku_ydb_lock
    toku_pthread_key_t time_key;    // thread-specific key for each thread's struct ydbtime
    uint64_t start_misscount;       // cachetable miss count sampled when the lock was taken
    uint64_t start_misstime;        // cachetable miss time sampled when the lock was taken
#endif
};
static struct ydb_big_lock ydb_big_lock;
// status is intended for display to humans to help understand system behavior.
// It does not need to be perfectly thread-safe.
static SCHEDULE_STATUS_S status;
// Return the larger of two unsigned 64-bit values (b when they are equal).
static inline u_int64_t max(u_int64_t a, u_int64_t b) {
    if (b < a) {
        return a;
    }
    return b;
}
#define MAX_SLEEP 1000000 // 1 second covers the case of a 5 level tree with 30 millisecond read delays and a few waiting threads
#if YDB_LOCK_MISS_TIME
#include "toku_atomic.h"
#define MAXTHELD 250000 // if lock was apparently held longer than 250 msec, then theld is probably invalid (do we still need this?)
// Per-thread scheduling state for the ydb lock (one instance per thread).
struct ydbtime {
    // Valid only while the lock is not held: the earliest time at which this
    // thread may take the lock again (0 if no latency is required).
    uint64_t tacquire;
    // How long the lock was held the previous time this thread held it.
    uint64_t theld_prev;
};
// Current wall-clock time from gettimeofday(), expressed in microseconds.
// Asserts that gettimeofday() succeeds.
static uint64_t
get_tnow(void) {
    struct timeval now;
    int rc = gettimeofday(&now, NULL);
    assert(rc == 0);
    uint64_t usec = (uint64_t) now.tv_sec * 1000000ULL;
    usec += now.tv_usec;
    return usec;
}
#endif
static void
init_status(void) {
uint64_t cpuhz = 0;
#if YDB_LOCK_MISS_TIME
int r = toku_os_get_processor_frequency(&cpuhz); assert(r == 0);
#endif
status.ydb_lock_ctr = 0;
status.max_possible_sleep = MAX_SLEEP;
status.processor_freq_mhz = cpuhz / 1000000ULL;
status.max_requested_sleep = 0;
status.times_max_sleep_used = 0;
status.total_sleepers = 0;
status.total_sleep_time = 0;
status.max_waiters = 0;
status.total_waiters = 0;
status.total_clients = 0;
status.time_ydb_lock_held_unavailable = 0;
status.max_time_ydb_lock_held = 0;
status.total_time_ydb_lock_held = 0;
}
// Copy the current schedule status into *statp.
// The copy is taken without synchronization; the status is for display only.
void
toku_ydb_lock_get_status(SCHEDULE_STATUS statp) {
    memcpy(statp, &status, sizeof status);
}
int
int
toku_ydb_lock_init(void) {
ydb_lock_ctr = 0;
int r = toku_pthread_mutex_init(&ydb_big_lock, NULL); assert(r == 0);
int r;
r = toku_pthread_mutex_init(&ydb_big_lock.lock, NULL); assert(r == 0);
#if YDB_LOCK_MISS_TIME
ydb_big_lock.waiters = 0;
r = toku_pthread_key_create(&ydb_big_lock.time_key, toku_free); assert(r == 0);
#endif
init_status();
return r;
}
int
int
toku_ydb_lock_destroy(void) {
int r = toku_pthread_mutex_destroy(&ydb_big_lock); assert(r == 0);
int r;
r = toku_pthread_mutex_destroy(&ydb_big_lock.lock); assert(r == 0);
#if YDB_LOCK_MISS_TIME
r = toku_pthread_key_delete(ydb_big_lock.time_key); assert(r == 0);
#endif
return r;
}
void toku_ydb_lock(void) {
// Acquire the big ydb lock.
//
// YDB_LOCK_FIFO build: block on the mutex.
// YDB_LOCK_MISS_TIME build: each thread keeps a struct ydbtime in thread-specific
// storage (allocated on first use).  If the thread's tacquire timestamp is still
// in the future, the thread sleeps until then (at most MAX_SLEEP microseconds)
// before taking the mutex.  Threads that have to block on the mutex are counted
// in ydb_big_lock.waiters while they wait.  Once the lock is held, the cachetable
// miss counters are sampled so toku_ydb_unlock can charge this critical section.
//
// On return status.ydb_lock_ctr is odd, which marks the lock as held.
void 
toku_ydb_lock(void) {
#if YDB_LOCK_FIFO
    // the mutex is a member of the ydb_big_lock struct; the lock counter is
    // bumped once, below, for every strategy
    int r = toku_pthread_mutex_lock(&ydb_big_lock.lock); assert(r == 0);
#endif
#if YDB_LOCK_MISS_TIME
    int r;
    u_int64_t requested_sleep = 0;
    struct ydbtime *ydbtime = toku_pthread_getspecific(ydb_big_lock.time_key);
    if (!ydbtime) {          // allocate the per thread timestamp if not yet allocated
        ydbtime = toku_malloc(sizeof (struct ydbtime));
        assert(ydbtime);
        memset(ydbtime, 0, sizeof (struct ydbtime));
        r = toku_pthread_setspecific(ydb_big_lock.time_key, ydbtime);
        assert(r == 0);
        (void) toku_sync_fetch_and_add_uint64(&status.total_clients, 1);
    }
    if (ydbtime->tacquire) { // delay the thread if the lock acquire time is set and is still in the future
        if (0) printf("%"PRIu64"\n", ydbtime->tacquire);
        uint64_t t = get_tnow();
        if (t < ydbtime->tacquire) {
            t = ydbtime->tacquire - t;
            requested_sleep = t;
            // put an upper bound on the sleep time since the timestamps may be crazy due to thread movement between cpu's or cpu frequency changes
            if (t > MAX_SLEEP) {
                t = MAX_SLEEP;
                (void) toku_sync_fetch_and_add_uint64(&status.times_max_sleep_used, 1);
            }
            (void) toku_sync_fetch_and_add_uint64(&status.total_sleep_time, t);
            (void) toku_sync_fetch_and_add_uint64(&status.total_sleepers, 1);
            usleep(t);
        }
    }
    r = toku_pthread_mutex_trylock(&ydb_big_lock.lock);
    if (r != 0) {           // if we can not get the lock, bump the count of the lock waits, and block on the lock
        assert(r == EBUSY);
        (void) toku_sync_fetch_and_add_int32(&ydb_big_lock.waiters, 1);
        (void) toku_sync_fetch_and_add_uint64(&status.total_waiters, 1);
        r = toku_pthread_mutex_lock(&ydb_big_lock.lock);
        assert(r == 0);
        (void) toku_sync_fetch_and_add_int32(&ydb_big_lock.waiters, -1);
    }
    // the lock is held from here on
    status.max_requested_sleep = max(status.max_requested_sleep, requested_sleep);
    // remember the miss counters at acquisition; toku_ydb_unlock subtracts them
    toku_cachetable_get_miss_times(NULL, &ydb_big_lock.start_misscount, &ydb_big_lock.start_misstime);
#endif
    status.ydb_lock_ctr++;
    assert((status.ydb_lock_ctr & 0x01) == 1);
}
void toku_ydb_unlock(void) {
ydb_lock_ctr++;
// Release the big ydb lock.
//
// status.ydb_lock_ctr is bumped first and must become even (lock not held).
//
// YDB_LOCK_FIFO build: unlock the mutex.
// YDB_LOCK_MISS_TIME build: estimate how long this critical section kept the
// lock busy with cache misses (theld), unlock the mutex, and then set this
// thread's tacquire so that it backs off for theld * waiters microseconds
// before it may take the lock again.  No backoff is applied when nobody was
// waiting or when theld is at most 100 microseconds.
void 
toku_ydb_unlock(void) {
    status.ydb_lock_ctr++;
    assert((status.ydb_lock_ctr & 0x01) == 0);
#if YDB_LOCK_FIFO
    // the mutex is a member of the ydb_big_lock struct
    int r = toku_pthread_mutex_unlock(&ydb_big_lock.lock); assert(r == 0);
#endif
#if YDB_LOCK_MISS_TIME
    struct ydbtime *ydbtime = toku_pthread_getspecific(ydb_big_lock.time_key);
    assert(ydbtime);        // allocated by toku_ydb_lock on this thread

    int r;
    uint64_t theld;
    int waiters = ydb_big_lock.waiters;             // get the number of lock waiters (used to compute the lock acquisition time)
    if (waiters == 0) {
        theld = 0;
    } else {
        uint64_t misscount, misstime;
        toku_cachetable_get_miss_times(NULL, &misscount, &misstime);
        misscount -= ydb_big_lock.start_misscount;  // how many cache misses for this operation
        misstime -= ydb_big_lock.start_misstime;    // how many usec spent waiting for disk read this operation
        if (0 && (misscount || misstime))
            printf("%d %"PRIu64" %"PRIu64"\n", waiters, misscount, misstime);
        if (misscount == 0) {
            theld = 0;
        } else {
            theld = misstime ? misstime : misscount * 20000ULL; // if we decide not to compile in misstime, then backoff to 20 milliseconds per cache miss

            if (theld < MAXTHELD) {
                status.max_time_ydb_lock_held = max(status.max_time_ydb_lock_held, theld);
                ydbtime->theld_prev = theld;
            } else {                                // thread appears to have migrated (theld out of range)
                theld = ydbtime->theld_prev;        // if time measurement unavailable, assume same as previous use of ydb lock by this thread
                status.time_ydb_lock_held_unavailable++;
            }
            // these two statistics are only updated when the operation had cache misses
            status.max_waiters = max(status.max_waiters, waiters);
            status.total_time_ydb_lock_held += theld;
        }
    }

    r = toku_pthread_mutex_unlock(&ydb_big_lock.lock); assert(r == 0);

    // we use a lower bound of 100 microseconds on the sleep time to avoid system call overhead for short sleeps
    if (waiters == 0 || theld <= 100ULL)
        ydbtime->tacquire = 0;                      // there is no delay on acquiring the lock the next time since there was no lock contention or the lock was not held very long
    else
        ydbtime->tacquire = get_tnow() + theld * waiters; // set the min time from now that the lock can not be reacquired
#endif
}
// Return the ydb lock counter, narrowed to int.  The counter is incremented on
// every lock and every unlock, so it is odd while the lock is held.
int 
toku_ydb_lock_ctr(void) {
    const u_int64_t ctr = status.ydb_lock_ctr;
    return (int) ctr;
}
......@@ -70,11 +70,29 @@ struct __toku_db_env_internal {
Ephemeral locking
********************************************************* */
/* Statistics describing ydb lock scheduling; copied out by toku_ydb_lock_get_status(). */
typedef struct {
    /* how many times has ydb lock been taken/released */
    u_int64_t ydb_lock_ctr;
    /* max possible sleep time for ydb lock scheduling (constant) */
    u_int64_t max_possible_sleep;
    /* clock frequency in MHz */
    u_int64_t processor_freq_mhz;
    /* max sleep time requested, can be larger than max possible */
    u_int64_t max_requested_sleep;
    /* number of times the max_possible_sleep was used to sleep */
    u_int64_t times_max_sleep_used;
    /* total number of times a client slept for ydb lock scheduling */
    u_int64_t total_sleepers;
    /* total time spent sleeping for ydb lock scheduling */
    u_int64_t total_sleep_time;
    /* max number of simultaneous client threads kept waiting for ydb lock */
    u_int64_t max_waiters;
    /* total number of times a client thread waited for ydb lock */
    u_int64_t total_waiters;
    /* total number of separate client threads that use ydb lock */
    u_int64_t total_clients;
    /* number of times a thread migrated and theld is unavailable */
    u_int64_t time_ydb_lock_held_unavailable;
    /* max time a client thread held the ydb lock */
    u_int64_t max_time_ydb_lock_held;
    /* total time client threads held the ydb lock */
    u_int64_t total_time_ydb_lock_held;
} SCHEDULE_STATUS_S;
typedef SCHEDULE_STATUS_S *SCHEDULE_STATUS;
int toku_ydb_lock_init(void);
int toku_ydb_lock_destroy(void);
void toku_ydb_lock(void);
void toku_ydb_unlock(void);
int toku_ydb_lock_ctr(void);
void toku_ydb_lock_get_status(SCHEDULE_STATUS statp);
/* *********************************************************
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment