Commit cca54d3e authored by Sergei Golubchik's avatar Sergei Golubchik

WL#3064 - waiting threads - wait-for graph and deadlock detection

client/mysqltest.c:
  compiler warnings
configure.in:
  remove old tests for unused programs
  disable the use of gcc built-ins if smp assembler atomics were selected explictily.
  add waiting_threads.o to THREAD_LOBJECTS
include/lf.h:
  replace the end-of-stack pointer with the pointer to the end-of-stack pointer.
  the latter could be stored in THD (mysys_vars) and updated in pool-of-threads
  scheduler.
  constructor/destructor in lf-alloc
include/my_pthread.h:
  shuffle set_timespec/set_timespec_nsec macros a bit to be able to fill
  several timeout structures with only one my_getsystime() call
include/waiting_threads.h:
  waiting threads - wait-for graph and deadlock detection
mysys/Makefile.am:
  add waiting_threads.c
mysys/lf_alloc-pin.c:
  replace the end-of-stack pointer with the pointer to the end-of-stack pointer.
  the latter could be stored in THD (mysys_vars) and updated in pool-of-threads
  scheduler.
  constructor/destructor in lf-alloc
mysys/lf_hash.c:
  constructor/destructor in lf-alloc
mysys/my_thr_init.c:
  remember end-of-stack pointer in the mysys_var
mysys/waiting_threads.c:
  waiting threads - wait-for graph and deadlock detection
storage/maria/ha_maria.cc:
  replace the end-of-stack pointer with the pointer to the end-of-stack pointer.
  the latter could be stored in THD (mysys_vars) and updated in pool-of-threads
  scheduler.
storage/maria/ma_commit.c:
  replace the end-of-stack pointer with the pointer to the end-of-stack pointer.
  the latter could be stored in THD (mysys_vars) and updated in pool-of-threads
  scheduler.
storage/maria/trnman.c:
  replace the end-of-stack pointer with the pointer to the end-of-stack pointer.
  the latter could be stored in THD (mysys_vars) and updated in pool-of-threads
  scheduler.
storage/maria/trnman_public.h:
  replace the end-of-stack pointer with the pointer to the end-of-stack pointer.
  the latter could be stored in THD (mysys_vars) and updated in pool-of-threads
  scheduler.
storage/maria/unittest/trnman-t.c:
  replace the end-of-stack pointer with the pointer to the end-of-stack pointer.
  the latter could be stored in THD (mysys_vars) and updated in pool-of-threads
  scheduler.
unittest/mysys/Makefile.am:
  add waiting_threads-t
unittest/mysys/lf-t.c:
  factor out the common code for multi-threaded stress unit tests
  move lf tests to a separate file
unittest/mysys/my_atomic-t.c:
  factor out the common code for multi-threaded stress unit tests
  move lf tests to a separate file
unittest/mysys/thr_template.c:
  factor out the common code for multi-threaded stress unit tests
unittest/mysys/waiting_threads-t.c:
  wt tests
parent 8848f6bd
...@@ -2815,7 +2815,7 @@ void do_mkdir(struct st_command *command) ...@@ -2815,7 +2815,7 @@ void do_mkdir(struct st_command *command)
int error; int error;
static DYNAMIC_STRING ds_dirname; static DYNAMIC_STRING ds_dirname;
const struct command_arg mkdir_args[] = { const struct command_arg mkdir_args[] = {
"dirname", ARG_STRING, TRUE, &ds_dirname, "Directory to create" {"dirname", ARG_STRING, TRUE, &ds_dirname, "Directory to create"}
}; };
DBUG_ENTER("do_mkdir"); DBUG_ENTER("do_mkdir");
...@@ -2845,7 +2845,7 @@ void do_rmdir(struct st_command *command) ...@@ -2845,7 +2845,7 @@ void do_rmdir(struct st_command *command)
int error; int error;
static DYNAMIC_STRING ds_dirname; static DYNAMIC_STRING ds_dirname;
const struct command_arg rmdir_args[] = { const struct command_arg rmdir_args[] = {
"dirname", ARG_STRING, TRUE, &ds_dirname, "Directory to remove" { "dirname", ARG_STRING, TRUE, &ds_dirname, "Directory to remove" }
}; };
DBUG_ENTER("do_rmdir"); DBUG_ENTER("do_rmdir");
......
...@@ -250,8 +250,6 @@ test -z "$INSTALL_SCRIPT" && INSTALL_SCRIPT='${INSTALL_PROGRAM}' ...@@ -250,8 +250,6 @@ test -z "$INSTALL_SCRIPT" && INSTALL_SCRIPT='${INSTALL_PROGRAM}'
# Not critical since the generated file is distributed # Not critical since the generated file is distributed
AC_CHECK_PROGS(YACC, ['bison -y -p MYSQL']) AC_CHECK_PROGS(YACC, ['bison -y -p MYSQL'])
AC_CHECK_PROG(PDFMANUAL, pdftex, manual.pdf)
AC_CHECK_PROG(DVIS, tex, manual.dvi)
#check the return type of sprintf #check the return type of sprintf
AC_MSG_CHECKING("return type of sprintf") AC_MSG_CHECKING("return type of sprintf")
...@@ -1726,18 +1724,17 @@ fi ...@@ -1726,18 +1724,17 @@ fi
AC_ARG_WITH([atomic-ops], AC_ARG_WITH([atomic-ops],
AC_HELP_STRING([--with-atomic-ops=rwlocks|smp|up], AC_HELP_STRING([--with-atomic-ops=rwlocks|smp|up],
[Implement atomic operations using pthread rwlocks or atomic CPU [Implement atomic operations using pthread rwlocks or atomic CPU
instructions for multi-processor (default) or uniprocessor instructions for multi-processor or uniprocessor
configuration]), , [with_atomic_ops=smp]) configuration. By default gcc built-in sync functions are used,
if available and 'smp' configuration otherwise.]))
case "$with_atomic_ops" in case "$with_atomic_ops" in
"up") AC_DEFINE([MY_ATOMIC_MODE_DUMMY], [1], "up") AC_DEFINE([MY_ATOMIC_MODE_DUMMY], [1],
[Assume single-CPU mode, no concurrency]) ;; [Assume single-CPU mode, no concurrency]) ;;
"rwlocks") AC_DEFINE([MY_ATOMIC_MODE_RWLOCKS], [1], "rwlocks") AC_DEFINE([MY_ATOMIC_MODE_RWLOCKS], [1],
[Use pthread rwlocks for atomic ops]) ;; [Use pthread rwlocks for atomic ops]) ;;
"smp") ;; "smp") ;;
*) AC_MSG_ERROR(["$with_atomic_ops" is not a valid value for --with-atomic-ops]) ;; "")
esac AC_CACHE_CHECK([whether the compiler provides atomic builtins],
AC_CACHE_CHECK([whether the compiler provides atomic builtins],
[mysql_cv_gcc_atomic_builtins], [AC_TRY_RUN([ [mysql_cv_gcc_atomic_builtins], [AC_TRY_RUN([
int main() int main()
{ {
...@@ -1752,14 +1749,17 @@ AC_CACHE_CHECK([whether the compiler provides atomic builtins], ...@@ -1752,14 +1749,17 @@ AC_CACHE_CHECK([whether the compiler provides atomic builtins],
return -1; return -1;
return 0; return 0;
} }
], [mysql_cv_gcc_atomic_builtins=yes], ], [mysql_cv_gcc_atomic_builtins=yes_but_disabled],
[mysql_cv_gcc_atomic_builtins=no], [mysql_cv_gcc_atomic_builtins=no],
[mysql_cv_gcc_atomic_builtins=no])]) [mysql_cv_gcc_atomic_builtins=no])])
if test "x$mysql_cv_gcc_atomic_builtins" = disabled_xyes; then if test "x$mysql_cv_gcc_atomic_builtins" = xyes; then
AC_DEFINE(HAVE_GCC_ATOMIC_BUILTINS, 1, AC_DEFINE(HAVE_GCC_ATOMIC_BUILTINS, 1,
[Define to 1 if compiler provides atomic builtins.]) [Define to 1 if compiler provides atomic builtins.])
fi fi
;;
*) AC_MSG_ERROR(["$with_atomic_ops" is not a valid value for --with-atomic-ops]) ;;
esac
# Force static compilation to avoid linking problems/get more speed # Force static compilation to avoid linking problems/get more speed
AC_ARG_WITH(mysqld-ldflags, AC_ARG_WITH(mysqld-ldflags,
...@@ -2702,7 +2702,7 @@ then ...@@ -2702,7 +2702,7 @@ then
AC_DEFINE([THREAD], [1], AC_DEFINE([THREAD], [1],
[Define if you want to have threaded code. This may be undef on client code]) [Define if you want to have threaded code. This may be undef on client code])
# Avoid _PROGRAMS names # Avoid _PROGRAMS names
THREAD_LOBJECTS="thr_alarm.o thr_lock.o thr_mutex.o thr_rwlock.o my_pthread.o my_thr_init.o mf_keycache.o" THREAD_LOBJECTS="thr_alarm.o thr_lock.o thr_mutex.o thr_rwlock.o my_pthread.o my_thr_init.o mf_keycache.o waiting_threads.o"
AC_SUBST(THREAD_LOBJECTS) AC_SUBST(THREAD_LOBJECTS)
server_scripts="mysqld_safe mysql_install_db" server_scripts="mysqld_safe mysql_install_db"
sql_server_dirs="strings mysys dbug extra regex" sql_server_dirs="strings mysys dbug extra regex"
......
...@@ -110,7 +110,7 @@ typedef struct { ...@@ -110,7 +110,7 @@ typedef struct {
typedef struct { typedef struct {
void * volatile pin[LF_PINBOX_PINS]; void * volatile pin[LF_PINBOX_PINS];
LF_PINBOX *pinbox; LF_PINBOX *pinbox;
void *stack_ends_here; void **stack_ends_here;
void *purgatory; void *purgatory;
uint32 purgatory_count; uint32 purgatory_count;
uint32 volatile link; uint32 volatile link;
...@@ -166,8 +166,8 @@ void lf_pinbox_init(LF_PINBOX *pinbox, uint free_ptr_offset, ...@@ -166,8 +166,8 @@ void lf_pinbox_init(LF_PINBOX *pinbox, uint free_ptr_offset,
void lf_pinbox_destroy(LF_PINBOX *pinbox); void lf_pinbox_destroy(LF_PINBOX *pinbox);
lock_wrap(lf_pinbox_get_pins, LF_PINS *, lock_wrap(lf_pinbox_get_pins, LF_PINS *,
(LF_PINBOX *pinbox, void *stack_end), (LF_PINBOX *pinbox),
(pinbox, stack_end), (pinbox),
&pinbox->pinarray.lock) &pinbox->pinarray.lock)
lock_wrap_void(lf_pinbox_put_pins, lock_wrap_void(lf_pinbox_put_pins,
(LF_PINS *pins), (LF_PINS *pins),
...@@ -182,15 +182,13 @@ lock_wrap_void(lf_pinbox_free, ...@@ -182,15 +182,13 @@ lock_wrap_void(lf_pinbox_free,
memory allocator, lf_alloc-pin.c memory allocator, lf_alloc-pin.c
*/ */
struct st_lf_alloc_node {
struct st_lf_alloc_node *next;
};
typedef struct st_lf_allocator { typedef struct st_lf_allocator {
LF_PINBOX pinbox; LF_PINBOX pinbox;
struct st_lf_alloc_node * volatile top; uchar * volatile top;
uint element_size; uint element_size;
uint32 volatile mallocs; uint32 volatile mallocs;
void (*constructor)(uchar *);
void (*destructor)(uchar *);
} LF_ALLOCATOR; } LF_ALLOCATOR;
void lf_alloc_init(LF_ALLOCATOR *allocator, uint size, uint free_ptr_offset); void lf_alloc_init(LF_ALLOCATOR *allocator, uint size, uint free_ptr_offset);
...@@ -202,8 +200,8 @@ uint lf_alloc_pool_count(LF_ALLOCATOR *allocator); ...@@ -202,8 +200,8 @@ uint lf_alloc_pool_count(LF_ALLOCATOR *allocator);
*/ */
#define _lf_alloc_free(PINS, PTR) _lf_pinbox_free((PINS), (PTR)) #define _lf_alloc_free(PINS, PTR) _lf_pinbox_free((PINS), (PTR))
#define lf_alloc_free(PINS, PTR) lf_pinbox_free((PINS), (PTR)) #define lf_alloc_free(PINS, PTR) lf_pinbox_free((PINS), (PTR))
#define _lf_alloc_get_pins(A, ST) _lf_pinbox_get_pins(&(A)->pinbox, (ST)) #define _lf_alloc_get_pins(A) _lf_pinbox_get_pins(&(A)->pinbox)
#define lf_alloc_get_pins(A, ST) lf_pinbox_get_pins(&(A)->pinbox, (ST)) #define lf_alloc_get_pins(A) lf_pinbox_get_pins(&(A)->pinbox)
#define _lf_alloc_put_pins(PINS) _lf_pinbox_put_pins(PINS) #define _lf_alloc_put_pins(PINS) _lf_pinbox_put_pins(PINS)
#define lf_alloc_put_pins(PINS) lf_pinbox_put_pins(PINS) #define lf_alloc_put_pins(PINS) lf_pinbox_put_pins(PINS)
#define lf_alloc_direct_free(ALLOC, ADDR) my_free((uchar*)(ADDR), MYF(0)) #define lf_alloc_direct_free(ALLOC, ADDR) my_free((uchar*)(ADDR), MYF(0))
...@@ -220,13 +218,17 @@ lock_wrap(lf_alloc_new, void *, ...@@ -220,13 +218,17 @@ lock_wrap(lf_alloc_new, void *,
#define LF_HASH_UNIQUE 1 #define LF_HASH_UNIQUE 1
/* lf_hash overhead per element (that is, sizeof(LF_SLIST) */
#define LF_HASH_OVERHEAD (sizeof(int*)*4)
typedef struct { typedef struct {
LF_DYNARRAY array; /* hash itself */ LF_DYNARRAY array; /* hash itself */
LF_ALLOCATOR alloc; /* allocator for elements */ LF_ALLOCATOR alloc; /* allocator for elements */
hash_get_key get_key; /* see HASH */ hash_get_key get_key; /* see HASH */
CHARSET_INFO *charset; /* see HASH */ CHARSET_INFO *charset; /* see HASH */
uint key_offset, key_length; /* see HASH */ uint key_offset, key_length; /* see HASH */
uint element_size, flags; /* LF_HASH_UNIQUE, etc */ uint element_size; /* size of memcpy'ed area on insert */
uint flags; /* LF_HASH_UNIQUE, etc */
int32 volatile size; /* size of array */ int32 volatile size; /* size of array */
int32 volatile count; /* number of elements in the hash */ int32 volatile count; /* number of elements in the hash */
} LF_HASH; } LF_HASH;
...@@ -242,8 +244,8 @@ int lf_hash_delete(LF_HASH *hash, LF_PINS *pins, const void *key, uint keylen); ...@@ -242,8 +244,8 @@ int lf_hash_delete(LF_HASH *hash, LF_PINS *pins, const void *key, uint keylen);
shortcut macros to access underlying pinbox functions from an LF_HASH shortcut macros to access underlying pinbox functions from an LF_HASH
see _lf_pinbox_get_pins() and _lf_pinbox_put_pins() see _lf_pinbox_get_pins() and _lf_pinbox_put_pins()
*/ */
#define _lf_hash_get_pins(HASH, ST) _lf_alloc_get_pins(&(HASH)->alloc, (ST)) #define _lf_hash_get_pins(HASH) _lf_alloc_get_pins(&(HASH)->alloc)
#define lf_hash_get_pins(HASH, ST) lf_alloc_get_pins(&(HASH)->alloc, (ST)) #define lf_hash_get_pins(HASH) lf_alloc_get_pins(&(HASH)->alloc)
#define _lf_hash_put_pins(PINS) _lf_pinbox_put_pins(PINS) #define _lf_hash_put_pins(PINS) _lf_pinbox_put_pins(PINS)
#define lf_hash_put_pins(PINS) lf_pinbox_put_pins(PINS) #define lf_hash_put_pins(PINS) lf_pinbox_put_pins(PINS)
#define lf_hash_search_unpin(PINS) lf_unpin((PINS), 2) #define lf_hash_search_unpin(PINS) lf_unpin((PINS), 2)
......
...@@ -79,25 +79,27 @@ typedef void * (__cdecl *pthread_handler)(void *); ...@@ -79,25 +79,27 @@ typedef void * (__cdecl *pthread_handler)(void *);
so it can be used directly as a 64 bit value. The value so it can be used directly as a 64 bit value. The value
stored is in 100ns units. stored is in 100ns units.
*/ */
union ft64 { union ft64 {
FILETIME ft; FILETIME ft;
__int64 i64; __int64 i64;
}; };
struct timespec { struct timespec {
union ft64 tv; union ft64 tv;
/* The max timeout value in millisecond for pthread_cond_timedwait */ /* The max timeout value in millisecond for pthread_cond_timedwait */
long max_timeout_msec; long max_timeout_msec;
}; };
#define set_timespec(ABSTIME,SEC) { \
GetSystemTimeAsFileTime(&((ABSTIME).tv.ft)); \ #define set_timespec_time_nsec(ABSTIME,TIME,NSEC) do { \
(ABSTIME).tv.i64+= (__int64)(SEC)*10000000; \ (ABSTIME).tv.i64= (TIME)+(__int64)(NSEC)/100; \
(ABSTIME).max_timeout_msec= (long)((SEC)*1000); \
}
#define set_timespec_nsec(ABSTIME,NSEC) { \
GetSystemTimeAsFileTime(&((ABSTIME).tv.ft)); \
(ABSTIME).tv.i64+= (__int64)(NSEC)/100; \
(ABSTIME).max_timeout_msec= (long)((NSEC)/1000000); \ (ABSTIME).max_timeout_msec= (long)((NSEC)/1000000); \
} } while(0)
#define set_timespec_nsec(ABSTIME,NSEC) do { \
union ft64 tv; \
GetSystemTimeAsFileTime(&tv.ft); \
set_timespec_time_nsec((ABSTIME), tv.i64, (NSEC)) \
} while(0)
void win_pthread_init(void); void win_pthread_init(void);
int win_pthread_setspecific(void *A,void *B,uint length); int win_pthread_setspecific(void *A,void *B,uint length);
...@@ -416,43 +418,32 @@ int my_pthread_mutex_trylock(pthread_mutex_t *mutex); ...@@ -416,43 +418,32 @@ int my_pthread_mutex_trylock(pthread_mutex_t *mutex);
for calculating an absolute time at which for calculating an absolute time at which
pthread_cond_timedwait should timeout pthread_cond_timedwait should timeout
*/ */
#ifdef HAVE_TIMESPEC_TS_SEC
#ifndef set_timespec #define set_timespec(ABSTIME,SEC) set_timespec_nsec((ABSTIME),(SEC)*1000000000ULL)
#define set_timespec(ABSTIME,SEC) \
{ \
(ABSTIME).ts_sec=time(0) + (time_t) (SEC); \
(ABSTIME).ts_nsec=0; \
}
#endif /* !set_timespec */
#ifndef set_timespec_nsec #ifndef set_timespec_nsec
#define set_timespec_nsec(ABSTIME,NSEC) \ #define set_timespec_nsec(ABSTIME,NSEC) \
{ \ set_timespec_time_nsec((ABSTIME),my_getsystime(),(NSEC))
ulonglong now= my_getsystime() + (NSEC/100); \
(ABSTIME).ts_sec= (now / ULL(10000000)); \
(ABSTIME).ts_nsec= (now % ULL(10000000) * 100 + ((NSEC) % 100)); \
}
#endif /* !set_timespec_nsec */ #endif /* !set_timespec_nsec */
/* adapt for two different flavors of struct timespec */
#ifdef HAVE_TIMESPEC_TS_SEC
#define TV_sec ts_sec
#define TV_nsec ts_nsec
#else #else
#ifndef set_timespec #define TV_sec tv_sec
#define set_timespec(ABSTIME,SEC) \ #define TV_nsec tv_nsec
{\
struct timeval tv;\
gettimeofday(&tv,0);\
(ABSTIME).tv_sec=tv.tv_sec+(time_t) (SEC);\
(ABSTIME).tv_nsec=tv.tv_usec*1000;\
}
#endif /* !set_timespec */
#ifndef set_timespec_nsec
#define set_timespec_nsec(ABSTIME,NSEC) \
{\
ulonglong now= my_getsystime() + (NSEC/100); \
(ABSTIME).tv_sec= (time_t) (now / ULL(10000000)); \
(ABSTIME).tv_nsec= (long) (now % ULL(10000000) * 100 + ((NSEC) % 100)); \
}
#endif /* !set_timespec_nsec */
#endif /* HAVE_TIMESPEC_TS_SEC */ #endif /* HAVE_TIMESPEC_TS_SEC */
/* safe_mutex adds checking to mutex for easier debugging */ #ifndef set_timespec_time_nsec
#define set_timespec_time_nsec(ABSTIME,TIME,NSEC) do { \
ulonglong now= (TIME) + (NSEC/100); \
(ABSTIME).TV_sec= (now / ULL(10000000)); \
(ABSTIME).TV_nsec= (now % ULL(10000000) * 100 + ((NSEC) % 100)); \
} while(0)
#endif /* !set_timespec_time_nsec */
/* safe_mutex adds checking to mutex for easier debugging */
#if defined(__NETWARE__) && !defined(SAFE_MUTEX_DETECT_DESTROY) #if defined(__NETWARE__) && !defined(SAFE_MUTEX_DETECT_DESTROY)
#define SAFE_MUTEX_DETECT_DESTROY #define SAFE_MUTEX_DETECT_DESTROY
...@@ -692,6 +683,7 @@ struct st_my_thread_var ...@@ -692,6 +683,7 @@ struct st_my_thread_var
struct st_my_thread_var *next,**prev; struct st_my_thread_var *next,**prev;
void *opt_info; void *opt_info;
uint lock_type; /* used by conditional release the queue */ uint lock_type; /* used by conditional release the queue */
void *stack_ends_here;
#ifndef DBUG_OFF #ifndef DBUG_OFF
void *dbug; void *dbug;
char name[THREAD_NAME_SIZE+1]; char name[THREAD_NAME_SIZE+1];
......
/* Copyright (C) 2008 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <my_global.h>
#include <my_sys.h>
#include <lf.h>
typedef struct st_wt_resource_id WT_RESOURCE_ID;
typedef struct st_wt_resource_type {
int (*compare)(void *a, void *b);
const void *(*make_key)(WT_RESOURCE_ID *id, uint *len);
} WT_RESOURCE_TYPE;
struct st_wt_resource_id {
WT_RESOURCE_TYPE *type;
union {
void *ptr;
ulonglong num;
} value;
};
extern uint wt_timeout_short, wt_deadlock_search_depth_short;
extern uint wt_timeout_long, wt_deadlock_search_depth_long;
#define WT_WAIT_STATS 24
#define WT_CYCLE_STATS 32
extern ulonglong wt_wait_table[WT_WAIT_STATS];
extern uint32 wt_wait_stats[WT_WAIT_STATS+1];
extern uint32 wt_cycle_stats[2][WT_CYCLE_STATS+1];
extern uint32 wt_success_stats;
/*
'lock' protects 'owners', 'state', and 'waiter_count'
'id' is read-only
a resource is picked up from a hash in a lock-free manner
it's returned pinned, so it cannot be freed at once
but it may be freed right after the pin is removed
to free a resource it should be
1. have no owners
2. have no waiters
two ways to access a resource:
1. find it in a hash
- it's returned pinned.
a) take a lock in exclusive mode
b) check the state, it should be ACTIVE
c) unpin
2. by a direct reference
- could only used if a resource cannot be freed
e.g. accessing a resource by thd->waiting_for is safe,
a resource cannot be freed as there's a thread waiting for it
*/
typedef struct st_wt_resource {
WT_RESOURCE_ID id;
uint waiter_count;
enum { ACTIVE, FREE } state;
#ifndef DBUG_OFF
pthread_mutex_t *mutex;
#endif
/*
before the 'lock' all elements are mutable, after - immutable
in the sense that lf_hash_insert() won't memcpy() over them.
See wt_init().
*/
pthread_rwlock_t lock;
pthread_cond_t cond;
DYNAMIC_ARRAY owners;
} WT_RESOURCE;
typedef struct st_wt_thd {
/*
XXX
there's no protection (mutex) against concurrent access of
the dynarray below. it is assumed that a caller will have it
automatically (not to protect this array but to protect its
own - caller's - data structures, and we'll get it for free.
If not, we'll need to add a mutex
*/
DYNAMIC_ARRAY my_resources;
/*
'waiting_for' is modified under waiting_for->lock, and only by thd itself
'waiting_for' is read lock-free (using pinning protocol), but a thd object
can read its own 'waiting_for' without any locks or tricks.
*/
WT_RESOURCE *waiting_for;
LF_PINS *pins;
/*
weight relates to the desirability of a transaction being killed if it's
part of a deadlock. In a deadlock situation transactions with lower weights
are killed first.
Examples of using the weight to implement different selection strategies:
1. Latest
Keep all weights equal.
2. Random
Assight weights at random.
(variant: modify a weight randomly before every lock request)
3. Youngest
Set weight to -NOW()
4. Minimum locks
count locks granted in your lock manager, store the value as a weight
5. Minimum work
depends on the definition of "work". For example, store the number
of rows modifies in this transaction (or a length of REDO log for a
transaction) as a weight.
It is only statistically relevant and is not protected by any locks.
*/
ulong volatile weight;
/*
'killed' is indirectly protected by waiting_for->lock -
a killed thread needs to clear its 'waiting_for', and thus needs a lock.
That is a thread needs an exclusive lock to read 'killed' reliably.
But other threads may change 'killed' from 0 to 1, a shared
lock is enough for that.
*/
my_bool volatile killed;
#ifndef DBUG_OFF
const char *name;
#endif
} WT_THD;
#define WT_TIMEOUT ETIMEDOUT
#define WT_OK 0
#define WT_DEADLOCK -1
#define WT_DEPTH_EXCEEDED -2
void wt_init(void);
void wt_end(void);
void wt_thd_init(WT_THD *);
void wt_thd_destroy(WT_THD *);
int wt_thd_will_wait_for(WT_THD *, WT_THD *, WT_RESOURCE_ID *);
int wt_thd_dontwait(WT_THD *);
int wt_thd_cond_timedwait(WT_THD *, pthread_mutex_t *);
void wt_thd_release(WT_THD *, WT_RESOURCE_ID *);
#define wt_thd_release_all(THD) wt_thd_release((THD), 0)
int wt_resource_id_memcmp(void *, void *);
...@@ -58,7 +58,7 @@ libmysys_a_SOURCES = my_init.c my_getwd.c mf_getdate.c my_mmap.c \ ...@@ -58,7 +58,7 @@ libmysys_a_SOURCES = my_init.c my_getwd.c mf_getdate.c my_mmap.c \
my_windac.c my_access.c base64.c my_libwrap.c \ my_windac.c my_access.c base64.c my_libwrap.c \
wqueue.c wqueue.c
EXTRA_DIST = thr_alarm.c thr_lock.c my_pthread.c my_thr_init.c \ EXTRA_DIST = thr_alarm.c thr_lock.c my_pthread.c my_thr_init.c \
thr_mutex.c thr_rwlock.c \ thr_mutex.c thr_rwlock.c waiting_threads.c \
CMakeLists.txt mf_soundex.c \ CMakeLists.txt mf_soundex.c \
my_conio.c my_wincond.c my_winthread.c my_conio.c my_wincond.c my_winthread.c
libmysys_a_LIBADD = @THREAD_LOBJECTS@ libmysys_a_LIBADD = @THREAD_LOBJECTS@
......
...@@ -96,11 +96,10 @@ ...@@ -96,11 +96,10 @@
versioning a pointer - because we use an array, a pointer to pins is 16 bit, versioning a pointer - because we use an array, a pointer to pins is 16 bit,
upper 16 bits are used for a version. upper 16 bits are used for a version.
It is assumed that pins belong to a thread and are not transferable It is assumed that pins belong to a THD and are not transferable
between threads (LF_PINS::stack_ends_here being a primary reason between THD's (LF_PINS::stack_ends_here being a primary reason
for this limitation). for this limitation).
*/ */
#include <my_global.h> #include <my_global.h>
#include <my_sys.h> #include <my_sys.h>
#include <lf.h> #include <lf.h>
...@@ -137,10 +136,6 @@ void lf_pinbox_destroy(LF_PINBOX *pinbox) ...@@ -137,10 +136,6 @@ void lf_pinbox_destroy(LF_PINBOX *pinbox)
SYNOPSYS SYNOPSYS
pinbox - pinbox -
stack_end - a pointer to the end (top/bottom, depending on the
STACK_DIRECTION) of stack. Used for safe alloca. There's
no safety margin deducted, a caller should take care of it,
if necessary.
DESCRIPTION DESCRIPTION
get a new LF_PINS structure from a stack of unused pins, get a new LF_PINS structure from a stack of unused pins,
...@@ -150,7 +145,7 @@ void lf_pinbox_destroy(LF_PINBOX *pinbox) ...@@ -150,7 +145,7 @@ void lf_pinbox_destroy(LF_PINBOX *pinbox)
It is assumed that pins belong to a thread and are not transferable It is assumed that pins belong to a thread and are not transferable
between threads. between threads.
*/ */
LF_PINS *_lf_pinbox_get_pins(LF_PINBOX *pinbox, void *stack_end) LF_PINS *_lf_pinbox_get_pins(LF_PINBOX *pinbox)
{ {
uint32 pins, next, top_ver; uint32 pins, next, top_ver;
LF_PINS *el; LF_PINS *el;
...@@ -194,7 +189,7 @@ LF_PINS *_lf_pinbox_get_pins(LF_PINBOX *pinbox, void *stack_end) ...@@ -194,7 +189,7 @@ LF_PINS *_lf_pinbox_get_pins(LF_PINBOX *pinbox, void *stack_end)
el->link= pins; el->link= pins;
el->purgatory_count= 0; el->purgatory_count= 0;
el->pinbox= pinbox; el->pinbox= pinbox;
el->stack_ends_here= stack_end; el->stack_ends_here= & my_thread_var->stack_ends_here;
return el; return el;
} }
...@@ -325,6 +320,9 @@ static int match_pins(LF_PINS *el, void *addr) ...@@ -325,6 +320,9 @@ static int match_pins(LF_PINS *el, void *addr)
#define available_stack_size(CUR,END) (long) ((char*)(END) - (char*)(CUR)) #define available_stack_size(CUR,END) (long) ((char*)(END) - (char*)(CUR))
#endif #endif
#define next_node(P, X) (*((uchar **)(((uchar *)(X)) + (P)->free_ptr_offset)))
#define anext_node(X) next_node(&allocator->pinbox, (X))
/* /*
Scan the purgatory and free everything that can be freed Scan the purgatory and free everything that can be freed
*/ */
...@@ -332,7 +330,7 @@ static void _lf_pinbox_real_free(LF_PINS *pins) ...@@ -332,7 +330,7 @@ static void _lf_pinbox_real_free(LF_PINS *pins)
{ {
int npins, alloca_size; int npins, alloca_size;
void *list, **addr; void *list, **addr;
struct st_lf_alloc_node *first, *last= NULL; uchar *first, *last= NULL;
LF_PINBOX *pinbox= pins->pinbox; LF_PINBOX *pinbox= pins->pinbox;
LINT_INIT(first); LINT_INIT(first);
...@@ -341,7 +339,7 @@ static void _lf_pinbox_real_free(LF_PINS *pins) ...@@ -341,7 +339,7 @@ static void _lf_pinbox_real_free(LF_PINS *pins)
#ifdef HAVE_ALLOCA #ifdef HAVE_ALLOCA
alloca_size= sizeof(void *)*LF_PINBOX_PINS*npins; alloca_size= sizeof(void *)*LF_PINBOX_PINS*npins;
/* create a sorted list of pinned addresses, to speed up searches */ /* create a sorted list of pinned addresses, to speed up searches */
if (available_stack_size(&pinbox, pins->stack_ends_here) > alloca_size) if (available_stack_size(&pinbox, *pins->stack_ends_here) > alloca_size)
{ {
struct st_harvester hv; struct st_harvester hv;
addr= (void **) alloca(alloca_size); addr= (void **) alloca(alloca_size);
...@@ -391,9 +389,9 @@ static void _lf_pinbox_real_free(LF_PINS *pins) ...@@ -391,9 +389,9 @@ static void _lf_pinbox_real_free(LF_PINS *pins)
} }
/* not pinned - freeing */ /* not pinned - freeing */
if (last) if (last)
last= last->next= (struct st_lf_alloc_node *)cur; last= next_node(pinbox, last)= (uchar *)cur;
else else
first= last= (struct st_lf_alloc_node *)cur; first= last= (uchar *)cur;
continue; continue;
found: found:
/* pinned - keeping */ /* pinned - keeping */
...@@ -412,22 +410,22 @@ LF_REQUIRE_PINS(1) ...@@ -412,22 +410,22 @@ LF_REQUIRE_PINS(1)
add it back to the allocator stack add it back to the allocator stack
DESCRIPTION DESCRIPTION
'first' and 'last' are the ends of the linked list of st_lf_alloc_node's: 'first' and 'last' are the ends of the linked list of nodes:
first->el->el->....->el->last. Use first==last to free only one element. first->el->el->....->el->last. Use first==last to free only one element.
*/ */
static void alloc_free(struct st_lf_alloc_node *first, static void alloc_free(uchar *first,
struct st_lf_alloc_node volatile *last, uchar volatile *last,
LF_ALLOCATOR *allocator) LF_ALLOCATOR *allocator)
{ {
/* /*
we need a union here to access type-punned pointer reliably. we need a union here to access type-punned pointer reliably.
otherwise gcc -fstrict-aliasing will not see 'tmp' changed in the loop otherwise gcc -fstrict-aliasing will not see 'tmp' changed in the loop
*/ */
union { struct st_lf_alloc_node * node; void *ptr; } tmp; union { uchar * node; void *ptr; } tmp;
tmp.node= allocator->top; tmp.node= allocator->top;
do do
{ {
last->next= tmp.node; anext_node(last)= tmp.node;
} while (!my_atomic_casptr((void **)(char *)&allocator->top, } while (!my_atomic_casptr((void **)(char *)&allocator->top,
(void **)&tmp.ptr, first) && LF_BACKOFF); (void **)&tmp.ptr, first) && LF_BACKOFF);
} }
...@@ -452,6 +450,8 @@ void lf_alloc_init(LF_ALLOCATOR *allocator, uint size, uint free_ptr_offset) ...@@ -452,6 +450,8 @@ void lf_alloc_init(LF_ALLOCATOR *allocator, uint size, uint free_ptr_offset)
allocator->top= 0; allocator->top= 0;
allocator->mallocs= 0; allocator->mallocs= 0;
allocator->element_size= size; allocator->element_size= size;
allocator->constructor= 0;
allocator->destructor= 0;
DBUG_ASSERT(size >= sizeof(void*) + free_ptr_offset); DBUG_ASSERT(size >= sizeof(void*) + free_ptr_offset);
} }
...@@ -468,10 +468,12 @@ void lf_alloc_init(LF_ALLOCATOR *allocator, uint size, uint free_ptr_offset) ...@@ -468,10 +468,12 @@ void lf_alloc_init(LF_ALLOCATOR *allocator, uint size, uint free_ptr_offset)
*/ */
void lf_alloc_destroy(LF_ALLOCATOR *allocator) void lf_alloc_destroy(LF_ALLOCATOR *allocator)
{ {
struct st_lf_alloc_node *node= allocator->top; uchar *node= allocator->top;
while (node) while (node)
{ {
struct st_lf_alloc_node *tmp= node->next; uchar *tmp= anext_node(node);
if (allocator->destructor)
allocator->destructor(node);
my_free((void *)node, MYF(0)); my_free((void *)node, MYF(0));
node= tmp; node= tmp;
} }
...@@ -489,7 +491,7 @@ void lf_alloc_destroy(LF_ALLOCATOR *allocator) ...@@ -489,7 +491,7 @@ void lf_alloc_destroy(LF_ALLOCATOR *allocator)
void *_lf_alloc_new(LF_PINS *pins) void *_lf_alloc_new(LF_PINS *pins)
{ {
LF_ALLOCATOR *allocator= (LF_ALLOCATOR *)(pins->pinbox->free_func_arg); LF_ALLOCATOR *allocator= (LF_ALLOCATOR *)(pins->pinbox->free_func_arg);
struct st_lf_alloc_node *node; uchar *node;
for (;;) for (;;)
{ {
do do
...@@ -500,6 +502,8 @@ void *_lf_alloc_new(LF_PINS *pins) ...@@ -500,6 +502,8 @@ void *_lf_alloc_new(LF_PINS *pins)
if (!node) if (!node)
{ {
node= (void *)my_malloc(allocator->element_size, MYF(MY_WME)); node= (void *)my_malloc(allocator->element_size, MYF(MY_WME));
if (allocator->constructor)
allocator->constructor(node);
#ifdef MY_LF_EXTRA_DEBUG #ifdef MY_LF_EXTRA_DEBUG
if (likely(node != 0)) if (likely(node != 0))
my_atomic_add32(&allocator->mallocs, 1); my_atomic_add32(&allocator->mallocs, 1);
...@@ -507,7 +511,7 @@ void *_lf_alloc_new(LF_PINS *pins) ...@@ -507,7 +511,7 @@ void *_lf_alloc_new(LF_PINS *pins)
break; break;
} }
if (my_atomic_casptr((void **)(char *)&allocator->top, if (my_atomic_casptr((void **)(char *)&allocator->top,
(void *)&node, node->next)) (void *)&node, anext_node(node)))
break; break;
} }
_lf_unpin(pins, 0); _lf_unpin(pins, 0);
...@@ -523,8 +527,8 @@ void *_lf_alloc_new(LF_PINS *pins) ...@@ -523,8 +527,8 @@ void *_lf_alloc_new(LF_PINS *pins)
uint lf_alloc_pool_count(LF_ALLOCATOR *allocator) uint lf_alloc_pool_count(LF_ALLOCATOR *allocator)
{ {
uint i; uint i;
struct st_lf_alloc_node *node; uchar *node;
for (node= allocator->top, i= 0; node; node= node->next, i++) for (node= allocator->top, i= 0; node; node= anext_node(node), i++)
/* no op */; /* no op */;
return i; return i;
} }
......
...@@ -299,11 +299,22 @@ static int initialize_bucket(LF_HASH *, LF_SLIST * volatile*, uint, LF_PINS *); ...@@ -299,11 +299,22 @@ static int initialize_bucket(LF_HASH *, LF_SLIST * volatile*, uint, LF_PINS *);
/* /*
Initializes lf_hash, the arguments are compatible with hash_init Initializes lf_hash, the arguments are compatible with hash_init
@@note element_size sets both the size of allocated memory block for
lf_alloc and a size of memcpy'ed block size in lf_hash_insert. Typically
they are the same, indeed. But LF_HASH::element_size can be decreased
after lf_hash_init, and then lf_alloc will allocate larger block that
lf_hash_insert will copy over. It is desireable if part of the element
is expensive to initialize - for example if there is a mutex or
DYNAMIC_ARRAY. In this case they should be initialize in the
LF_ALLOCATOR::constructor, and lf_hash_insert should not overwrite them.
See wt_init() for example.
*/ */
void lf_hash_init(LF_HASH *hash, uint element_size, uint flags, void lf_hash_init(LF_HASH *hash, uint element_size, uint flags,
uint key_offset, uint key_length, hash_get_key get_key, uint key_offset, uint key_length, hash_get_key get_key,
CHARSET_INFO *charset) CHARSET_INFO *charset)
{ {
compile_time_assert(sizeof(LF_SLIST) == LF_HASH_OVERHEAD);
lf_alloc_init(&hash->alloc, sizeof(LF_SLIST)+element_size, lf_alloc_init(&hash->alloc, sizeof(LF_SLIST)+element_size,
offsetof(LF_SLIST, key)); offsetof(LF_SLIST, key));
lf_dynarray_init(&hash->array, sizeof(LF_SLIST *)); lf_dynarray_init(&hash->array, sizeof(LF_SLIST *));
...@@ -453,7 +464,7 @@ void *lf_hash_search(LF_HASH *hash, LF_PINS *pins, const void *key, uint keylen) ...@@ -453,7 +464,7 @@ void *lf_hash_search(LF_HASH *hash, LF_PINS *pins, const void *key, uint keylen)
return found ? found+1 : 0; return found ? found+1 : 0;
} }
static const uchar *dummy_key= ""; static const uchar *dummy_key= (uchar*)"";
/* /*
RETURN RETURN
...@@ -473,7 +484,7 @@ static int initialize_bucket(LF_HASH *hash, LF_SLIST * volatile *node, ...@@ -473,7 +484,7 @@ static int initialize_bucket(LF_HASH *hash, LF_SLIST * volatile *node,
unlikely(initialize_bucket(hash, el, parent, pins))) unlikely(initialize_bucket(hash, el, parent, pins)))
return -1; return -1;
dummy->hashnr= my_reverse_bits(bucket) | 0; /* dummy node */ dummy->hashnr= my_reverse_bits(bucket) | 0; /* dummy node */
dummy->key= (char*) dummy_key; dummy->key= dummy_key;
dummy->keylen= 0; dummy->keylen= 0;
if ((cur= linsert(el, hash->charset, dummy, pins, LF_HASH_UNIQUE))) if ((cur= linsert(el, hash->charset, dummy, pins, LF_HASH_UNIQUE)))
{ {
......
...@@ -290,6 +290,8 @@ my_bool my_thread_init(void) ...@@ -290,6 +290,8 @@ my_bool my_thread_init(void)
pthread_mutex_init(&tmp->mutex,MY_MUTEX_INIT_FAST); pthread_mutex_init(&tmp->mutex,MY_MUTEX_INIT_FAST);
pthread_cond_init(&tmp->suspend, NULL); pthread_cond_init(&tmp->suspend, NULL);
tmp->stack_ends_here= &tmp + STACK_DIRECTION * my_thread_stack_size;
pthread_mutex_lock(&THR_LOCK_threads); pthread_mutex_lock(&THR_LOCK_threads);
tmp->id= ++thread_id; tmp->id= ++thread_id;
++THR_thread_count; ++THR_thread_count;
......
/* Copyright (C) 2008 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/*
Note that if your lock system satisfy the following condition:
there exist four lock levels A, B, C, D, such as
A is compatible with B
A is not compatible with C
D is not compatible with B
(example A=IX, B=IS, C=S, D=X)
you need to include lock level in the resource identifier - thread 1
waiting for lock A on resource R and thread 2 waiting for lock B
on resource R should wait on different WT_RESOURCE structures, on different
{lock, resource} pairs. Otherwise the following is possible:
thread1> take S-lock on R
thread2> take IS-lock on R
thread3> wants X-lock on R, starts waiting for threads 1 and 2 on R.
thread3 is killed (or timeout or whatever)
WT_RESOURCE structure for R is still in the hash, as it has two owners
thread4> wants an IX-lock on R
WT_RESOURCE for R is found in the hash, thread4 starts waiting on it.
!! now thread4 is waiting for both thread1 and thread2
!! while, in fact, IX-lock and IS-lock are compatible and
!! thread4 should not wait for thread2.
*/
#include <waiting_threads.h>
#include <m_string.h>
uint wt_timeout_short=100, wt_deadlock_search_depth_short=4;
uint wt_timeout_long=10000, wt_deadlock_search_depth_long=15;
/*
status variables:
distribution of cycle lengths
wait time log distribution
Note:
we call deadlock() twice per wait (with different search lengths).
it means a deadlock will be counted twice. It's difficult to avoid,
as on the second search we could find a *different* deadlock and we
*want* to count it too. So we just count all deadlocks - two searches
mean two increments on the wt_cycle_stats.
*/
ulonglong wt_wait_table[WT_WAIT_STATS];
uint32 wt_wait_stats[WT_WAIT_STATS+1];
uint32 wt_cycle_stats[2][WT_CYCLE_STATS+1], wt_success_stats;
static my_atomic_rwlock_t cycle_stats_lock, wait_stats_lock, success_stats_lock;
#define increment_success_stats() \
do { \
my_atomic_rwlock_wrlock(&success_stats_lock); \
my_atomic_add32(&wt_success_stats, 1); \
my_atomic_rwlock_wrunlock(&success_stats_lock); \
} while (0)
#define increment_cycle_stats(X,MAX) \
do { \
uint i= (X), j= (MAX) == wt_deadlock_search_depth_long; \
if (i >= WT_CYCLE_STATS) \
i= WT_CYCLE_STATS; \
my_atomic_rwlock_wrlock(&cycle_stats_lock); \
my_atomic_add32(&wt_cycle_stats[j][i], 1); \
my_atomic_rwlock_wrunlock(&cycle_stats_lock); \
} while (0)
#define increment_wait_stats(X,RET) \
do { \
uint i; \
if ((RET) == ETIMEDOUT) \
i= WT_WAIT_STATS; \
else \
{ \
ulonglong w=(X)/10; \
for (i=0; i < WT_WAIT_STATS && w > wt_wait_table[i]; i++) ; \
} \
my_atomic_rwlock_wrlock(&wait_stats_lock); \
my_atomic_add32(wt_wait_stats+i, 1); \
my_atomic_rwlock_wrunlock(&wait_stats_lock); \
} while (0)
#define rc_rdlock(X) \
do { \
WT_RESOURCE *R=(X); \
DBUG_PRINT("wt", ("LOCK resid=%lld for READ", R->id.value.num)); \
pthread_rwlock_rdlock(&R->lock); \
} while (0)
#define rc_wrlock(X) \
do { \
WT_RESOURCE *R=(X); \
DBUG_PRINT("wt", ("LOCK resid=%lld for WRITE", R->id.value.num)); \
pthread_rwlock_wrlock(&R->lock); \
} while (0)
#define rc_unlock(X) \
do { \
WT_RESOURCE *R=(X); \
DBUG_PRINT("wt", ("UNLOCK resid=%lld", R->id.value.num)); \
pthread_rwlock_unlock(&R->lock); \
} while (0)
static LF_HASH reshash;
static void wt_resource_init(uchar *arg)
{
WT_RESOURCE *rc=(WT_RESOURCE*)(arg+LF_HASH_OVERHEAD);
DBUG_ENTER("wt_resource_init");
bzero(rc, sizeof(*rc));
pthread_rwlock_init(&rc->lock, 0);
pthread_cond_init(&rc->cond, 0);
my_init_dynamic_array(&rc->owners, sizeof(WT_THD *), 5, 5);
DBUG_VOID_RETURN;
}
static void wt_resource_destroy(uchar *arg)
{
WT_RESOURCE *rc=(WT_RESOURCE*)(arg+LF_HASH_OVERHEAD);
DBUG_ENTER("wt_resource_destroy");
DBUG_ASSERT(rc->owners.elements == 0);
pthread_rwlock_destroy(&rc->lock);
pthread_cond_destroy(&rc->cond);
delete_dynamic(&rc->owners);
DBUG_VOID_RETURN;
}
void wt_init()
{
DBUG_ENTER("wt_init");
lf_hash_init(&reshash, sizeof(WT_RESOURCE), LF_HASH_UNIQUE, 0,
sizeof(struct st_wt_resource_id), 0, 0);
reshash.alloc.constructor= wt_resource_init;
reshash.alloc.destructor= wt_resource_destroy;
/*
Note a trick: we initialize the hash with the real element size,
but fix it later to a shortened element size. This way
the allocator will allocate elements correctly, but
lf_hash_insert() will only overwrite part of the element with memcpy().
lock, condition, and dynamic array will be intact.
*/
reshash.element_size= offsetof(WT_RESOURCE, lock);
bzero(wt_wait_stats, sizeof(wt_wait_stats));
bzero(wt_cycle_stats, sizeof(wt_cycle_stats));
wt_success_stats=0;
{
int i;
double from=log(1); /* 1 us */
double to=log(60e6); /* 1 min */
for (i=0; i < WT_WAIT_STATS; i++)
{
wt_wait_table[i]=(ulonglong)exp((to-from)/(WT_WAIT_STATS-1)*i+from);
DBUG_ASSERT(i==0 || wt_wait_table[i-1] != wt_wait_table[i]);
}
}
my_atomic_rwlock_init(&cycle_stats_lock);
my_atomic_rwlock_init(&success_stats_lock);
my_atomic_rwlock_init(&wait_stats_lock);
DBUG_VOID_RETURN;
}
void wt_end()
{
DBUG_ENTER("wt_end");
DBUG_ASSERT(reshash.count == 0);
lf_hash_destroy(&reshash);
my_atomic_rwlock_destroy(&cycle_stats_lock);
my_atomic_rwlock_destroy(&success_stats_lock);
my_atomic_rwlock_destroy(&wait_stats_lock);
DBUG_VOID_RETURN;
}
void wt_thd_init(WT_THD *thd)
{
DBUG_ENTER("wt_thd_init");
my_init_dynamic_array(&thd->my_resources, sizeof(WT_RESOURCE *), 10, 5);
thd->pins=lf_hash_get_pins(&reshash);
thd->waiting_for=0;
thd->weight=0;
#ifndef DBUG_OFF
thd->name=my_thread_name();
#endif
DBUG_VOID_RETURN;
}
void wt_thd_destroy(WT_THD *thd)
{
DBUG_ENTER("wt_thd_destroy");
DBUG_ASSERT(thd->my_resources.elements == 0);
delete_dynamic(&thd->my_resources);
lf_hash_put_pins(thd->pins);
thd->waiting_for=0;
DBUG_VOID_RETURN;
}
int wt_resource_id_memcmp(void *a, void *b)
{
return memcmp(a, b, sizeof(WT_RESOURCE_ID));
}
struct deadlock_arg {
WT_THD *thd;
uint max_depth;
WT_THD *victim;
WT_RESOURCE *rc;
};
/*
loop detection in a wait-for graph with a limited search depth.
*/
static int deadlock_search(struct deadlock_arg *arg, WT_THD *blocker,
uint depth)
{
WT_RESOURCE *rc, *volatile *shared_ptr= &blocker->waiting_for;
WT_THD *cursor;
uint i;
int ret= WT_OK;
DBUG_ENTER("deadlock_search");
DBUG_PRINT("wt", ("enter: thd=%s, blocker=%s, depth=%u",
arg->thd->name, blocker->name, depth));
LF_REQUIRE_PINS(1);
arg->rc= 0;
if (depth > arg->max_depth)
{
DBUG_PRINT("wt", ("exit: WT_DEPTH_EXCEEDED (early)"));
DBUG_RETURN(WT_DEPTH_EXCEEDED);
}
retry:
/* safe dereference as explained in lf_alloc-pin.c */
do
{
rc= *shared_ptr;
lf_pin(arg->thd->pins, 0, rc);
} while (rc != *shared_ptr && LF_BACKOFF);
if (rc == 0)
{
DBUG_PRINT("wt", ("exit: OK (early)"));
DBUG_RETURN(0);
}
rc_rdlock(rc);
if (rc->state != ACTIVE || *shared_ptr != rc)
{
rc_unlock(rc);
lf_unpin(arg->thd->pins, 0);
goto retry;
}
lf_unpin(arg->thd->pins, 0);
for (i=0; i < rc->owners.elements; i++)
{
cursor= *dynamic_element(&rc->owners, i, WT_THD**);
if (cursor == arg->thd)
{
ret= WT_DEADLOCK;
increment_cycle_stats(depth, arg->max_depth);
arg->victim= cursor;
goto end;
}
}
for (i=0; i < rc->owners.elements; i++)
{
cursor= *dynamic_element(&rc->owners, i, WT_THD**);
switch (deadlock_search(arg, cursor, depth+1)) {
case WT_DEPTH_EXCEEDED:
ret= WT_DEPTH_EXCEEDED;
break;
case WT_DEADLOCK:
ret= WT_DEADLOCK;
if (cursor->weight < arg->victim->weight)
{
if (arg->victim != arg->thd)
{
rc_unlock(arg->victim->waiting_for); /* release the previous victim */
DBUG_ASSERT(arg->rc == cursor->waiting_for);
}
arg->victim= cursor;
}
else if (arg->rc)
rc_unlock(arg->rc);
goto end;
case WT_OK:
break;
default:
DBUG_ASSERT(0);
}
if (arg->rc)
rc_unlock(arg->rc);
}
end:
arg->rc= rc;
DBUG_PRINT("wt", ("exit: %s",
ret == WT_DEPTH_EXCEEDED ? "WT_DEPTH_EXCEEDED" :
ret ? "WT_DEADLOCK" : "OK"));
DBUG_RETURN(ret);
}
static int deadlock(WT_THD *thd, WT_THD *blocker, uint depth,
uint max_depth)
{
struct deadlock_arg arg= {thd, max_depth, 0, 0};
int ret;
DBUG_ENTER("deadlock");
ret= deadlock_search(&arg, blocker, depth);
if (arg.rc)
rc_unlock(arg.rc);
if (ret == WT_DEPTH_EXCEEDED)
{
increment_cycle_stats(WT_CYCLE_STATS, max_depth);
ret= WT_OK;
}
if (ret == WT_DEADLOCK && arg.victim != thd)
{
DBUG_PRINT("wt", ("killing %s", arg.victim->name));
arg.victim->killed=1;
pthread_cond_broadcast(&arg.victim->waiting_for->cond);
rc_unlock(arg.victim->waiting_for);
ret= WT_OK;
}
DBUG_RETURN(ret);
}
/*
Deletes an element from reshash.
rc->lock must be locked by the caller and it's unlocked on return.
*/
static void unlock_lock_and_free_resource(WT_THD *thd, WT_RESOURCE *rc)
{
uint keylen;
const void *key;
DBUG_ENTER("unlock_lock_and_free_resource");
DBUG_ASSERT(rc->state == ACTIVE);
if (rc->owners.elements || rc->waiter_count)
{
DBUG_PRINT("wt", ("nothing to do, %d owners, %d waiters",
rc->owners.elements, rc->waiter_count));
rc_unlock(rc);
DBUG_VOID_RETURN;
}
/* XXX if (rc->id.type->make_key) key= rc->id.type->make_key(&rc->id, &keylen); else */
{
key= &rc->id;
keylen= sizeof(rc->id);
}
/*
To free the element correctly we need to:
1. take its lock (already done).
2. set the state to FREE
3. release the lock
4. remove from the hash
I *think* it's safe to release the lock while the element is still
in the hash. If not, the corrected procedure should be
3. pin; 4; remove; 5; release; 6; unpin and it'll need pin[3].
*/
rc->state=FREE;
rc_unlock(rc);
lf_hash_delete(&reshash, thd->pins, key, keylen);
DBUG_VOID_RETURN;
}
int wt_thd_dontwait_locked(WT_THD *thd)
{
WT_RESOURCE *rc= thd->waiting_for;
DBUG_ENTER("wt_thd_dontwait_locked");
DBUG_ASSERT(rc->waiter_count);
DBUG_ASSERT(rc->state == ACTIVE);
rc->waiter_count--;
thd->waiting_for= 0;
unlock_lock_and_free_resource(thd, rc);
DBUG_RETURN(thd->killed ? WT_DEADLOCK : WT_OK);
}
int wt_thd_dontwait(WT_THD *thd)
{
int ret;
WT_RESOURCE *rc= thd->waiting_for;
DBUG_ENTER("wt_thd_dontwait");
if (!rc)
DBUG_RETURN(WT_OK);
/*
nobody's trying to free the resource now,
as its waiter_count is guaranteed to be non-zero
*/
rc_wrlock(rc);
ret= wt_thd_dontwait_locked(thd);
DBUG_RETURN(ret);
}
/*
called by a *waiter* to declare what resource it will wait for.
can be called many times, if many blockers own a blocking resource.
but must always be called with the same resource id - a thread cannot
wait for more than one resource at a time.
*/
int wt_thd_will_wait_for(WT_THD *thd, WT_THD *blocker, WT_RESOURCE_ID *resid)
{
uint i;
WT_RESOURCE *rc;
DBUG_ENTER("wt_thd_will_wait_for");
LF_REQUIRE_PINS(3);
DBUG_PRINT("wt", ("enter: thd=%s, blocker=%s, resid=%llu",
thd->name, blocker->name, resid->value.num));
if (thd->waiting_for == 0)
{
uint keylen;
const void *key;
/* XXX if (restype->make_key) key= restype->make_key(resid, &keylen); else */
{
key= resid;
keylen= sizeof(*resid);
}
DBUG_PRINT("wt", ("first blocker"));
retry:
while ((rc= lf_hash_search(&reshash, thd->pins, key, keylen)) == 0)
{
WT_RESOURCE tmp;
DBUG_PRINT("wt", ("failed to find rc in hash, inserting"));
bzero(&tmp, sizeof(tmp));
tmp.waiter_count= 0;
tmp.id= *resid;
tmp.state= ACTIVE;
#ifndef DBUG_OFF
tmp.mutex= 0;
#endif
lf_hash_insert(&reshash, thd->pins, &tmp);
/*
Two cases: either lf_hash_insert() failed - because another thread
has just inserted a resource with the same id - and we need to retry.
Or lf_hash_insert() succeeded, and then we need to repeat
lf_hash_search() to find a real address of the newly inserted element.
That is, we don't care what lf_hash_insert() has returned.
And we need to repeat the loop anyway.
*/
}
DBUG_PRINT("wt", ("found in hash rc=%p", rc));
rc_wrlock(rc);
if (rc->state != ACTIVE)
{
DBUG_PRINT("wt", ("but it's not active, retrying"));
/* Somebody has freed the element while we weren't looking */
rc_unlock(rc);
lf_hash_search_unpin(thd->pins);
goto retry;
}
lf_hash_search_unpin(thd->pins); /* the element cannot go away anymore */
thd->waiting_for= rc;
rc->waiter_count++;
thd->killed= 0;
}
else
{
DBUG_ASSERT(thd->waiting_for->id.type == resid->type);
DBUG_ASSERT(resid->type->compare(&thd->waiting_for->id, resid) == 0);
DBUG_PRINT("wt", ("adding another blocker"));
/*
we can safely access the resource here, it's in the hash as it has
at least one owner, and non-zero waiter_count
*/
rc= thd->waiting_for;
rc_wrlock(rc);
DBUG_ASSERT(rc->waiter_count);
DBUG_ASSERT(rc->state == ACTIVE);
if (thd->killed)
{
wt_thd_dontwait_locked(thd);
DBUG_RETURN(WT_DEADLOCK);
}
}
for (i=0; i < rc->owners.elements; i++)
if (*dynamic_element(&rc->owners, i, WT_THD**) == blocker)
break;
if (i >= rc->owners.elements)
{
push_dynamic(&blocker->my_resources, (void*)&rc);
push_dynamic(&rc->owners, (void*)&blocker);
}
rc_unlock(rc);
if (deadlock(thd, blocker, 1, wt_deadlock_search_depth_short))
{
wt_thd_dontwait(thd);
DBUG_RETURN(WT_DEADLOCK);
}
DBUG_RETURN(0);
}
/*
called by a *waiter* to start waiting
It's supposed to be a drop-in replacement for
pthread_cond_timedwait(), and it takes mutex as an argument.
*/
int wt_thd_cond_timedwait(WT_THD *thd, pthread_mutex_t *mutex)
{
int ret= WT_OK;
struct timespec timeout;
ulonglong before, after, starttime;
WT_RESOURCE *rc= thd->waiting_for;
DBUG_ENTER("wt_thd_cond_timedwait");
DBUG_PRINT("wt", ("enter: thd=%s, rc=%p", thd->name, rc));
#ifndef DBUG_OFF
if (rc->mutex)
DBUG_ASSERT(rc->mutex == mutex);
else
rc->mutex= mutex;
safe_mutex_assert_owner(mutex);
#endif
before= starttime= my_getsystime();
#ifdef __WIN__
/*
only for the sake of Windows we distinguish between
'before' and 'starttime'
*/
GetSystemTimeAsFileTime((PFILETIME)&starttime);
#endif
set_timespec_time_nsec(timeout, starttime, wt_timeout_short*1000);
if (!thd->killed)
ret= pthread_cond_timedwait(&rc->cond, mutex, &timeout);
if (ret == WT_TIMEOUT)
{
if (deadlock(thd, thd, 0, wt_deadlock_search_depth_long))
ret= WT_DEADLOCK;
else if (wt_timeout_long > wt_timeout_short)
{
set_timespec_time_nsec(timeout, starttime, wt_timeout_long*1000);
if (!thd->killed)
ret= pthread_cond_timedwait(&rc->cond, mutex, &timeout);
}
}
after= my_getsystime();
if (wt_thd_dontwait(thd) == WT_DEADLOCK)
ret= WT_DEADLOCK;
increment_wait_stats(after-before, ret);
if (ret == WT_OK)
increment_success_stats();
DBUG_RETURN(ret);
}
/*
called by a *blocker* when it releases a resource
when resid==0 all resources will be freed
Note: it's conceptually similar to pthread_cond_broadcast, and must be done
under the same mutex as wt_thd_cond_timedwait().
*/
void wt_thd_release(WT_THD *thd, WT_RESOURCE_ID *resid)
{
WT_RESOURCE *rc;
uint i, j;
DBUG_ENTER("wt_thd_release");
for (i=0; i < thd->my_resources.elements; i++)
{
rc= *dynamic_element(&thd->my_resources, i, WT_RESOURCE**);
if (!resid || (resid->type->compare(&rc->id, resid) == 0))
{
rc_wrlock(rc);
/*
nobody's trying to free the resource now,
as its owners[] array is not empty (at least thd must be there)
*/
DBUG_ASSERT(rc->state == ACTIVE);
for (j=0; j < rc->owners.elements; j++)
if (*dynamic_element(&rc->owners, j, WT_THD**) == thd)
break;
DBUG_ASSERT(j < rc->owners.elements);
delete_dynamic_element(&rc->owners, j);
if (rc->owners.elements == 0)
{
pthread_cond_broadcast(&rc->cond);
#ifndef DBUG_OFF
if (rc->mutex)
safe_mutex_assert_owner(rc->mutex);
#endif
}
unlock_lock_and_free_resource(thd, rc);
if (resid)
{
delete_dynamic_element(&thd->my_resources, i);
DBUG_VOID_RETURN;
}
}
}
DBUG_ASSERT(!resid);
reset_dynamic(&thd->my_resources);
DBUG_VOID_RETURN;
}
...@@ -2246,10 +2246,7 @@ int ha_maria::external_lock(THD *thd, int lock_type) ...@@ -2246,10 +2246,7 @@ int ha_maria::external_lock(THD *thd, int lock_type)
/* Start of new statement */ /* Start of new statement */
if (!trn) /* no transaction yet - open it now */ if (!trn) /* no transaction yet - open it now */
{ {
trn= trnman_new_trn(& thd->mysys_var->mutex, trn= trnman_new_trn(& thd->mysys_var->mutex, & thd->mysys_var->suspend);
& thd->mysys_var->suspend,
thd->thread_stack + STACK_DIRECTION *
(my_thread_stack_size - STACK_MIN_SIZE));
if (unlikely(!trn)) if (unlikely(!trn))
DBUG_RETURN(HA_ERR_OUT_OF_MEM); DBUG_RETURN(HA_ERR_OUT_OF_MEM);
THD_TRN= trn; THD_TRN= trn;
...@@ -2389,10 +2386,7 @@ int ha_maria::implicit_commit(THD *thd) ...@@ -2389,10 +2386,7 @@ int ha_maria::implicit_commit(THD *thd)
tables may be under LOCK TABLES, and so they will start the next tables may be under LOCK TABLES, and so they will start the next
statement assuming they have a trn (see ha_maria::start_stmt()). statement assuming they have a trn (see ha_maria::start_stmt()).
*/ */
trn= trnman_new_trn(& thd->mysys_var->mutex, trn= trnman_new_trn(& thd->mysys_var->mutex, & thd->mysys_var->suspend);
& thd->mysys_var->suspend,
thd->thread_stack + STACK_DIRECTION *
(my_thread_stack_size - STACK_MIN_SIZE));
/* This is just a commit, tables stay locked if they were: */ /* This is just a commit, tables stay locked if they were: */
trnman_reset_locked_tables(trn, locked_tables); trnman_reset_locked_tables(trn, locked_tables);
THD_TRN= trn; THD_TRN= trn;
......
...@@ -108,9 +108,7 @@ int maria_begin(MARIA_HA *info) ...@@ -108,9 +108,7 @@ int maria_begin(MARIA_HA *info)
{ {
TRN *trn; TRN *trn;
struct st_my_thread_var *mysys_var= my_thread_var; struct st_my_thread_var *mysys_var= my_thread_var;
trn= trnman_new_trn(&mysys_var->mutex, trn= trnman_new_trn(&mysys_var->mutex, &mysys_var->suspend);
&mysys_var->suspend,
(char*) &mysys_var + STACK_DIRECTION *1024*128);
if (unlikely(!trn)) if (unlikely(!trn))
DBUG_RETURN(HA_ERR_OUT_OF_MEM); DBUG_RETURN(HA_ERR_OUT_OF_MEM);
......
...@@ -260,8 +260,7 @@ static void set_short_trid(TRN *trn) ...@@ -260,8 +260,7 @@ static void set_short_trid(TRN *trn)
mutex and cond will be used for lock waits mutex and cond will be used for lock waits
*/ */
TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond, TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond)
void *stack_end)
{ {
TRN *trn; TRN *trn;
DBUG_ENTER("trnman_new_trn"); DBUG_ENTER("trnman_new_trn");
...@@ -308,7 +307,7 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond, ...@@ -308,7 +307,7 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond,
} }
trnman_allocated_transactions++; trnman_allocated_transactions++;
} }
trn->pins= lf_hash_get_pins(&trid_to_committed_trn, stack_end); trn->pins= lf_hash_get_pins(&trid_to_committed_trn);
if (!trn->pins) if (!trn->pins)
{ {
trnman_free_trn(trn); trnman_free_trn(trn);
...@@ -761,7 +760,7 @@ TRN *trnman_recreate_trn_from_recovery(uint16 shortid, TrID longid) ...@@ -761,7 +760,7 @@ TRN *trnman_recreate_trn_from_recovery(uint16 shortid, TrID longid)
TrID old_trid_generator= global_trid_generator; TrID old_trid_generator= global_trid_generator;
TRN *trn; TRN *trn;
DBUG_ASSERT(maria_in_recovery && !maria_multi_threaded); DBUG_ASSERT(maria_in_recovery && !maria_multi_threaded);
if (unlikely((trn= trnman_new_trn(NULL, NULL, NULL)) == NULL)) if (unlikely((trn= trnman_new_trn(NULL, NULL)) == NULL))
return NULL; return NULL;
/* deallocate excessive allocations of trnman_new_trn() */ /* deallocate excessive allocations of trnman_new_trn() */
global_trid_generator= old_trid_generator; global_trid_generator= old_trid_generator;
......
...@@ -38,7 +38,7 @@ extern my_bool (*trnman_end_trans_hook)(TRN *trn, my_bool commit, ...@@ -38,7 +38,7 @@ extern my_bool (*trnman_end_trans_hook)(TRN *trn, my_bool commit,
int trnman_init(TrID); int trnman_init(TrID);
void trnman_destroy(void); void trnman_destroy(void);
TRN *trnman_new_trn(pthread_mutex_t *, pthread_cond_t *, void *); TRN *trnman_new_trn(pthread_mutex_t *, pthread_cond_t *);
my_bool trnman_end_trn(TRN *trn, my_bool commit); my_bool trnman_end_trn(TRN *trn, my_bool commit);
#define trnman_commit_trn(T) trnman_end_trn(T, TRUE) #define trnman_commit_trn(T) trnman_end_trn(T, TRUE)
#define trnman_abort_trn(T) trnman_end_trn(T, FALSE) #define trnman_abort_trn(T) trnman_end_trn(T, FALSE)
......
...@@ -42,6 +42,8 @@ pthread_handler_t test_trnman(void *arg) ...@@ -42,6 +42,8 @@ pthread_handler_t test_trnman(void *arg)
pthread_cond_t conds[MAX_ITER]; pthread_cond_t conds[MAX_ITER];
int m= (*(int *)arg); int m= (*(int *)arg);
my_thread_init();
for (i= 0; i < MAX_ITER; i++) for (i= 0; i < MAX_ITER; i++)
{ {
pthread_mutex_init(&mutexes[i], MY_MUTEX_INIT_FAST); pthread_mutex_init(&mutexes[i], MY_MUTEX_INIT_FAST);
...@@ -54,7 +56,7 @@ pthread_handler_t test_trnman(void *arg) ...@@ -54,7 +56,7 @@ pthread_handler_t test_trnman(void *arg)
m-= n= x % MAX_ITER; m-= n= x % MAX_ITER;
for (i= 0; i < n; i++) for (i= 0; i < n; i++)
{ {
trn[i]= trnman_new_trn(&mutexes[i], &conds[i], &m + STACK_SIZE); trn[i]= trnman_new_trn(&mutexes[i], &conds[i]);
if (!trn[i]) if (!trn[i])
{ {
diag("trnman_new_trn() failed"); diag("trnman_new_trn() failed");
...@@ -76,6 +78,8 @@ pthread_handler_t test_trnman(void *arg) ...@@ -76,6 +78,8 @@ pthread_handler_t test_trnman(void *arg)
rt_num_threads--; rt_num_threads--;
pthread_mutex_unlock(&rt_mutex); pthread_mutex_unlock(&rt_mutex);
my_thread_end();
return 0; return 0;
} }
#undef MAX_ITER #undef MAX_ITER
...@@ -114,7 +118,7 @@ void run_test(const char *test, pthread_handler handler, int n, int m) ...@@ -114,7 +118,7 @@ void run_test(const char *test, pthread_handler handler, int n, int m)
i= trnman_can_read_from(trn[T1], trid[T2]); \ i= trnman_can_read_from(trn[T1], trid[T2]); \
ok(i == RES, "trn" #T1 " %s read from trn" #T2, i ? "can" : "cannot") ok(i == RES, "trn" #T1 " %s read from trn" #T2, i ? "can" : "cannot")
#define start_transaction(T) \ #define start_transaction(T) \
trn[T]= trnman_new_trn(&mutexes[T], &conds[T], &i + STACK_SIZE); \ trn[T]= trnman_new_trn(&mutexes[T], &conds[T]); \
trid[T]= trn[T]->trid trid[T]= trn[T]->trid
#define commit(T) trnman_commit_trn(trn[T]) #define commit(T) trnman_commit_trn(trn[T])
#define abort(T) trnman_abort_trn(trn[T]) #define abort(T) trnman_abort_trn(trn[T])
...@@ -159,7 +163,6 @@ void test_trnman_read_from() ...@@ -159,7 +163,6 @@ void test_trnman_read_from()
int main(int argc __attribute__((unused)), char **argv) int main(int argc __attribute__((unused)), char **argv)
{ {
MY_INIT(argv[0]); MY_INIT(argv[0]);
my_init();
plan(7); plan(7);
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
INCLUDES = @ZLIB_INCLUDES@ -I$(top_builddir)/include \ INCLUDES = @ZLIB_INCLUDES@ -I$(top_builddir)/include \
-I$(top_srcdir)/include -I$(top_srcdir)/unittest/mytap -I$(top_srcdir)/include -I$(top_srcdir)/unittest/mytap
noinst_PROGRAMS = bitmap-t base64-t my_atomic-t noinst_PROGRAMS = bitmap-t base64-t my_atomic-t lf-t waiting_threads-t
LDADD = $(top_builddir)/unittest/mytap/libmytap.a \ LDADD = $(top_builddir)/unittest/mytap/libmytap.a \
$(top_builddir)/mysys/libmysys.a \ $(top_builddir)/mysys/libmysys.a \
......
/* Copyright (C) 2006 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "thr_template.c"
#include <lf.h>
int32 inserts= 0, N;
LF_ALLOCATOR lf_allocator;
LF_HASH lf_hash;
/*
pin allocator - alloc and release an element in a loop
*/
pthread_handler_t test_lf_pinbox(void *arg)
{
int m= *(int *)arg;
int32 x= 0;
LF_PINS *pins;
my_thread_init();
pins= lf_pinbox_get_pins(&lf_allocator.pinbox);
for (x= ((int)(intptr)(&m)); m ; m--)
{
lf_pinbox_put_pins(pins);
pins= lf_pinbox_get_pins(&lf_allocator.pinbox);
}
lf_pinbox_put_pins(pins);
pthread_mutex_lock(&mutex);
if (!--running_threads) pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
my_thread_end();
return 0;
}
typedef union {
int32 data;
void *not_used;
} TLA;
pthread_handler_t test_lf_alloc(void *arg)
{
int m= (*(int *)arg)/2;
int32 x,y= 0;
LF_PINS *pins;
my_thread_init();
pins= lf_alloc_get_pins(&lf_allocator);
for (x= ((int)(intptr)(&m)); m ; m--)
{
TLA *node1, *node2;
x= (x*m+0x87654321) & INT_MAX32;
node1= (TLA *)lf_alloc_new(pins);
node1->data= x;
y+= node1->data;
node1->data= 0;
node2= (TLA *)lf_alloc_new(pins);
node2->data= x;
y-= node2->data;
node2->data= 0;
lf_alloc_free(pins, node1);
lf_alloc_free(pins, node2);
}
lf_alloc_put_pins(pins);
pthread_mutex_lock(&mutex);
bad+= y;
if (--N == 0)
{
diag("%d mallocs, %d pins in stack",
lf_allocator.mallocs, lf_allocator.pinbox.pins_in_array);
#ifdef MY_LF_EXTRA_DEBUG
bad|= lf_allocator.mallocs - lf_alloc_pool_count(&lf_allocator);
#endif
}
if (!--running_threads) pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
my_thread_end();
return 0;
}
#define N_TLH 1000
pthread_handler_t test_lf_hash(void *arg)
{
int m= (*(int *)arg)/(2*N_TLH);
int32 x,y,z,sum= 0, ins= 0;
LF_PINS *pins;
my_thread_init();
pins= lf_hash_get_pins(&lf_hash);
for (x= ((int)(intptr)(&m)); m ; m--)
{
int i;
y= x;
for (i= 0; i < N_TLH; i++)
{
x= (x*(m+i)+0x87654321) & INT_MAX32;
z= (x<0) ? -x : x;
if (lf_hash_insert(&lf_hash, pins, &z))
{
sum+= z;
ins++;
}
}
for (i= 0; i < N_TLH; i++)
{
y= (y*(m+i)+0x87654321) & INT_MAX32;
z= (y<0) ? -y : y;
if (lf_hash_delete(&lf_hash, pins, (uchar *)&z, sizeof(z)))
sum-= z;
}
}
lf_hash_put_pins(pins);
pthread_mutex_lock(&mutex);
bad+= sum;
inserts+= ins;
if (--N == 0)
{
diag("%d mallocs, %d pins in stack, %d hash size, %d inserts",
lf_hash.alloc.mallocs, lf_hash.alloc.pinbox.pins_in_array,
lf_hash.size, inserts);
bad|= lf_hash.count;
}
if (!--running_threads) pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
my_thread_end();
return 0;
}
void do_tests()
{
plan(4);
lf_alloc_init(&lf_allocator, sizeof(TLA), offsetof(TLA, not_used));
lf_hash_init(&lf_hash, sizeof(int), LF_HASH_UNIQUE, 0, sizeof(int), 0,
&my_charset_bin);
bad= my_atomic_initialize();
ok(!bad, "my_atomic_initialize() returned %d", bad);
test_concurrently("lf_pinbox", test_lf_pinbox, N= THREADS, CYCLES);
test_concurrently("lf_alloc", test_lf_alloc, N= THREADS, CYCLES);
test_concurrently("lf_hash", test_lf_hash, N= THREADS, CYCLES/10);
lf_hash_destroy(&lf_hash);
lf_alloc_destroy(&lf_allocator);
}
...@@ -13,11 +13,7 @@ ...@@ -13,11 +13,7 @@
along with this program; if not, write to the Free Software along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <my_global.h> #include "thr_template.c"
#include <my_sys.h>
#include <my_atomic.h>
#include <tap.h>
#include <lf.h>
/* at least gcc 3.4.5 and 3.4.6 (but not 3.2.3) on RHEL */ /* at least gcc 3.4.5 and 3.4.6 (but not 3.2.3) on RHEL */
#if __GNUC__ == 3 && __GNUC_MINOR__ == 4 #if __GNUC__ == 3 && __GNUC_MINOR__ == 4
...@@ -26,20 +22,12 @@ ...@@ -26,20 +22,12 @@
#define GCC_BUG_WORKAROUND #define GCC_BUG_WORKAROUND
#endif #endif
volatile uint32 a32,b32; volatile uint32 b32;
volatile int32 c32, N; volatile int32 c32;
my_atomic_rwlock_t rwl; my_atomic_rwlock_t rwl;
LF_ALLOCATOR lf_allocator;
LF_HASH lf_hash;
pthread_attr_t thr_attr;
pthread_mutex_t mutex;
pthread_cond_t cond;
uint running_threads;
size_t stacksize= 0;
#define STACK_SIZE (((int)stacksize-2048)*STACK_DIRECTION)
/* add and sub a random number in a loop. Must get 0 at the end */ /* add and sub a random number in a loop. Must get 0 at the end */
pthread_handler_t test_atomic_add_handler(void *arg) pthread_handler_t test_atomic_add(void *arg)
{ {
int m= (*(int *)arg)/2; int m= (*(int *)arg)/2;
GCC_BUG_WORKAROUND int32 x; GCC_BUG_WORKAROUND int32 x;
...@@ -47,11 +35,11 @@ pthread_handler_t test_atomic_add_handler(void *arg) ...@@ -47,11 +35,11 @@ pthread_handler_t test_atomic_add_handler(void *arg)
{ {
x= (x*m+0x87654321) & INT_MAX32; x= (x*m+0x87654321) & INT_MAX32;
my_atomic_rwlock_wrlock(&rwl); my_atomic_rwlock_wrlock(&rwl);
my_atomic_add32(&a32, x); my_atomic_add32(&bad, x);
my_atomic_rwlock_wrunlock(&rwl); my_atomic_rwlock_wrunlock(&rwl);
my_atomic_rwlock_wrlock(&rwl); my_atomic_rwlock_wrlock(&rwl);
my_atomic_add32(&a32, -x); my_atomic_add32(&bad, -x);
my_atomic_rwlock_wrunlock(&rwl); my_atomic_rwlock_wrunlock(&rwl);
} }
pthread_mutex_lock(&mutex); pthread_mutex_lock(&mutex);
...@@ -62,13 +50,13 @@ pthread_handler_t test_atomic_add_handler(void *arg) ...@@ -62,13 +50,13 @@ pthread_handler_t test_atomic_add_handler(void *arg)
/* /*
1. generate thread number 0..N-1 from b32 1. generate thread number 0..N-1 from b32
2. add it to a32 2. add it to bad
3. swap thread numbers in c32 3. swap thread numbers in c32
4. (optionally) one more swap to avoid 0 as a result 4. (optionally) one more swap to avoid 0 as a result
5. subtract result from a32 5. subtract result from bad
must get 0 in a32 at the end must get 0 in bad at the end
*/ */
pthread_handler_t test_atomic_fas_handler(void *arg) pthread_handler_t test_atomic_fas(void *arg)
{ {
int m= *(int *)arg; int m= *(int *)arg;
int32 x; int32 x;
...@@ -78,7 +66,7 @@ pthread_handler_t test_atomic_fas_handler(void *arg) ...@@ -78,7 +66,7 @@ pthread_handler_t test_atomic_fas_handler(void *arg)
my_atomic_rwlock_wrunlock(&rwl); my_atomic_rwlock_wrunlock(&rwl);
my_atomic_rwlock_wrlock(&rwl); my_atomic_rwlock_wrlock(&rwl);
my_atomic_add32(&a32, x); my_atomic_add32(&bad, x);
my_atomic_rwlock_wrunlock(&rwl); my_atomic_rwlock_wrunlock(&rwl);
for (; m ; m--) for (; m ; m--)
...@@ -96,7 +84,7 @@ pthread_handler_t test_atomic_fas_handler(void *arg) ...@@ -96,7 +84,7 @@ pthread_handler_t test_atomic_fas_handler(void *arg)
} }
my_atomic_rwlock_wrlock(&rwl); my_atomic_rwlock_wrlock(&rwl);
my_atomic_add32(&a32, -x); my_atomic_add32(&bad, -x);
my_atomic_rwlock_wrunlock(&rwl); my_atomic_rwlock_wrunlock(&rwl);
pthread_mutex_lock(&mutex); pthread_mutex_lock(&mutex);
...@@ -106,28 +94,28 @@ pthread_handler_t test_atomic_fas_handler(void *arg) ...@@ -106,28 +94,28 @@ pthread_handler_t test_atomic_fas_handler(void *arg)
} }
/* /*
same as test_atomic_add_handler, but my_atomic_add32 is emulated with same as test_atomic_add, but my_atomic_add32 is emulated with
my_atomic_cas32 - notice that the slowdown is proportional to the my_atomic_cas32 - notice that the slowdown is proportional to the
number of CPUs number of CPUs
*/ */
pthread_handler_t test_atomic_cas_handler(void *arg) pthread_handler_t test_atomic_cas(void *arg)
{ {
int m= (*(int *)arg)/2, ok= 0; int m= (*(int *)arg)/2, ok= 0;
GCC_BUG_WORKAROUND int32 x, y; GCC_BUG_WORKAROUND int32 x, y;
for (x= ((int)(intptr)(&m)); m ; m--) for (x= ((int)(intptr)(&m)); m ; m--)
{ {
my_atomic_rwlock_wrlock(&rwl); my_atomic_rwlock_wrlock(&rwl);
y= my_atomic_load32(&a32); y= my_atomic_load32(&bad);
my_atomic_rwlock_wrunlock(&rwl); my_atomic_rwlock_wrunlock(&rwl);
x= (x*m+0x87654321) & INT_MAX32; x= (x*m+0x87654321) & INT_MAX32;
do { do {
my_atomic_rwlock_wrlock(&rwl); my_atomic_rwlock_wrlock(&rwl);
ok= my_atomic_cas32(&a32, &y, (uint32)y+x); ok= my_atomic_cas32(&bad, &y, (uint32)y+x);
my_atomic_rwlock_wrunlock(&rwl); my_atomic_rwlock_wrunlock(&rwl);
} while (!ok) ; } while (!ok) ;
do { do {
my_atomic_rwlock_wrlock(&rwl); my_atomic_rwlock_wrlock(&rwl);
ok= my_atomic_cas32(&a32, &y, y-x); ok= my_atomic_cas32(&bad, &y, y-x);
my_atomic_rwlock_wrunlock(&rwl); my_atomic_rwlock_wrunlock(&rwl);
} while (!ok) ; } while (!ok) ;
} }
...@@ -137,212 +125,22 @@ pthread_handler_t test_atomic_cas_handler(void *arg) ...@@ -137,212 +125,22 @@ pthread_handler_t test_atomic_cas_handler(void *arg)
return 0; return 0;
} }
void do_tests()
/*
pin allocator - alloc and release an element in a loop
*/
pthread_handler_t test_lf_pinbox(void *arg)
{
int m= *(int *)arg;
int32 x= 0;
LF_PINS *pins;
pins= lf_pinbox_get_pins(&lf_allocator.pinbox, &m + STACK_SIZE);
for (x= ((int)(intptr)(&m)); m ; m--)
{
lf_pinbox_put_pins(pins);
pins= lf_pinbox_get_pins(&lf_allocator.pinbox, &m + STACK_SIZE);
}
lf_pinbox_put_pins(pins);
pthread_mutex_lock(&mutex);
if (!--running_threads) pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
return 0;
}
typedef union {
int32 data;
void *not_used;
} TLA;
pthread_handler_t test_lf_alloc(void *arg)
{
int m= (*(int *)arg)/2;
int32 x,y= 0;
LF_PINS *pins;
pins= lf_alloc_get_pins(&lf_allocator, &m + STACK_SIZE);
for (x= ((int)(intptr)(&m)); m ; m--)
{
TLA *node1, *node2;
x= (x*m+0x87654321) & INT_MAX32;
node1= (TLA *)lf_alloc_new(pins);
node1->data= x;
y+= node1->data;
node1->data= 0;
node2= (TLA *)lf_alloc_new(pins);
node2->data= x;
y-= node2->data;
node2->data= 0;
lf_alloc_free(pins, node1);
lf_alloc_free(pins, node2);
}
lf_alloc_put_pins(pins);
my_atomic_rwlock_wrlock(&rwl);
my_atomic_add32(&a32, y);
if (my_atomic_add32(&N, -1) == 1)
{
diag("%d mallocs, %d pins in stack",
lf_allocator.mallocs, lf_allocator.pinbox.pins_in_array);
#ifdef MY_LF_EXTRA_DEBUG
a32|= lf_allocator.mallocs - lf_alloc_pool_count(&lf_allocator);
#endif
}
my_atomic_rwlock_wrunlock(&rwl);
pthread_mutex_lock(&mutex);
if (!--running_threads) pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
return 0;
}
#define N_TLH 1000
pthread_handler_t test_lf_hash(void *arg)
{
int m= (*(int *)arg)/(2*N_TLH);
int32 x,y,z,sum= 0, ins= 0;
LF_PINS *pins;
pins= lf_hash_get_pins(&lf_hash, &m + STACK_SIZE);
for (x= ((int)(intptr)(&m)); m ; m--)
{
int i;
y= x;
for (i= 0; i < N_TLH; i++)
{
x= (x*(m+i)+0x87654321) & INT_MAX32;
z= (x<0) ? -x : x;
if (lf_hash_insert(&lf_hash, pins, &z))
{
sum+= z;
ins++;
}
}
for (i= 0; i < N_TLH; i++)
{
y= (y*(m+i)+0x87654321) & INT_MAX32;
z= (y<0) ? -y : y;
if (lf_hash_delete(&lf_hash, pins, (uchar *)&z, sizeof(z)))
sum-= z;
}
}
lf_hash_put_pins(pins);
my_atomic_rwlock_wrlock(&rwl);
my_atomic_add32(&a32, sum);
my_atomic_add32(&b32, ins);
if (my_atomic_add32(&N, -1) == 1)
{
diag("%d mallocs, %d pins in stack, %d hash size, %d inserts",
lf_hash.alloc.mallocs, lf_hash.alloc.pinbox.pins_in_array,
lf_hash.size, b32);
a32|= lf_hash.count;
}
my_atomic_rwlock_wrunlock(&rwl);
pthread_mutex_lock(&mutex);
if (!--running_threads) pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
return 0;
}
void test_atomic(const char *test, pthread_handler handler, int n, int m)
{ {
pthread_t t; plan(4);
ulonglong now= my_getsystime();
a32= 0; bad= my_atomic_initialize();
b32= 0; ok(!bad, "my_atomic_initialize() returned %d", bad);
c32= 0;
diag("Testing %s with %d threads, %d iterations... ", test, n, m);
for (running_threads= n ; n ; n--)
{
if (pthread_create(&t, &thr_attr, handler, &m) != 0)
{
diag("Could not create thread");
abort();
}
}
pthread_mutex_lock(&mutex);
while (running_threads)
pthread_cond_wait(&cond, &mutex);
pthread_mutex_unlock(&mutex);
now= my_getsystime()-now;
ok(a32 == 0, "tested %s in %g secs (%d)", test, ((double)now)/1e7, a32);
}
int main()
{
int err;
MY_INIT("my_atomic-t.c");
diag("N CPUs: %d, atomic ops: %s", my_getncpus(), MY_ATOMIC_MODE);
err= my_atomic_initialize();
plan(7);
ok(err == 0, "my_atomic_initialize() returned %d", err);
pthread_mutex_init(&mutex, 0);
pthread_cond_init(&cond, 0);
my_atomic_rwlock_init(&rwl); my_atomic_rwlock_init(&rwl);
lf_alloc_init(&lf_allocator, sizeof(TLA), offsetof(TLA, not_used));
lf_hash_init(&lf_hash, sizeof(int), LF_HASH_UNIQUE, 0, sizeof(int), 0,
&my_charset_bin);
pthread_attr_init(&thr_attr);
pthread_attr_setdetachstate(&thr_attr,PTHREAD_CREATE_DETACHED);
#ifdef HAVE_PTHREAD_ATTR_GETSTACKSIZE
pthread_attr_getstacksize(&thr_attr, &stacksize);
if (stacksize == 0)
#endif
stacksize = PTHREAD_STACK_MIN;
#ifdef MY_ATOMIC_MODE_RWLOCKS
#if defined(HPUX11) || defined(__POWERPC__) /* showed to be very slow (scheduler-related) */
#define CYCLES 300
#else
#define CYCLES 3000
#endif
#else
#define CYCLES 300000
#endif
#define THREADS 100
test_atomic("my_atomic_add32", test_atomic_add_handler, THREADS,CYCLES);
test_atomic("my_atomic_fas32", test_atomic_fas_handler, THREADS,CYCLES);
test_atomic("my_atomic_cas32", test_atomic_cas_handler, THREADS,CYCLES);
test_atomic("lf_pinbox", test_lf_pinbox, THREADS,CYCLES);
test_atomic("lf_alloc", test_lf_alloc, THREADS,CYCLES);
test_atomic("lf_hash", test_lf_hash, THREADS,CYCLES/10);
lf_hash_destroy(&lf_hash); b32= c32= 0;
lf_alloc_destroy(&lf_allocator); test_concurrently("my_atomic_add32", test_atomic_add, THREADS, CYCLES);
b32= c32= 0;
test_concurrently("my_atomic_fas32", test_atomic_fas, THREADS, CYCLES);
b32= c32= 0;
test_concurrently("my_atomic_cas32", test_atomic_cas, THREADS, CYCLES);
/*
workaround until we know why it crashes randomly on some machine
(BUG#22320).
*/
sleep(2);
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
pthread_attr_destroy(&thr_attr);
my_atomic_rwlock_destroy(&rwl); my_atomic_rwlock_destroy(&rwl);
my_end(0);
return exit_status();
} }
/* Copyright (C) 2006 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <my_global.h>
#include <my_sys.h>
#include <my_atomic.h>
#include <tap.h>
volatile uint32 bad;
pthread_attr_t thr_attr;
pthread_mutex_t mutex;
pthread_cond_t cond;
uint running_threads;
void do_tests();
void test_concurrently(const char *test, pthread_handler handler, int n, int m)
{
pthread_t t;
ulonglong now= my_getsystime();
bad= 0;
diag("Testing %s with %d threads, %d iterations... ", test, n, m);
for (running_threads= n ; n ; n--)
{
if (pthread_create(&t, &thr_attr, handler, &m) != 0)
{
diag("Could not create thread");
abort();
}
}
pthread_mutex_lock(&mutex);
while (running_threads)
pthread_cond_wait(&cond, &mutex);
pthread_mutex_unlock(&mutex);
now= my_getsystime()-now;
ok(!bad, "tested %s in %g secs (%d)", test, ((double)now)/1e7, bad);
}
int main(int argc, char **argv)
{
MY_INIT("thd_template");
if (argv[1] && *argv[1])
DBUG_SET_INITIAL(argv[1]);
pthread_mutex_init(&mutex, 0);
pthread_cond_init(&cond, 0);
pthread_attr_init(&thr_attr);
pthread_attr_setdetachstate(&thr_attr,PTHREAD_CREATE_DETACHED);
#ifdef MY_ATOMIC_MODE_RWLOCKS
#if defined(HPUX11) || defined(__POWERPC__) /* showed to be very slow (scheduler-related) */
#define CYCLES 300
#else
#define CYCLES 3000
#endif
#else
#define CYCLES 3000
#endif
#define THREADS 30
diag("N CPUs: %d, atomic ops: %s", my_getncpus(), MY_ATOMIC_MODE);
do_tests();
/*
workaround until we know why it crashes randomly on some machine
(BUG#22320).
*/
sleep(2);
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
pthread_attr_destroy(&thr_attr);
my_end(0);
return exit_status();
}
/* Copyright (C) 2006 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "thr_template.c"
#include <waiting_threads.h>
#include <m_string.h>
#include <locale.h>
struct test_wt_thd {
WT_THD thd;
pthread_mutex_t lock;
} thds[THREADS];
uint i, cnt;
pthread_mutex_t lock;
#define reset(ARRAY) bzero(ARRAY, sizeof(ARRAY))
enum { LATEST, RANDOM, YOUNGEST, LOCKS } kill_strategy;
WT_RESOURCE_TYPE restype={ wt_resource_id_memcmp, 0};
#define rnd() ((uint)(my_rnd(&rand) * INT_MAX32))
/*
stress test: wait on a random number of random threads.
it always succeeds (unless crashes or hangs).
*/
pthread_handler_t test_wt(void *arg)
{
int m, n, i, id, res;
struct my_rnd_struct rand;
my_thread_init();
pthread_mutex_lock(&lock);
id= cnt++;
pthread_mutex_unlock(&lock);
my_rnd_init(&rand, (ulong)(intptr)&m, id);
if (kill_strategy == YOUNGEST)
thds[id].thd.weight= ~my_getsystime();
if (kill_strategy == LOCKS)
thds[id].thd.weight= 0;
/*
wt_thd_init() is supposed to be called in the thread that will use it.
We didn't do that, and now need to fix the broken object.
*/
thds[id].thd.pins->stack_ends_here= & my_thread_var->stack_ends_here;
#ifndef DBUG_OFF
thds[id].thd.name=my_thread_name();
#endif
for (m= *(int *)arg; m ; m--)
{
WT_RESOURCE_ID resid;
int blockers[THREADS/10], j, k;
bzero(&resid, sizeof(resid));
resid.value.num= id; //rnd() % THREADS;
resid.type= &restype;
res= 0;
for (j= n= (rnd() % THREADS)/10; !res && j >= 0; j--)
{
retry:
i= rnd() % (THREADS-1);
if (i >= id) i++;
#ifndef DBUG_OFF
if (thds[i].thd.name==0)
goto retry;
#endif
for (k=n; k >=j; k--)
if (blockers[k] == i)
goto retry;
blockers[j]= i;
if (kill_strategy == RANDOM)
thds[id].thd.weight= rnd();
pthread_mutex_lock(& thds[i].lock);
res= wt_thd_will_wait_for(& thds[id].thd, & thds[i].thd, &resid);
pthread_mutex_unlock(& thds[i].lock);
}
if (!res)
{
pthread_mutex_lock(&lock);
res= wt_thd_cond_timedwait(& thds[id].thd, &lock);
pthread_mutex_unlock(&lock);
}
if (res)
{
pthread_mutex_lock(& thds[id].lock);
pthread_mutex_lock(&lock);
wt_thd_release_all(& thds[id].thd);
pthread_mutex_unlock(&lock);
pthread_mutex_unlock(& thds[id].lock);
if (kill_strategy == LOCKS)
thds[id].thd.weight= 0;
if (kill_strategy == YOUNGEST)
thds[id].thd.weight= ~my_getsystime();
}
else if (kill_strategy == LOCKS)
thds[id].thd.weight++;
}
pthread_mutex_lock(& thds[id].lock);
pthread_mutex_lock(&lock);
wt_thd_release_all(& thds[id].thd);
pthread_mutex_unlock(&lock);
pthread_mutex_unlock(& thds[id].lock);
#ifndef DBUG_OFF
{
#define DEL "(deleted)"
char *x=malloc(strlen(thds[id].thd.name)+sizeof(DEL)+1);
strxmov(x, thds[id].thd.name, DEL, 0);
thds[id].thd.name=x; /* it's a memory leak, go on, shot me */
}
#endif
pthread_mutex_lock(&mutex);
if (!--running_threads) pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
DBUG_PRINT("wt", ("exiting"));
my_thread_end();
return 0;
}
void do_one_test()
{
double sum, sum0;
#ifndef DBUG_OFF
for (cnt=0; cnt < THREADS; cnt++)
thds[cnt].thd.name=0;
#endif
reset(wt_cycle_stats);
reset(wt_wait_stats);
wt_success_stats=0;
cnt=0;
test_concurrently("waiting_threads", test_wt, THREADS, CYCLES);
sum=sum0=0;
for (cnt=0; cnt < WT_CYCLE_STATS; cnt++)
sum+= wt_cycle_stats[0][cnt] + wt_cycle_stats[1][cnt];
for (cnt=0; cnt < WT_CYCLE_STATS; cnt++)
if (wt_cycle_stats[0][cnt] + wt_cycle_stats[1][cnt] > 0)
{
sum0+=wt_cycle_stats[0][cnt] + wt_cycle_stats[1][cnt];
diag("deadlock cycles of length %2u: %4u %4u %8.2f %%", cnt,
wt_cycle_stats[0][cnt], wt_cycle_stats[1][cnt], 1e2*sum0/sum);
}
diag("depth exceeded: %u %u",
wt_cycle_stats[0][cnt], wt_cycle_stats[1][cnt]);
for (cnt=0; cnt < WT_WAIT_STATS; cnt++)
if (wt_wait_stats[cnt]>0)
diag("deadlock waits up to %7llu us: %5u",
wt_wait_table[cnt], wt_wait_stats[cnt]);
diag("timed out: %u", wt_wait_stats[cnt]);
diag("successes: %u", wt_success_stats);
}
void do_tests()
{
plan(12);
compile_time_assert(THREADS >= 3);
DBUG_PRINT("wt", ("================= initialization ==================="));
bad= my_atomic_initialize();
ok(!bad, "my_atomic_initialize() returned %d", bad);
pthread_mutex_init(&lock, 0);
wt_init();
for (cnt=0; cnt < THREADS; cnt++)
{
wt_thd_init(& thds[cnt].thd);
pthread_mutex_init(& thds[cnt].lock, 0);
}
{
WT_RESOURCE_ID resid[3];
for (i=0; i < 3; i++)
{
bzero(&resid[i], sizeof(resid[i]));
resid[i].value.num= i+1;
resid[i].type= &restype;
}
DBUG_PRINT("wt", ("================= manual test ==================="));
#define ok_wait(X,Y, R) \
ok(wt_thd_will_wait_for(& thds[X].thd, & thds[Y].thd, &resid[R]) == 0, \
"thd[" #X "] will wait for thd[" #Y "]")
#define ok_deadlock(X,Y,R) \
ok(wt_thd_will_wait_for(& thds[X].thd, & thds[Y].thd, &resid[R]) == WT_DEADLOCK, \
"thd[" #X "] will wait for thd[" #Y "] - deadlock")
ok_wait(0,1,0);
ok_wait(0,2,0);
ok_wait(0,3,0);
pthread_mutex_lock(&lock);
bad= wt_thd_cond_timedwait(& thds[0].thd, &lock);
pthread_mutex_unlock(&lock);
ok(bad == ETIMEDOUT, "timeout test returned %d", bad);
ok_wait(0,1,0);
ok_wait(1,2,1);
ok_deadlock(2,0,2);
// FIXME remove wt_thd_dontwait calls below
wt_thd_dontwait(& thds[0].thd);
wt_thd_dontwait(& thds[1].thd);
wt_thd_dontwait(& thds[2].thd);
wt_thd_dontwait(& thds[3].thd);
pthread_mutex_lock(&lock);
wt_thd_release_all(& thds[0].thd);
wt_thd_release_all(& thds[1].thd);
wt_thd_release_all(& thds[2].thd);
wt_thd_release_all(& thds[3].thd);
pthread_mutex_unlock(&lock);
}
wt_deadlock_search_depth_short=6;
wt_timeout_short=1000;
wt_timeout_long= 100;
wt_deadlock_search_depth_long=16;
DBUG_PRINT("wt", ("================= stress test ==================="));
diag("timeout_short=%d us, deadlock_search_depth_short=%d",
wt_timeout_short, wt_deadlock_search_depth_short);
diag("timeout_long=%d us, deadlock_search_depth_long=%d",
wt_timeout_long, wt_deadlock_search_depth_long);
#define test_kill_strategy(X) \
diag("kill strategy: " #X); \
kill_strategy=X; \
do_one_test();
test_kill_strategy(LATEST);
test_kill_strategy(RANDOM);
test_kill_strategy(YOUNGEST);
test_kill_strategy(LOCKS);
DBUG_PRINT("wt", ("================= cleanup ==================="));
pthread_mutex_lock(&lock);
for (cnt=0; cnt < THREADS; cnt++)
{
wt_thd_release_all(& thds[cnt].thd);
wt_thd_destroy(& thds[cnt].thd);
pthread_mutex_destroy(& thds[cnt].lock);
}
pthread_mutex_unlock(&lock);
wt_end();
pthread_mutex_destroy(&lock);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment