Commit 7460fb68 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Merge changes from tokudb.2826. Fixes #2826. close[t:2826]

git-svn-id: file:///svn/toku/tokudb@22664 c7de825b-a66e-492c-adef-691d508d4ae1
parent 6810ac68
# -*- Mode: Makefile -*-
CPPFLAGS = -D_GNU_SOURCE -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE
CPPFLAGS += -I../../toku_include -I.. -I.
CFLAGS = -Wall -Werror -g -O0 -std=c99
OPTFLAGS = -O0
CFLAGS = -W -Wall -Werror -g $(OPTFLAGS) -std=c99
ifeq ($(GCCVERSION),4.4.2)
CFLAGS += -Wno-deprecated
else ifeq ($(GCCVERSION),4.4.1)
......@@ -38,6 +39,9 @@ all: $(TARGETS)
build: $(TARGETS)
check: $(TARGETS) $(RUNTARGETS);
test-rwlock: OPTFLAGS=-O3
test-rwlock: ../toku_pthread.h
# pwrite4g needs an argument to tell it which directory to write temporary files
test-pwrite4g.tdbrun: TEST_EXTRA_ARGS=.
%.tdbrun: %
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2010 Tokutek Inc. All rights reserved."
// Here are some timing numbers:
// (Note: The not-quite-working version with cas can be found in r22519 of https://svn.tokutek.com/tokudb/toku/tokudb.2825/) It's about as fast as "Best cas".)
//
// On ramie (2.53GHz E5540)
// Best nop time= 1.074300ns
// Best cas time= 8.595600ns
// Best mutex time= 19.340201ns
// Best rwlock time= 34.024799ns
// Best newbrt rwlock time= 38.680500ns
// Best prelocked time= 2.148700ns
// Best fair rwlock time= 45.127600ns
// On laptop
// Best nop time= 2.876000ns
// Best cas time= 15.362500ns
// Best mutex time= 51.951498ns
// Best rwlock time= 97.721201ns
// Best newbrt rwlock time=110.456800ns
// Best prelocked time= 4.240100ns
// Best fair rwlock time=113.119102ns
//
// Analysis: If the mutex can be prelocked (as cachetable does, it uses the same mutex for the cachetable and for the condition variable protecting the cache table)
// then you can save quite a bit. What does the cachetable do?
// During pin: (In the common case:) It grabs the mutex, grabs a read lock, and releases the mutex.
// During unpin: It grabs the mutex, unlocks the rwlock lock in the pair, and releases the mutex.
// Both actions must acquire a cachetable lock during that time, so definitely saves time to do it that way.
#include <toku_pthread.h>
#include <toku_portability.h>
#include <toku_time.h>
#include <pthread.h>
#include <toku_assert.h>
#include <sys/time.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include "../../newbrt/rwlock.h"
#include <sys/types.h>
static int verbose=1;
static int timing_only=0;
static void parse_args (int argc, const char *argv[]) {
const char *progname = argv[0];
argc--; argv++;
while (argc>0) {
if (strcmp(argv[0], "-v")==0) {
verbose++;
} else if (strcmp(argv[0], "-q")==0) {
verbose--;
} else if (strcmp(argv[0], "--timing-only")==0) {
timing_only=1;
} else {
fprintf(stderr, "Usage: %s {-q}* {-v}* {--timing-only}\n", progname);
exit(1);
}
argc--; argv++;
}
}
static const int T=6;
static const int N=10000000;
static double best_nop_time=1e12;
static double best_cas_time=1e12;
static double best_mutex_time=1e12;
static double best_rwlock_time=1e12;
static double best_newbrt_time=1e12;
static double best_prelocked_time=1e12;
static double best_fair_rwlock_time=1e12;
static double mind(double a, double b) { if (a<b) return a; else return b; }
#if 0
// gcc 4.4.4 (fedora 12) doesn't introduce memory barriers on these writes, so I think that volatile is not enough for sequential consistency.
// Intel guarantees that writes are seen in the same order as they were performed on one processor. But if there were two processors, funny things could happen.
volatile int sc_a, sc_b;
void sequential_consistency (void) {
sc_a = 1;
sc_b = 0;
}
#endif
// Declaring val to be volatile produces essentially identical code as putting the asm volatile memory statements in.
// gcc is not introducing memory barriers to force sequential consistency on volatile memory writes.
// That's probably good enough for us, since we'll have a barrier instruction anywhere it matters.
volatile int val = 0;
/* not static */
void time_nop (void) {
struct timeval start,end;
for (int t=0; t<T; t++) {
gettimeofday(&start, NULL);
for (int i=0; i<N; i++) {
if (val!=0) abort();
val=1;
//__asm__ volatile ("" : : : "memory");
val=0;
//__asm__ volatile ("" : : : "memory");
}
gettimeofday(&end, NULL);
double diff = 1e9*toku_tdiff(&end, &start)/N;
if (verbose>1)
fprintf(stderr, "nop = %.6fns/(lock+unlock)\n", diff);
best_nop_time=mind(best_nop_time,diff);
}
}
/* not static */
void time_cas (void) {
volatile int val = 0;
struct timeval start,end;
for (int t=0; t<T; t++) {
gettimeofday(&start, NULL);
for (int i=0; i<N; i++) {
{ int r = __sync_val_compare_and_swap(&val, 0, 1); assert(r==0); }
val = 0;
}
gettimeofday(&end, NULL);
double diff = 1e9*toku_tdiff(&end, &start)/N;
if (verbose>1)
fprintf(stderr, "cas = %.6fns/(lock+unlock)\n", diff);
best_cas_time=mind(best_cas_time,diff);
}
}
/* not static */
void time_pthread_mutex (void) {
pthread_mutex_t mutex;
{ int r = pthread_mutex_init(&mutex, NULL); assert(r==0); }
struct timeval start,end;
pthread_mutex_lock(&mutex);
pthread_mutex_unlock(&mutex);
for (int t=0; t<T; t++) {
gettimeofday(&start, NULL);
for (int i=0; i<N; i++) {
pthread_mutex_lock(&mutex);
pthread_mutex_unlock(&mutex);
}
gettimeofday(&end, NULL);
double diff = 1e9*toku_tdiff(&end, &start)/N;
if (verbose>1)
fprintf(stderr, "pthread_mutex = %.6fns/(lock+unlock)\n", diff);
best_mutex_time=mind(best_mutex_time,diff);
}
{ int r = pthread_mutex_destroy(&mutex); assert(r==0); }
}
/* not static */
void time_pthread_rwlock (void) {
pthread_rwlock_t mutex;
{ int r = pthread_rwlock_init(&mutex, NULL); assert(r==0); }
struct timeval start,end;
pthread_rwlock_rdlock(&mutex);
pthread_rwlock_unlock(&mutex);
for (int t=0; t<T; t++) {
gettimeofday(&start, NULL);
for (int i=0; i<N; i++) {
pthread_rwlock_rdlock(&mutex);
pthread_rwlock_unlock(&mutex);
}
gettimeofday(&end, NULL);
double diff = 1e9*toku_tdiff(&end, &start)/N;
if (verbose>1)
fprintf(stderr, "pthread_rwlock(r) = %.6fns/(lock+unlock)\n", diff);
best_rwlock_time=mind(best_rwlock_time,diff);
}
{ int r = pthread_rwlock_destroy(&mutex); assert(r==0); }
}
static void newbrt_rwlock_lock (RWLOCK rwlock, toku_pthread_mutex_t *mutex) {
{ int r = toku_pthread_mutex_lock(mutex); assert(r==0); }
rwlock_read_lock(rwlock, mutex);
{ int r = toku_pthread_mutex_unlock(mutex); assert(r==0); }
}
static void newbrt_rwlock_unlock (RWLOCK rwlock, toku_pthread_mutex_t *mutex) {
{ int r = toku_pthread_mutex_lock(mutex); assert(r==0); }
rwlock_read_unlock(rwlock);
{ int r = toku_pthread_mutex_unlock(mutex); assert(r==0); }
}
// Time the read lock that's in newbrt/rwlock.h
/* not static */
void time_newbrt_rwlock (void) {
struct rwlock rwlock;
toku_pthread_mutex_t external_mutex;
{ int r = pthread_mutex_init(&external_mutex, NULL); assert(r==0); }
rwlock_init(&rwlock);
struct timeval start,end;
newbrt_rwlock_lock(&rwlock, &external_mutex);
newbrt_rwlock_unlock(&rwlock, &external_mutex);
for (int t=0; t<T; t++) {
gettimeofday(&start, NULL);
for (int i=0; i<N; i++) {
newbrt_rwlock_lock(&rwlock, &external_mutex);
newbrt_rwlock_unlock(&rwlock, &external_mutex);
}
gettimeofday(&end, NULL);
double diff = 1e9*toku_tdiff(&end, &start)/N;
if (verbose>1)
fprintf(stderr, "newbrt_rwlock(r) = %.6fns/(lock+unlock)\n", diff);
best_newbrt_time=mind(best_newbrt_time,diff);
}
rwlock_destroy(&rwlock);
{ int r = pthread_mutex_destroy(&external_mutex); assert(r==0); }
}
// Time the read lock that's in newbrt/rwlock.h, assuming the mutex is already held.
/* not static*/
void time_newbrt_prelocked_rwlock (void) {
struct rwlock rwlock;
toku_pthread_mutex_t external_mutex;
{ int r = pthread_mutex_init(&external_mutex, NULL); assert(r==0); }
{ int r = toku_pthread_mutex_lock(&external_mutex); assert(r==0); }
rwlock_init(&rwlock);
struct timeval start,end;
rwlock_read_lock(&rwlock, &external_mutex);
rwlock_read_unlock(&rwlock);
for (int t=0; t<T; t++) {
gettimeofday(&start, NULL);
for (int i=0; i<N; i++) {
rwlock_read_lock(&rwlock, &external_mutex);
rwlock_read_unlock(&rwlock);
}
gettimeofday(&end, NULL);
double diff = 1e9*toku_tdiff(&end, &start)/N;
if (verbose>1)
fprintf(stderr, "newbrt_rwlock(r) = %.6fns/(lock+unlock)\n", diff);
best_prelocked_time=mind(best_prelocked_time,diff);
}
rwlock_destroy(&rwlock);
{ int r = toku_pthread_mutex_unlock(&external_mutex); assert(r==0); }
{ int r = pthread_mutex_destroy(&external_mutex); assert(r==0); }
}
/* not static*/
void time_toku_fair_rwlock (void) {
toku_fair_rwlock_t mutex;
{ int r = toku_fair_rwlock_init(&mutex); assert(r==0); }
struct timeval start,end;
toku_fair_rwlock_rdlock(&mutex);
toku_fair_rwlock_unlock(&mutex);
for (int t=0; t<T; t++) {
gettimeofday(&start, NULL);
for (int i=0; i<N; i++) {
toku_fair_rwlock_rdlock(&mutex);
toku_fair_rwlock_unlock(&mutex);
}
gettimeofday(&end, NULL);
double diff = 1e9*toku_tdiff(&end, &start)/N;
if (verbose>1)
fprintf(stderr, "pthread_fair(r) = %.6fns/(lock+unlock)\n", diff);
best_fair_rwlock_time=mind(best_fair_rwlock_time,diff);
}
{ int r = toku_fair_rwlock_destroy(&mutex); assert(r==0); }
}
#define N 6
#define T 100000
#define L 5
#define N_LOG_ENTRIES (L*N*4)
static toku_fair_rwlock_t rwlock;
static struct log_s {
int threadid, loopid;
char action;
} actionlog[N_LOG_ENTRIES];
static int log_counter=0;
static void logit (int threadid, int loopid, char action) {
int my_log_counter = __sync_fetch_and_add(&log_counter, 1);
assert(my_log_counter<N_LOG_ENTRIES);
actionlog[my_log_counter].threadid = threadid;
actionlog[my_log_counter].loopid = loopid;
actionlog[my_log_counter].action = action;
}
// The action should look like this:
// Threads 0-2 are reader threads.
// Threads 3-6 are writer threads.
// The threads all repeatedly grab the lock, wait T steps, and release.
// If the readers can starve the writers, then most of the writers will be at the end.
// If the writers can starve the readers, then most of the readers will be at the end.
// The reader threads all grab the lock, wait T*2 steps, and release the lock.
// The writer threads
// First the writer threads wait time T while the reader threads all go for the lock.
// Before the first one lets go, the writer threads wake up and try to grab the lock. But the readers are still
// 3 threads (0-2) try to grab the lock all at once. They'll get it. They each sleep for time T*2
// 3 threads (3-6) try to grab the write lock. They'll get it one after another.
extern __thread int mytid;
static void grab_rdlock (int threadid, int iteration) {
logit(threadid, iteration, 't');
{ int r = toku_fair_rwlock_rdlock(&rwlock); assert(r==0); }
logit(threadid, iteration, 'R');
}
static void release_rdlock (int threadid, int iteration) {
logit(threadid, iteration, 'u');
{ int r = toku_fair_rwlock_unlock(&rwlock); assert(r==0); }
}
static void grab_wrlock (int threadid, int iteration) {
logit(threadid, iteration, 'T');
{ int r = toku_fair_rwlock_wrlock(&rwlock); assert(r==0); }
logit(threadid, iteration, 'W');
}
static void release_wrlock (int threadid, int iteration) {
logit(threadid, iteration, 'U');
{ int r = toku_fair_rwlock_unlock(&rwlock); assert(r==0);}
}
static void *start_thread (void *vv) {
int *vp=(int*)vv;
int v=*vp;
//printf("T%d=%ld\n", v, pthread_self());
switch(v) {
case 0:
case 1:
case 2:
for (int i=0; i<L; i++) {
grab_rdlock(v, i);
usleep(T);
release_rdlock(v, i);
}
break;
case 3:
case 4:
case 5:
for (int i=0; i<L; i++) {
grab_wrlock(v, i);
usleep(T);
release_wrlock(v, i);
}
}
return NULL;
}
static void *start_thread_random (void *vv) {
int *vp=(int*)vv;
int v=*vp;
for (int i=0; i<L; i++) {
if (random()%2==0) {
grab_rdlock(v, i);
for (int j=0; j<random()%20; j++) sched_yield();
release_rdlock(v, i);
for (int j=0; j<random()%20; j++) sched_yield();
} else {
grab_wrlock(v, i);
for (int j=0; j<random()%20; j++) sched_yield();
release_wrlock(v, i);
for (int j=0; j<random()%20; j++) sched_yield();
}
}
return NULL;
}
static void check_actionlog (int expected_writer_max_count,
int expected_reader_parallelism_min,
int expected_reader_parallelism_max)
// Effect:
// Make sure that writers are exclusive.
// Make sure that anyone who asks for a lock doesn't have one.
// Make sure that anyone granted a lock actually asked for a lock.
// Make sure that anyone who releases a lock has it.
// Make sure that readers don't starve writers, and writers don't starve readers. (Not sure how to code this up...)
{
int reader_max=0;
int writer_max=0;
int state=0;
char tstate[N];
for (int i=0; i<N; i++) tstate[i]=0;
for (int i=0; i<log_counter; i++) {
switch (actionlog[i].action) {
case 't': // fall through to 'T'
case 'T':
assert(tstate[actionlog[i].threadid]==0);
tstate[actionlog[i].threadid]=actionlog[i].action;
break;
case 'W':
assert(tstate[actionlog[i].threadid]=='T');
tstate[actionlog[i].threadid]=actionlog[i].action;
assert(state==0);
state=-1;
writer_max = 1;
break;
case 'U':
assert(tstate[actionlog[i].threadid]=='W');
tstate[actionlog[i].threadid]=0;
assert(state==-1);
state=0;
break;
case 'R':
assert(tstate[actionlog[i].threadid]=='t');
tstate[actionlog[i].threadid]=actionlog[i].action;
if (state<0) { printf("On step %d\n", i); }
assert(state>=0);
state++;
if (state>reader_max) reader_max=state;
break;
case 'u':
assert(tstate[actionlog[i].threadid]=='R');
tstate[actionlog[i].threadid]=0;
assert(state>=0);
state--;
break;
default:
abort();
}
}
assert(reader_max>=expected_reader_parallelism_min);
assert(reader_max<=expected_reader_parallelism_max);
assert(writer_max==expected_writer_max_count);
}
static void test_rwlock_internal (void *(*start_th)(void*), int max_wr, int min_rd, int max_rd) {
if (verbose>=2) printf("Running threads:\n");
log_counter=0;
pthread_t threads[N];
int v[N];
{
int r = toku_fair_rwlock_init(&rwlock);
assert(r==0);
}
for (int i=0; i<N; i++) {
v[i]=i;
int r = pthread_create(&threads[i], NULL, start_th, &v[i]);
assert(r==0);
}
for (int i=0; i<N; i++) {
void *rv;
int r = pthread_join(threads[i], &rv);
assert(rv==NULL);
assert(r==0);
}
if (verbose>1) {
for (int i=0; i<log_counter; i++) {
printf("%d: %*s%c%d\n", i, actionlog[i].threadid*4, "", actionlog[i].action, actionlog[i].loopid);
}
}
check_actionlog(max_wr, min_rd, max_rd);
{
int r = toku_fair_rwlock_destroy(&rwlock);
assert(r==0);
}
if (verbose>2) printf("OK\n");
}
static void test_rwlock (void) {
test_rwlock_internal(start_thread, 1, 2, 3);
for (int i=0; i<10; i++) {
test_rwlock_internal(start_thread_random, 1, 0, N);
}
}
int main (int argc, const char *argv[]) {
parse_args(argc, argv);
if (timing_only) {
time_nop();
time_cas();
time_pthread_mutex();
time_pthread_rwlock();
time_newbrt_rwlock();
time_newbrt_prelocked_rwlock();
time_toku_fair_rwlock();
if (verbose>0) {
printf("// Best nop time=%10.6fns\n", best_nop_time);
printf("// Best cas time=%10.6fns\n", best_cas_time);
printf("// Best mutex time=%10.6fns\n", best_mutex_time);
printf("// Best rwlock time=%10.6fns\n", best_rwlock_time);
printf("// Best newbrt rwlock time=%10.6fns\n", best_newbrt_time);
printf("// Best prelocked time=%10.6fns\n", best_prelocked_time);
printf("// Best fair rwlock time=%10.6fns\n", best_fair_rwlock_time);
}
} else {
test_rwlock();
}
return 0;
}
......@@ -168,6 +168,22 @@ toku_pthread_setspecific(toku_pthread_key_t key, void *data) {
return pthread_setspecific(key, data);
}
// Fair readers/writer locks. These are fair (meaning first-come first-served. No reader starvation, and no writer starvation). And they are
// probably faster than the linux readers/writer locks (pthread_rwlock_t).
struct toku_fair_rwlock_waiter_state; // this structure is used internally.
typedef struct toku_fair_rwlock_s {
toku_pthread_mutex_t mutex;
int state; // 0 means no locks, + is number of readers locked, -1 is a writer
struct toku_fair_rwlock_waiter_state *waiters_head, *waiters_tail;
} toku_fair_rwlock_t;
int toku_fair_rwlock_init (toku_fair_rwlock_t *rwlock);
int toku_fair_rwlock_destroy (toku_fair_rwlock_t *rwlock);
int toku_fair_rwlock_rdlock (toku_fair_rwlock_t *rwlock);
int toku_fair_rwlock_wrlock (toku_fair_rwlock_t *rwlock);
int toku_fair_rwlock_unlock (toku_fair_rwlock_t *rwlock);
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2010 Tokutek Inc. All rights reserved."
#include <pthread.h>
#include <toku_assert.h>
#include "toku_pthread.h"
struct toku_fair_rwlock_waiter_state {
char is_read;
struct toku_fair_rwlock_waiter_state *next;
pthread_cond_t cond;
};
static __thread struct toku_fair_rwlock_waiter_state waitstate = {0, NULL, PTHREAD_COND_INITIALIZER };
int toku_fair_rwlock_init (toku_fair_rwlock_t *rwlock) {
rwlock->state=0;
rwlock->waiters_head = NULL;
rwlock->waiters_tail = NULL;
return toku_pthread_mutex_init(&rwlock->mutex, NULL);
}
int toku_fair_rwlock_destroy (toku_fair_rwlock_t *rwlock) {
return toku_pthread_mutex_destroy(&rwlock->mutex);
}
int toku_fair_rwlock_rdlock (toku_fair_rwlock_t *rwlock) {
int r = toku_pthread_mutex_lock(&rwlock->mutex);
assert(r==0);
if (rwlock->waiters_head!=NULL || rwlock->state<0) {
// Someone is ahead of me in the queue, or someone has a lock.
// We use per-thread-state for the condition variable. A thread cannot get control and try to reuse the waiter state for something else.
if (rwlock->waiters_tail) {
rwlock->waiters_tail->next = &waitstate;
} else {
rwlock->waiters_head = &waitstate;
}
rwlock->waiters_tail = &waitstate;
waitstate.next = NULL;
waitstate.is_read = 1;
do {
r = toku_pthread_cond_wait(&waitstate.cond, &rwlock->mutex);
assert(r==0);
} while (rwlock->waiters_head!=&waitstate || rwlock->state<0);
rwlock->state++;
rwlock->waiters_head=waitstate.next;
if (waitstate.next==NULL) rwlock->waiters_tail=NULL;
if (rwlock->waiters_head && rwlock->waiters_head->is_read) {
r = toku_pthread_cond_signal(&rwlock->waiters_head->cond);
assert(r==0);
}
} else {
// No one is waiting, and any holders are readers.
rwlock->state++;
}
r = toku_pthread_mutex_unlock(&rwlock->mutex);
assert(r==0);
return 0;
}
int toku_fair_rwlock_wrlock (toku_fair_rwlock_t *rwlock) {
int r = toku_pthread_mutex_lock(&rwlock->mutex);
assert(r==0);
if (rwlock->waiters_head!=NULL || rwlock->state!=0) {
// Someone else is ahead of me, or someone has a lock the lock, so we must wait our turn.
if (rwlock->waiters_tail) {
rwlock->waiters_tail->next = &waitstate;
} else {
rwlock->waiters_head = &waitstate;
}
rwlock->waiters_tail = &waitstate;
waitstate.next = NULL;
waitstate.is_read = 0;
do {
r = toku_pthread_cond_wait(&waitstate.cond, &rwlock->mutex);
assert(r==0);
} while (rwlock->waiters_head!=&waitstate || rwlock->state!=0);
rwlock->waiters_head = waitstate.next;
if (waitstate.next==NULL) rwlock->waiters_tail=NULL;
}
rwlock->state = -1;
r = toku_pthread_mutex_unlock(&rwlock->mutex);
assert(r==0);
return 0;
}
int toku_fair_rwlock_unlock (toku_fair_rwlock_t *rwlock) {
int r = toku_pthread_mutex_lock(&rwlock->mutex);
assert(r==0);
assert(rwlock->state!=0);
if (rwlock->state>0) {
rwlock->state--;
} else {
rwlock->state=0;
}
if (rwlock->state==0 && rwlock->waiters_head) {
r = toku_pthread_cond_signal(&rwlock->waiters_head->cond);
assert(r==0);
} else {
// printf(" No one to wake\n");
}
r = toku_pthread_mutex_unlock(&rwlock->mutex);
assert(r==0);
return 0;
}
......@@ -4,7 +4,7 @@
#include <stdio.h>
#include <unistd.h>
int test_main(int argc, char *const argv[]) {
int test_main(int argc __attribute__((__unused__)), char *const argv[] __attribute__((__unused__))) {
int r;
toku_pthread_rwlock_t rwlock;
......
......@@ -15,7 +15,7 @@ static void *f(void *arg) {
return arg;
}
int test_main(int argc, char *const argv[]) {
int test_main(int argc __attribute__((__unused__)), char *const argv[] __attribute__((__unused__))) {
int r;
toku_pthread_rwlock_t rwlock;
toku_pthread_t tid;
......
......@@ -37,7 +37,7 @@ check_snprintf(int i) {
}
int test_main(int argc, char *const argv[]) {
int test_main(int argc __attribute__((__unused__)), char *const argv[] __attribute__((__unused__))) {
int i;
for (i = 0; i < 8; i++) {
check_snprintf(i);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment