Commit 22b9259f authored by Sergei Golubchik's avatar Sergei Golubchik

wt needs to use its own implementation of rwlocks with

reader preference, at least where system rwlocks are fair.

include/my_global.h:
  wt uses mutex-based rwlock implementation unless on linux
include/waiting_threads.h:
  mutex-based rwlock implementation with reader preference
mysys/thr_rwlock.c:
  revert the change. make my_rw_locks fair
mysys/waiting_threads.c:
  mutex-based rwlock implementation with reader preference.
  convert complex multi-line macros to static functions
parent 9bbce9be
...@@ -1524,6 +1524,15 @@ inline void operator delete[](void*, void*) { /* Do nothing */ } ...@@ -1524,6 +1524,15 @@ inline void operator delete[](void*, void*) { /* Do nothing */ }
*/ */
#ifdef TARGET_OS_LINUX #ifdef TARGET_OS_LINUX
#define NEED_EXPLICIT_SYNC_DIR 1 #define NEED_EXPLICIT_SYNC_DIR 1
#else
/*
On linux default rwlock scheduling policy is good enough for
waiting_threads.c, on other systems use our special implementation
(which is slower).
QQ perhaps this should be tested in configure ? how ?
*/
#define WT_RWLOCKS_USE_MUTEXES 1
#endif #endif
#if !defined(__cplusplus) && !defined(bool) #if !defined(__cplusplus) && !defined(bool)
......
...@@ -67,7 +67,6 @@ extern uint32 wt_success_stats; ...@@ -67,7 +67,6 @@ extern uint32 wt_success_stats;
e.g. accessing a resource by thd->waiting_for is safe, e.g. accessing a resource by thd->waiting_for is safe,
a resource cannot be freed as there's a thread waiting for it a resource cannot be freed as there's a thread waiting for it
*/ */
typedef struct st_wt_resource { typedef struct st_wt_resource {
WT_RESOURCE_ID id; WT_RESOURCE_ID id;
uint waiter_count; uint waiter_count;
...@@ -76,11 +75,27 @@ typedef struct st_wt_resource { ...@@ -76,11 +75,27 @@ typedef struct st_wt_resource {
pthread_mutex_t *mutex; pthread_mutex_t *mutex;
#endif #endif
/* /*
before the 'lock' all elements are mutable, after - immutable before the 'lock' all elements are mutable, after (and including) -
in the sense that lf_hash_insert() won't memcpy() over them. immutable in the sense that lf_hash_insert() won't memcpy() over them.
See wt_init(). See wt_init().
*/ */
#ifdef WT_RWLOCKS_USE_MUTEXES
/*
we need a special rwlock-like 'lock' to allow readers bypass
waiting writers, otherwise readers can deadlock.
writer starvation is technically possible, but unlikely, because
the contention is expected to be low.
*/
struct {
pthread_cond_t cond;
pthread_mutex_t mutex;
uint readers: 16;
uint pending_writers: 15;
uint write_locked: 1;
} lock;
#else
rw_lock_t lock; rw_lock_t lock;
#endif
pthread_cond_t cond; pthread_cond_t cond;
DYNAMIC_ARRAY owners; DYNAMIC_ARRAY owners;
} WT_RESOURCE; } WT_RESOURCE;
......
...@@ -89,7 +89,7 @@ int my_rw_rdlock(rw_lock_t *rwp) ...@@ -89,7 +89,7 @@ int my_rw_rdlock(rw_lock_t *rwp)
pthread_mutex_lock(&rwp->lock); pthread_mutex_lock(&rwp->lock);
/* active or queued writers */ /* active or queued writers */
while (( rwp->state < 0 )) while ((rwp->state < 0 ) || rwp->waiters)
pthread_cond_wait( &rwp->readers, &rwp->lock); pthread_cond_wait( &rwp->readers, &rwp->lock);
rwp->state++; rwp->state++;
...@@ -101,7 +101,7 @@ int my_rw_tryrdlock(rw_lock_t *rwp) ...@@ -101,7 +101,7 @@ int my_rw_tryrdlock(rw_lock_t *rwp)
{ {
int res; int res;
pthread_mutex_lock(&rwp->lock); pthread_mutex_lock(&rwp->lock);
if ((rwp->state < 0 )) if ((rwp->state < 0 ) || rwp->waiters)
res= EBUSY; /* Can't get lock */ res= EBUSY; /* Can't get lock */
else else
{ {
......
...@@ -133,56 +133,105 @@ uint32 wt_cycle_stats[2][WT_CYCLE_STATS+1], wt_success_stats; ...@@ -133,56 +133,105 @@ uint32 wt_cycle_stats[2][WT_CYCLE_STATS+1], wt_success_stats;
static my_atomic_rwlock_t cycle_stats_lock, wait_stats_lock, success_stats_lock; static my_atomic_rwlock_t cycle_stats_lock, wait_stats_lock, success_stats_lock;
#define increment_success_stats() \ static void increment_success_stats()
do { \ {
my_atomic_rwlock_wrlock(&success_stats_lock); \ my_atomic_rwlock_wrlock(&success_stats_lock);
my_atomic_add32(&wt_success_stats, 1); \ my_atomic_add32(&wt_success_stats, 1);
my_atomic_rwlock_wrunlock(&success_stats_lock); \ my_atomic_rwlock_wrunlock(&success_stats_lock);
} while (0) }
#define increment_cycle_stats(X,SLOT) \ static void increment_cycle_stats(uint depth, uint slot)
do { \ {
uint i= (X); \ if (depth >= WT_CYCLE_STATS)
if (i >= WT_CYCLE_STATS) \ depth= WT_CYCLE_STATS;
i= WT_CYCLE_STATS; \ my_atomic_rwlock_wrlock(&cycle_stats_lock);
my_atomic_rwlock_wrlock(&cycle_stats_lock); \ my_atomic_add32(&wt_cycle_stats[slot][depth], 1);
my_atomic_add32(&wt_cycle_stats[SLOT][i], 1); \ my_atomic_rwlock_wrunlock(&cycle_stats_lock);
my_atomic_rwlock_wrunlock(&cycle_stats_lock); \ }
} while (0)
static void increment_wait_stats(ulonglong waited,int ret)
#define increment_wait_stats(X,RET) \ {
do { \ uint i;
uint i; \ if ((ret) == ETIMEDOUT)
if ((RET) == ETIMEDOUT) \ i= WT_WAIT_STATS;
i= WT_WAIT_STATS; \ else
else \ for (i=0; i < WT_WAIT_STATS && waited/10 > wt_wait_table[i]; i++) ;
{ \ my_atomic_rwlock_wrlock(&wait_stats_lock);
ulonglong w=(X)/10; \ my_atomic_add32(wt_wait_stats+i, 1);
for (i=0; i < WT_WAIT_STATS && w > wt_wait_table[i]; i++) ; \ my_atomic_rwlock_wrunlock(&wait_stats_lock);
} \ }
my_atomic_rwlock_wrlock(&wait_stats_lock); \
my_atomic_add32(wt_wait_stats+i, 1); \ #ifdef WT_RWLOCKS_USE_MUTEXES
my_atomic_rwlock_wrunlock(&wait_stats_lock); \ static void rc_rwlock_init(WT_RESOURCE *rc)
} while (0) {
pthread_cond_init(&rc->lock.cond, 0);
#define rc_rdlock(X) \ pthread_mutex_init(&rc->lock.mutex, MY_MUTEX_INIT_FAST);
do { \ }
WT_RESOURCE *R=(X); \ static void rc_rwlock_destroy(WT_RESOURCE *rc)
DBUG_PRINT("wt", ("LOCK resid=%lld for READ", R->id.value)); \ {
rw_rdlock(&R->lock); \ pthread_cond_destroy(&rc->lock.cond);
} while (0) pthread_mutex_destroy(&rc->lock.mutex);
#define rc_wrlock(X) \ }
do { \ static void rc_rdlock(WT_RESOURCE *rc)
WT_RESOURCE *R=(X); \ {
DBUG_PRINT("wt", ("LOCK resid=%lld for WRITE", R->id.value)); \ DBUG_PRINT("wt", ("TRYLOCK resid=%ld for READ", (ulong)rc->id.value));
rw_wrlock(&R->lock); \ pthread_mutex_lock(&rc->lock.mutex);
} while (0) while (rc->lock.write_locked)
#define rc_unlock(X) \ pthread_cond_wait(&rc->lock.cond, &rc->lock.mutex);
do { \ rc->lock.readers++;
WT_RESOURCE *R=(X); \ pthread_mutex_unlock(&rc->lock.mutex);
DBUG_PRINT("wt", ("UNLOCK resid=%lld", R->id.value)); \ DBUG_PRINT("wt", ("LOCK resid=%ld for READ", (ulong)rc->id.value));
rw_unlock(&R->lock); \ }
} while (0) static void rc_wrlock(WT_RESOURCE *rc)
{
DBUG_PRINT("wt", ("TRYLOCK resid=%ld for WRITE", (ulong)rc->id.value));
pthread_mutex_lock(&rc->lock.mutex);
while (rc->lock.write_locked || rc->lock.readers)
pthread_cond_wait(&rc->lock.cond, &rc->lock.mutex);
rc->lock.write_locked=1;
pthread_mutex_unlock(&rc->lock.mutex);
DBUG_PRINT("wt", ("LOCK resid=%ld for WRITE", (ulong)rc->id.value));
}
static void rc_unlock(WT_RESOURCE *rc)
{
DBUG_PRINT("wt", ("UNLOCK resid=%ld", (ulong)rc->id.value));
pthread_mutex_lock(&rc->lock.mutex);
if (rc->lock.write_locked)
{
rc->lock.write_locked=0;
pthread_cond_broadcast(&rc->lock.cond);
}
else if (--rc->lock.readers == 0)
pthread_cond_broadcast(&rc->lock.cond);
pthread_mutex_unlock(&rc->lock.mutex);
}
#else
static void rc_rwlock_init(WT_RESOURCE *rc)
{
my_rwlock_init(&rc->lock, 0);
}
static void rc_rwlock_destroy(WT_RESOURCE *rc)
{
rwlock_destroy(&rc->lock);
}
static void rc_rdlock(WT_RESOURCE *rc)
{
DBUG_PRINT("wt", ("TRYLOCK resid=%ld for READ", (ulong)rc->id.value));
rw_rdlock(&rc->lock);
DBUG_PRINT("wt", ("LOCK resid=%ld for READ", (ulong)rc->id.value));
}
static void rc_wrlock(WT_RESOURCE *rc)
{
DBUG_PRINT("wt", ("TRYLOCK resid=%ld for WRITE", (ulong)rc->id.value));
rw_wrlock(&rc->lock);
DBUG_PRINT("wt", ("LOCK resid=%ld for WRITE", (ulong)rc->id.value));
}
static void rc_unlock(WT_RESOURCE *rc)
{
DBUG_PRINT("wt", ("UNLOCK resid=%ld", (ulong)rc->id.value));
rw_unlock(&rc->lock);
}
#endif
/* /*
All resources are stored in a lock-free hash. Different threads All resources are stored in a lock-free hash. Different threads
...@@ -202,7 +251,7 @@ static void wt_resource_init(uchar *arg) ...@@ -202,7 +251,7 @@ static void wt_resource_init(uchar *arg)
DBUG_ENTER("wt_resource_init"); DBUG_ENTER("wt_resource_init");
bzero(rc, sizeof(*rc)); bzero(rc, sizeof(*rc));
my_rwlock_init(&rc->lock, 0); rc_rwlock_init(rc);
pthread_cond_init(&rc->cond, 0); pthread_cond_init(&rc->cond, 0);
my_init_dynamic_array(&rc->owners, sizeof(WT_THD *), 0, 5); my_init_dynamic_array(&rc->owners, sizeof(WT_THD *), 0, 5);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
...@@ -220,7 +269,7 @@ static void wt_resource_destroy(uchar *arg) ...@@ -220,7 +269,7 @@ static void wt_resource_destroy(uchar *arg)
DBUG_ENTER("wt_resource_destroy"); DBUG_ENTER("wt_resource_destroy");
DBUG_ASSERT(rc->owners.elements == 0); DBUG_ASSERT(rc->owners.elements == 0);
rwlock_destroy(&rc->lock); rc_rwlock_destroy(rc);
pthread_cond_destroy(&rc->cond); pthread_cond_destroy(&rc->cond);
delete_dynamic(&rc->owners); delete_dynamic(&rc->owners);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
...@@ -490,7 +539,7 @@ static int deadlock_search(struct deadlock_arg *arg, WT_THD *blocker, ...@@ -490,7 +539,7 @@ static int deadlock_search(struct deadlock_arg *arg, WT_THD *blocker,
} }
end: end:
/* /*
Note that 'rc' is locked in this function, but it's never unlocked there. Note that 'rc' is locked in this function, but it's never unlocked here.
Instead it's saved in arg->rc and the *caller* is expected to unlock it. Instead it's saved in arg->rc and the *caller* is expected to unlock it.
It's done to support different killing strategies. This is how it works: It's done to support different killing strategies. This is how it works:
Assuming a graph Assuming a graph
...@@ -549,6 +598,7 @@ static int deadlock(WT_THD *thd, WT_THD *blocker, uint depth, ...@@ -549,6 +598,7 @@ static int deadlock(WT_THD *thd, WT_THD *blocker, uint depth,
struct deadlock_arg arg= {thd, max_depth, 0, 0}; struct deadlock_arg arg= {thd, max_depth, 0, 0};
int ret; int ret;
DBUG_ENTER("deadlock"); DBUG_ENTER("deadlock");
DBUG_ASSERT(depth < 2);
ret= deadlock_search(&arg, blocker, depth); ret= deadlock_search(&arg, blocker, depth);
if (ret == WT_DEPTH_EXCEEDED) if (ret == WT_DEPTH_EXCEEDED)
{ {
...@@ -688,8 +738,8 @@ int wt_thd_will_wait_for(WT_THD *thd, WT_THD *blocker, WT_RESOURCE_ID *resid) ...@@ -688,8 +738,8 @@ int wt_thd_will_wait_for(WT_THD *thd, WT_THD *blocker, WT_RESOURCE_ID *resid)
LF_REQUIRE_PINS(3); LF_REQUIRE_PINS(3);
DBUG_PRINT("wt", ("enter: thd=%s, blocker=%s, resid=%llu", DBUG_PRINT("wt", ("enter: thd=%s, blocker=%s, resid=%lu",
thd->name, blocker->name, resid->value)); thd->name, blocker->name, (ulong)resid->value));
if (fix_thd_pins(thd)) if (fix_thd_pins(thd))
DBUG_RETURN(WT_DEADLOCK); DBUG_RETURN(WT_DEADLOCK);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment