Commit a8494872 authored by Mikael Ronstrom's avatar Mikael Ronstrom

WL#5138 merged to mysql-next-mr

parents ecb6228c 018b63c5
......@@ -39,7 +39,7 @@ noinst_HEADERS = config-win.h config-netware.h my_bit.h \
thr_lock.h t_ctype.h violite.h my_md5.h base64.h \
my_handler.h my_time.h service_versions.h \
my_vle.h my_user.h my_atomic.h atomic/nolock.h \
atomic/rwlock.h atomic/x86-gcc.h atomic/x86-msvc.h \
atomic/rwlock.h atomic/x86-gcc.h atomic/generic-msvc.h \
atomic/solaris.h \
atomic/gcc_builtins.h my_libwrap.h my_stacktrace.h
......
......@@ -18,7 +18,7 @@
#define make_atomic_add_body(S) \
v= __sync_fetch_and_add(a, v);
#define make_atomic_swap_body(S) \
#define make_atomic_fas_body(S) \
v= __sync_lock_test_and_set(a, v);
#define make_atomic_cas_body(S) \
int ## S sav; \
......@@ -28,7 +28,10 @@
#ifdef MY_ATOMIC_MODE_DUMMY
#define make_atomic_load_body(S) ret= *a
#define make_atomic_store_body(S) *a= v
#define MY_ATOMIC_MODE "gcc-builtins-up"
#else
#define MY_ATOMIC_MODE "gcc-builtins-smp"
#define make_atomic_load_body(S) \
ret= __sync_fetch_and_or(a, 0);
#define make_atomic_store_body(S) \
......
/* Copyright (C) 2006-2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef _atomic_h_cleanup_
#define _atomic_h_cleanup_ "atomic/generic-msvc.h"
/*
We don't implement anything specific for MY_ATOMIC_MODE_DUMMY, always use
intrinsics.
8 and 16-bit atomics are not implemented, but it can be done if necessary.
*/
#undef MY_ATOMIC_HAS_8_16
/*
x86 compilers (both VS2003 or VS2005) never use instrinsics, but generate
function calls to kernel32 instead, even in the optimized build.
We force intrinsics as described in MSDN documentation for
_InterlockedCompareExchange.
*/
#ifdef _M_IX86
#if (_MSC_VER >= 1500)
#include <intrin.h>
#else
C_MODE_START
/*Visual Studio 2003 and earlier do not have prototypes for atomic intrinsics*/
LONG _InterlockedExchange (LONG volatile *Target,LONG Value);
LONG _InterlockedCompareExchange (LONG volatile *Target, LONG Value, LONG Comp);
LONG _InterlockedExchangeAdd (LONG volatile *Addend, LONG Value);
C_MODE_END
#pragma intrinsic(_InterlockedExchangeAdd)
#pragma intrinsic(_InterlockedCompareExchange)
#pragma intrinsic(_InterlockedExchange)
#endif
#define InterlockedExchange _InterlockedExchange
#define InterlockedExchangeAdd _InterlockedExchangeAdd
#define InterlockedCompareExchange _InterlockedCompareExchange
/*
No need to do something special for InterlockedCompareExchangePointer
as it is a #define to InterlockedCompareExchange. The same applies to
InterlockedExchangePointer.
*/
#endif /*_M_IX86*/
#define MY_ATOMIC_MODE "msvc-intrinsics"
#define IL_EXCHG_ADD32(X,Y) InterlockedExchangeAdd((volatile LONG *)(X),(Y))
#define IL_COMP_EXCHG32(X,Y,Z) InterlockedCompareExchange((volatile LONG *)(X),(Y),(Z))
#define IL_COMP_EXCHGptr InterlockedCompareExchangePointer
#define IL_EXCHG32(X,Y) InterlockedExchange((volatile LONG *)(X),(Y))
#define IL_EXCHGptr InterlockedExchangePointer
#define make_atomic_add_body(S) \
v= IL_EXCHG_ADD ## S (a, v)
#define make_atomic_cas_body(S) \
int ## S initial_cmp= *cmp; \
int ## S initial_a= IL_COMP_EXCHG ## S (a, set, initial_cmp); \
if (!(ret= (initial_a == initial_cmp))) *cmp= initial_a;
#define make_atomic_swap_body(S) \
v= IL_EXCHG ## S (a, v)
#define make_atomic_load_body(S) \
ret= 0; /* avoid compiler warning */ \
ret= IL_COMP_EXCHG ## S (a, ret, ret);
/*
my_yield_processor (equivalent of x86 PAUSE instruction) should be used
to improve performance on hyperthreaded CPUs. Intel recommends to use it in
spin loops also on non-HT machines to reduce power consumption (see e.g
http://softwarecommunity.intel.com/articles/eng/2004.htm)
Running benchmarks for spinlocks implemented with InterlockedCompareExchange
and YieldProcessor shows that much better performance is achieved by calling
YieldProcessor in a loop - that is, yielding longer. On Intel boxes setting
loop count in the range 200-300 brought best results.
*/
#ifndef YIELD_LOOPS
#define YIELD_LOOPS 200
#endif
static __inline int my_yield_processor()
{
int i;
for(i=0; i<YIELD_LOOPS; i++)
{
#if (_MSC_VER <= 1310)
/* On older compilers YieldProcessor is not available, use inline assembly*/
__asm { rep nop }
#else
YieldProcessor();
#endif
}
return 1;
}
#define LF_BACKOFF my_yield_processor()
#else /* cleanup */
#undef IL_EXCHG_ADD32
#undef IL_COMP_EXCHG32
#undef IL_COMP_EXCHGptr
#undef IL_EXCHG32
#undef IL_EXCHGptr
#endif
......@@ -16,43 +16,36 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#if defined(__i386__) || defined(_M_IX86) || defined(HAVE_GCC_ATOMIC_BUILTINS)
#ifdef MY_ATOMIC_MODE_DUMMY
# define LOCK ""
#else
# define LOCK "lock"
#endif
#ifdef HAVE_GCC_ATOMIC_BUILTINS
#include "gcc_builtins.h"
#elif __GNUC__
#include "x86-gcc.h"
#elif defined(_MSC_VER)
#include "x86-msvc.h"
#endif
#if defined(__i386__) || defined(_MSC_VER) || defined(__x86_64__) \
|| defined(HAVE_GCC_ATOMIC_BUILTINS)
# ifdef MY_ATOMIC_MODE_DUMMY
# define LOCK_prefix ""
# else
# define LOCK_prefix "lock"
# endif
# ifdef HAVE_GCC_ATOMIC_BUILTINS
# include "gcc_builtins.h"
# elif __GNUC__
# include "x86-gcc.h"
# elif defined(_MSC_VER)
# include "generic-msvc.h"
# endif
#elif defined(HAVE_SOLARIS_ATOMIC)
#include "solaris.h"
#endif /* __i386__ || _M_IX86 || HAVE_GCC_ATOMIC_BUILTINS */
#endif
#if defined(make_atomic_cas_body) || defined(MY_ATOMICS_MADE)
/*
* We have atomics that require no locking
*/
#define MY_ATOMIC_NOLOCK
#ifdef __SUNPRO_C
/*
* Sun Studio 12 (and likely earlier) does not accept a typedef struct {}
*/
typedef char my_atomic_rwlock_t;
#else
typedef struct { } my_atomic_rwlock_t;
#endif
Type not used so minimal size (emptry struct has different size between C
and C++, zero-length array is gcc-specific).
*/
typedef char my_atomic_rwlock_t __attribute__ ((unused));
#define my_atomic_rwlock_destroy(name)
#define my_atomic_rwlock_init(name)
#define my_atomic_rwlock_rdlock(name)
......
......@@ -16,7 +16,8 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
typedef struct {pthread_rwlock_t rw;} my_atomic_rwlock_t;
typedef struct {pthread_mutex_t rw;} my_atomic_rwlock_t;
#define MY_ATOMIC_MODE_RWLOCKS 1
#ifdef MY_ATOMIC_MODE_DUMMY
/*
......@@ -34,17 +35,25 @@ typedef struct {pthread_rwlock_t rw;} my_atomic_rwlock_t;
#define my_atomic_rwlock_wrunlock(name)
#define MY_ATOMIC_MODE "dummy (non-atomic)"
#else
#define my_atomic_rwlock_destroy(name) pthread_rwlock_destroy(& (name)->rw)
#define my_atomic_rwlock_init(name) pthread_rwlock_init(& (name)->rw, 0)
#define my_atomic_rwlock_rdlock(name) pthread_rwlock_rdlock(& (name)->rw)
#define my_atomic_rwlock_wrlock(name) pthread_rwlock_wrlock(& (name)->rw)
#define my_atomic_rwlock_rdunlock(name) pthread_rwlock_unlock(& (name)->rw)
#define my_atomic_rwlock_wrunlock(name) pthread_rwlock_unlock(& (name)->rw)
#define MY_ATOMIC_MODE "rwlocks"
/*
we're using read-write lock macros but map them to mutex locks, and they're
faster. Still, having semantically rich API we can change the
underlying implementation, if necessary.
*/
#define my_atomic_rwlock_destroy(name) pthread_mutex_destroy(& (name)->rw)
#define my_atomic_rwlock_init(name) pthread_mutex_init(& (name)->rw, 0)
#define my_atomic_rwlock_rdlock(name) pthread_mutex_lock(& (name)->rw)
#define my_atomic_rwlock_wrlock(name) pthread_mutex_lock(& (name)->rw)
#define my_atomic_rwlock_rdunlock(name) pthread_mutex_unlock(& (name)->rw)
#define my_atomic_rwlock_wrunlock(name) pthread_mutex_unlock(& (name)->rw)
#define MY_ATOMIC_MODE "mutex"
#ifndef MY_ATOMIC_MODE_RWLOCKS
#define MY_ATOMIC_MODE_RWLOCKS 1
#endif
#endif
#define make_atomic_add_body(S) int ## S sav; sav= *a; *a+= v; v=sav;
#define make_atomic_swap_body(S) int ## S sav; sav= *a; *a= v; v=sav;
#define make_atomic_fas_body(S) int ## S sav; sav= *a; *a= v; v=sav;
#define make_atomic_cas_body(S) if ((ret= (*a == *cmp))) *a= set; else *cmp=*a;
#define make_atomic_load_body(S) ret= *a;
#define make_atomic_store_body(S) *a= v;
......
......@@ -186,25 +186,25 @@ my_atomic_storeptr(void * volatile *a, void *v)
/* ------------------------------------------------------------------------ */
STATIC_INLINE int8
my_atomic_swap8(int8 volatile *a, int8 v)
my_atomic_fas8(int8 volatile *a, int8 v)
{
return ((int8) atomic_swap_8((volatile uint8_t *)a, (uint8_t)v));
}
STATIC_INLINE int16
my_atomic_swap16(int16 volatile *a, int16 v)
my_atomic_fas16(int16 volatile *a, int16 v)
{
return ((int16) atomic_swap_16((volatile uint16_t *)a, (uint16_t)v));
}
STATIC_INLINE int32
my_atomic_swap32(int32 volatile *a, int32 v)
my_atomic_fas32(int32 volatile *a, int32 v)
{
return ((int32) atomic_swap_32((volatile uint32_t *)a, (uint32_t)v));
}
STATIC_INLINE void *
my_atomic_swapptr(void * volatile *a, void *v)
my_atomic_fasptr(void * volatile *a, void *v)
{
return (atomic_swap_ptr(a, v));
}
......@@ -22,10 +22,18 @@
architectures support double-word (128-bit) cas.
*/
#ifdef MY_ATOMIC_NO_XADD
#define MY_ATOMIC_MODE "gcc-x86" LOCK "-no-xadd"
#ifdef __x86_64__
# ifdef MY_ATOMIC_NO_XADD
# define MY_ATOMIC_MODE "gcc-amd64" LOCK_prefix "-no-xadd"
# else
# define MY_ATOMIC_MODE "gcc-amd64" LOCK_prefix
# endif
#else
#define MY_ATOMIC_MODE "gcc-x86" LOCK
# ifdef MY_ATOMIC_NO_XADD
# define MY_ATOMIC_MODE "gcc-x86" LOCK_prefix "-no-xadd"
# else
# define MY_ATOMIC_MODE "gcc-x86" LOCK_prefix
# endif
#endif
/* fix -ansi errors while maintaining readability */
......@@ -34,29 +42,53 @@
#endif
#ifndef MY_ATOMIC_NO_XADD
#define make_atomic_add_body(S) \
asm volatile (LOCK "; xadd %0, %1;" : "+r" (v) , "+m" (*a))
#define make_atomic_add_body(S) make_atomic_add_body ## S
#define make_atomic_cas_body(S) make_atomic_cas_body ## S
#endif
#define make_atomic_swap_body(S) \
asm volatile ("; xchg %0, %1;" : "+q" (v) , "+m" (*a))
#define make_atomic_cas_body(S) \
asm volatile (LOCK "; cmpxchg %3, %0; setz %2;" \
#define make_atomic_add_body32 \
asm volatile (LOCK_prefix "; xadd %0, %1;" : "+r" (v) , "+m" (*a))
#define make_atomic_cas_body32 \
asm volatile (LOCK_prefix "; cmpxchg %3, %0; setz %2;" \
: "+m" (*a), "+a" (*cmp), "=q" (ret): "r" (set))
#define make_atomic_cas_bodyptr make_atomic_cas_body32
#ifndef __x86_64__
#define make_atomic_add_body64 make_atomic_add_body32
#define make_atomic_cas_body64 make_atomic_cas_body32
#else
#define make_atomic_add_body64 \
int64 tmp=*a; \
while (!my_atomic_cas64(a, &tmp, tmp+v)); \
v=tmp;
#define make_atomic_cas_body64 \
int32 ebx=(set & 0xFFFFFFFF), ecx=(set >> 32); \
asm volatile (LOCK_prefix "; cmpxchg8b %0; setz %2;" \
: "+m" (*a), "+A" (*cmp), "=q" (ret) \
:"b" (ebx), "c" (ecx))
#endif
#define make_atomic_fas_body(S) \
asm volatile ("xchg %0, %1;" : "+r" (v) , "+m" (*a))
#ifdef MY_ATOMIC_MODE_DUMMY
#define make_atomic_load_body(S) ret=*a
#define make_atomic_store_body(S) *a=v
#else
/*
Actually 32-bit reads/writes are always atomic on x86
But we add LOCK here anyway to force memory barriers
But we add LOCK_prefix here anyway to force memory barriers
*/
#define make_atomic_load_body(S) \
ret=0; \
asm volatile (LOCK "; cmpxchg %2, %0" \
asm volatile (LOCK_prefix "; cmpxchg %2, %0" \
: "+m" (*a), "+a" (ret): "r" (ret))
#define make_atomic_store_body(S) \
asm volatile ("; xchg %0, %1;" : "+m" (*a) : "r" (v))
asm volatile ("; xchg %0, %1;" : "+m" (*a), "+r" (v))
#endif
/* TODO test on intel whether the below helps. on AMD it makes no difference */
//#define LF_BACKOFF ({asm volatile ("rep; nop"); 1; })
#endif /* ATOMIC_X86_GCC_INCLUDED */
/* Copyright (C) 2006 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/*
XXX 64-bit atomic operations can be implemented using
cmpxchg8b, if necessary
*/
// Would it be better to use intrinsics ?
// (InterlockedCompareExchange, InterlockedCompareExchange16
// InterlockedExchangeAdd, InterlockedExchange)
#ifndef _atomic_h_cleanup_
#define _atomic_h_cleanup_ "atomic/x86-msvc.h"
#define MY_ATOMIC_MODE "msvc-x86" LOCK
#define make_atomic_add_body(S) \
_asm { \
_asm mov reg_ ## S, v \
_asm LOCK xadd *a, reg_ ## S \
_asm movzx v, reg_ ## S \
}
#define make_atomic_cas_body(S) \
_asm { \
_asm mov areg_ ## S, *cmp \
_asm mov reg2_ ## S, set \
_asm LOCK cmpxchg *a, reg2_ ## S \
_asm mov *cmp, areg_ ## S \
_asm setz al \
_asm movzx ret, al \
}
#define make_atomic_swap_body(S) \
_asm { \
_asm mov reg_ ## S, v \
_asm xchg *a, reg_ ## S \
_asm mov v, reg_ ## S \
}
#ifdef MY_ATOMIC_MODE_DUMMY
#define make_atomic_load_body(S) ret=*a
#define make_atomic_store_body(S) *a=v
#else
/*
Actually 32-bit reads/writes are always atomic on x86
But we add LOCK here anyway to force memory barriers
*/
#define make_atomic_load_body(S) \
_asm { \
_asm mov areg_ ## S, 0 \
_asm mov reg2_ ## S, areg_ ## S \
_asm LOCK cmpxchg *a, reg2_ ## S \
_asm mov ret, areg_ ## S \
}
#define make_atomic_store_body(S) \
_asm { \
_asm mov reg_ ## S, v \
_asm xchg *a, reg_ ## S \
}
#endif
#define reg_8 al
#define reg_16 ax
#define reg_32 eax
#define areg_8 al
#define areg_16 ax
#define areg_32 eax
#define reg2_8 bl
#define reg2_16 bx
#define reg2_32 ebx
#else /* cleanup */
#undef reg_8
#undef reg_16
#undef reg_32
#undef areg_8
#undef areg_16
#undef areg_32
#undef reg2_8
#undef reg2_16
#undef reg2_32
#endif
This diff is collapsed.
......@@ -877,6 +877,8 @@ typedef SOCKET_SIZE_TYPE size_socket;
#endif
#endif /* defined (HAVE_LONG_LONG) && !defined(ULONGLONG_MAX)*/
#define INT_MIN64 (~0x7FFFFFFFFFFFFFFFLL)
#define INT_MAX64 0x7FFFFFFFFFFFFFFFLL
#define INT_MIN32 (~0x7FFFFFFFL)
#define INT_MAX32 0x7FFFFFFFL
#define UINT_MAX32 0xFFFFFFFFL
......
......@@ -133,9 +133,10 @@ post_init_event_thread(THD *thd)
pthread_mutex_lock(&LOCK_thread_count);
threads.append(thd);
thread_count++;
thread_running++;
pthread_mutex_unlock(&LOCK_thread_count);
my_atomic_rwlock_wrlock(&global_query_id_lock);
inc_thread_running();
my_atomic_rwlock_wrunlock(&global_query_id_lock);
return FALSE;
}
......@@ -157,10 +158,12 @@ deinit_event_thread(THD *thd)
DBUG_PRINT("exit", ("Event thread finishing"));
pthread_mutex_lock(&LOCK_thread_count);
thread_count--;
thread_running--;
delete thd;
pthread_cond_broadcast(&COND_thread_count);
pthread_mutex_unlock(&LOCK_thread_count);
my_atomic_rwlock_wrlock(&global_query_id_lock);
dec_thread_running();
my_atomic_rwlock_wrunlock(&global_query_id_lock);
}
......@@ -418,10 +421,12 @@ Event_scheduler::start()
net_end(&new_thd->net);
pthread_mutex_lock(&LOCK_thread_count);
thread_count--;
thread_running--;
delete new_thd;
pthread_cond_broadcast(&COND_thread_count);
pthread_mutex_unlock(&LOCK_thread_count);
my_atomic_rwlock_wrlock(&global_query_id_lock);
dec_thread_running();
my_atomic_rwlock_wrunlock(&global_query_id_lock);
}
end:
UNLOCK_DATA();
......@@ -551,10 +556,12 @@ Event_scheduler::execute_top(Event_queue_element_for_exec *event_name)
net_end(&new_thd->net);
pthread_mutex_lock(&LOCK_thread_count);
thread_count--;
thread_running--;
delete new_thd;
pthread_cond_broadcast(&COND_thread_count);
pthread_mutex_unlock(&LOCK_thread_count);
my_atomic_rwlock_wrlock(&global_query_id_lock);
dec_thread_running();
my_atomic_rwlock_wrunlock(&global_query_id_lock);
}
delete event_name;
DBUG_RETURN(TRUE);
......
......@@ -3056,9 +3056,9 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli,
{
thd->set_time((time_t)when);
thd->set_query((char*)query_arg, q_len_arg);
VOID(pthread_mutex_lock(&LOCK_thread_count));
my_atomic_rwlock_wrlock(&global_query_id_lock);
thd->query_id = next_query_id();
VOID(pthread_mutex_unlock(&LOCK_thread_count));
my_atomic_rwlock_wrunlock(&global_query_id_lock);
thd->variables.pseudo_thread_id= thread_id; // for temp tables
DBUG_PRINT("query",("%s", thd->query()));
......@@ -4581,9 +4581,9 @@ int Load_log_event::do_apply_event(NET* net, Relay_log_info const *rli,
if (rpl_filter->db_ok(thd->db))
{
thd->set_time((time_t)when);
VOID(pthread_mutex_lock(&LOCK_thread_count));
my_atomic_rwlock_wrlock(&global_query_id_lock);
thd->query_id = next_query_id();
VOID(pthread_mutex_unlock(&LOCK_thread_count));
my_atomic_rwlock_wrunlock(&global_query_id_lock);
thd->warning_info->opt_clear_warning_info(thd->query_id);
TABLE_LIST tables;
......@@ -8072,9 +8072,9 @@ int Table_map_log_event::do_apply_event(Relay_log_info const *rli)
DBUG_ASSERT(rli->sql_thd == thd);
/* Step the query id to mark what columns that are actually used. */
pthread_mutex_lock(&LOCK_thread_count);
my_atomic_rwlock_wrlock(&global_query_id_lock);
thd->query_id= next_query_id();
pthread_mutex_unlock(&LOCK_thread_count);
my_atomic_rwlock_wrunlock(&global_query_id_lock);
if (!(memory= my_multi_malloc(MYF(MY_WME),
&table_list, (uint) sizeof(RPL_TABLE_LIST),
......
......@@ -53,6 +53,7 @@
#include "sql_array.h"
#include "sql_plugin.h"
#include "scheduler.h"
#include <my_atomic.h>
class Parser_state;
......@@ -85,11 +86,49 @@ typedef ulong nesting_map; /* Used for flags of nesting constructs */
typedef ulonglong nested_join_map;
/* query_id */
typedef ulonglong query_id_t;
typedef int64 query_id_t;
extern query_id_t global_query_id;
extern int32 thread_running;
extern my_atomic_rwlock_t global_query_id_lock;
/* increment query_id and return it. */
inline query_id_t next_query_id() { return global_query_id++; }
inline query_id_t next_query_id()
{
query_id_t id;
id= my_atomic_add64(&global_query_id, 1);
return (id+1);
}
inline query_id_t get_query_id()
{
query_id_t id;
id= my_atomic_load64(&global_query_id);
return id;
}
inline int32
inc_thread_running()
{
int32 num_thread_running;
num_thread_running= my_atomic_add32(&thread_running, 1);
return (num_thread_running+1);
}
inline int32
dec_thread_running()
{
int32 num_thread_running;
num_thread_running= my_atomic_add32(&thread_running, -1);
return (num_thread_running-1);
}
inline int32
get_thread_running()
{
int32 num_thread_running;
num_thread_running= my_atomic_load32(&thread_running);
return num_thread_running;
}
/* useful constants */
extern MYSQL_PLUGIN_IMPORT const key_map key_map_empty;
......@@ -1940,7 +1979,7 @@ extern bool opt_ignore_builtin_innodb;
extern my_bool opt_character_set_client_handshake;
extern bool volatile abort_loop, shutdown_in_progress;
extern bool in_bootstrap;
extern uint volatile thread_count, thread_running, global_read_lock;
extern uint volatile thread_count, global_read_lock;
extern uint connection_count;
extern my_bool opt_sql_bin_update, opt_safe_user_create, opt_no_mix_types;
extern my_bool opt_safe_show_db, opt_local_infile, opt_myisam_use_mmap;
......
......@@ -531,7 +531,8 @@ uint mysqld_port_timeout;
uint delay_key_write_options, protocol_version;
uint lower_case_table_names;
uint tc_heuristic_recover= 0;
uint volatile thread_count, thread_running;
uint volatile thread_count;
int32 thread_running;
ulonglong thd_startup_options;
ulong back_log, connect_timeout, concurrency, server_id;
ulong table_cache_size, table_def_size;
......@@ -547,6 +548,7 @@ ulonglong max_binlog_cache_size=0;
ulong query_cache_size=0;
ulong refresh_version; /* Increments on each reload */
query_id_t global_query_id;
my_atomic_rwlock_t global_query_id_lock;
ulong aborted_threads, aborted_connects;
ulong delayed_insert_timeout, delayed_insert_limit, delayed_queue_size;
ulong delayed_insert_threads, delayed_insert_writes, delayed_rows_in_use;
......@@ -1380,6 +1382,7 @@ void clean_up(bool print_message)
DBUG_PRINT("quit", ("Error messages freed"));
/* Tell main we are ready */
logger.cleanup_end();
my_atomic_rwlock_destroy(&global_query_id_lock);
(void) pthread_mutex_lock(&LOCK_thread_count);
DBUG_PRINT("quit", ("got thread count lock"));
ready_to_exit=1;
......@@ -7795,6 +7798,7 @@ static int mysql_init_variables(void)
what_to_log= ~ (1L << (uint) COM_TIME);
refresh_version= 1L; /* Increments on each reload */
global_query_id= thread_id= 1L;
my_atomic_rwlock_init(&global_query_id_lock);
strmov(server_version, MYSQL_SERVER_VERSION);
myisam_recover_options_str= sql_mode_str= "OFF";
myisam_stats_method_str= "nulls_unequal";
......
......@@ -2736,9 +2736,9 @@ sp_lex_keeper::reset_lex_and_exec_core(THD *thd, uint *nextp,
*/
thd->lex= m_lex;
VOID(pthread_mutex_lock(&LOCK_thread_count));
my_atomic_rwlock_wrlock(&global_query_id_lock);
thd->query_id= next_query_id();
VOID(pthread_mutex_unlock(&LOCK_thread_count));
my_atomic_rwlock_wrunlock(&global_query_id_lock);
if (thd->prelocked_mode == NON_PRELOCKED)
{
......
......@@ -495,7 +495,9 @@ static void handle_bootstrap_impl(THD *thd)
We don't need to obtain LOCK_thread_count here because in bootstrap
mode we have only one thread.
*/
my_atomic_rwlock_wrlock(&global_query_id_lock);
thd->query_id=next_query_id();
my_atomic_rwlock_wrunlock(&global_query_id_lock);
thd->set_time();
mysql_parse(thd, thd->query(), length, & found_semicolon);
close_thread_tables(thd); // Free tables
......@@ -989,29 +991,30 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
thd->enable_slow_log= TRUE;
thd->lex->sql_command= SQLCOM_END; /* to avoid confusing VIEW detectors */
thd->set_time();
VOID(pthread_mutex_lock(&LOCK_thread_count));
thd->query_id= global_query_id;
switch( command ) {
/* Ignore these statements. */
case COM_STATISTICS:
case COM_PING:
break;
/* Only increase id on these statements but don't count them. */
case COM_STMT_PREPARE:
case COM_STMT_CLOSE:
case COM_STMT_RESET:
next_query_id();
break;
/* Increase id and count all other statements. */
default:
statistic_increment(thd->status_var.questions, &LOCK_status);
next_query_id();
my_atomic_rwlock_wrlock(&global_query_id_lock);
{
query_id_t query_id;
switch( command ) {
/* Ignore these statements. */
case COM_STATISTICS:
case COM_PING:
query_id= get_query_id();
break;
/* Only increase id on these statements but don't count them. */
case COM_STMT_PREPARE:
case COM_STMT_CLOSE:
case COM_STMT_RESET:
query_id= next_query_id() - 1;
break;
/* Increase id and count all other statements. */
default:
statistic_increment(thd->status_var.questions, &LOCK_status);
query_id= next_query_id() - 1;
}
thd->query_id= query_id;
}
thread_running++;
/* TODO: set thd->lex->sql_command to SQLCOM_END here */
VOID(pthread_mutex_unlock(&LOCK_thread_count));
inc_thread_running();
my_atomic_rwlock_wrunlock(&global_query_id_lock);
/**
Clear the set of flags that are expected to be cleared at the
......@@ -1277,15 +1280,15 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
(char *) thd->security_ctx->host_or_ip);
thd->set_query(beginning_of_next_stmt, length);
VOID(pthread_mutex_lock(&LOCK_thread_count));
/*
Count each statement from the client.
*/
statistic_increment(thd->status_var.questions, &LOCK_status);
my_atomic_rwlock_wrlock(&global_query_id_lock);
thd->query_id= next_query_id();
my_atomic_rwlock_wrunlock(&global_query_id_lock);
thd->set_time(); /* Reset the query start time. */
/* TODO: set thd->lex->sql_command to SQLCOM_END here */
VOID(pthread_mutex_unlock(&LOCK_thread_count));
mysql_parse(thd, beginning_of_next_stmt, length, &end_of_stmt);
}
......@@ -1601,9 +1604,9 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
thd_proc_info(thd, "cleaning up");
thd->set_query(NULL, 0);
thd->command=COM_SLEEP;
VOID(pthread_mutex_lock(&LOCK_thread_count)); // For process list
thread_running--;
VOID(pthread_mutex_unlock(&LOCK_thread_count));
my_atomic_rwlock_wrlock(&global_query_id_lock);
dec_thread_running();
my_atomic_rwlock_wrunlock(&global_query_id_lock);
thd_proc_info(thd, 0);
thd->packet.shrink(thd->variables.net_buffer_length); // Reclaim some memory
free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
......
......@@ -16,6 +16,8 @@
AM_CPPFLAGS = @ZLIB_INCLUDES@ -I$(top_builddir)/include
AM_CPPFLAGS += -I$(top_srcdir)/include -I$(top_srcdir)/unittest/mytap
noinst_HEADERS = thr_template.c
LDADD = $(top_builddir)/unittest/mytap/libmytap.a \
$(top_builddir)/mysys/libmysys.a \
$(top_builddir)/dbug/libdbug.a \
......
/* Copyright (C) 2006 MySQL AB
/* Copyright (C) 2006-2008 MySQL AB, 2008 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
......@@ -13,10 +13,7 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <my_global.h>
#include <my_sys.h>
#include <my_atomic.h>
#include <tap.h>
#include "thr_template.c"
/* at least gcc 3.4.5 and 3.4.6 (but not 3.2.3) on RHEL */
#if __GNUC__ == 3 && __GNUC_MINOR__ == 4
......@@ -25,181 +22,162 @@
#define GCC_BUG_WORKAROUND
#endif
int32 a32,b32,c32;
volatile uint32 b32;
volatile int32 c32;
my_atomic_rwlock_t rwl;
pthread_attr_t thr_attr;
pthread_mutex_t mutex;
pthread_cond_t cond;
int N;
/* add and sub a random number in a loop. Must get 0 at the end */
pthread_handler_t test_atomic_add_handler(void *arg)
pthread_handler_t test_atomic_add(void *arg)
{
int m=*(int *)arg;
int m= (*(int *)arg)/2;
GCC_BUG_WORKAROUND int32 x;
for (x=((int)((long)(&m))); m ; m--)
for (x= ((int)(intptr)(&m)); m ; m--)
{
x= (x*m+0x87654321) & INT_MAX32;
my_atomic_rwlock_wrlock(&rwl);
my_atomic_add32(&bad, x);
my_atomic_rwlock_wrunlock(&rwl);
my_atomic_rwlock_wrlock(&rwl);
my_atomic_add32(&bad, -x);
my_atomic_rwlock_wrunlock(&rwl);
}
pthread_mutex_lock(&mutex);
if (!--running_threads) pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
return 0;
}
volatile int64 a64;
/* add and sub a random number in a loop. Must get 0 at the end */
pthread_handler_t test_atomic_add64(void *arg)
{
int m= (*(int *)arg)/2;
GCC_BUG_WORKAROUND int64 x;
for (x= ((int64)(intptr)(&m)); m ; m--)
{
x=x*m+0x87654321;
x= (x*m+0xfdecba987654321LL) & INT_MAX64;
my_atomic_rwlock_wrlock(&rwl);
my_atomic_add32(&a32, x);
my_atomic_add64(&a64, x);
my_atomic_rwlock_wrunlock(&rwl);
my_atomic_rwlock_wrlock(&rwl);
my_atomic_add32(&a32, -x);
my_atomic_add64(&a64, -x);
my_atomic_rwlock_wrunlock(&rwl);
}
pthread_mutex_lock(&mutex);
N--;
if (!N) pthread_cond_signal(&cond);
if (!--running_threads)
{
bad= (a64 != 0);
pthread_cond_signal(&cond);
}
pthread_mutex_unlock(&mutex);
return 0;
}
/*
1. generate thread number 0..N-1 from b32
2. add it to a32
2. add it to bad
3. swap thread numbers in c32
4. (optionally) one more swap to avoid 0 as a result
5. subtract result from a32
must get 0 in a32 at the end
5. subtract result from bad
must get 0 in bad at the end
*/
pthread_handler_t test_atomic_swap_handler(void *arg)
pthread_handler_t test_atomic_fas(void *arg)
{
int m=*(int *)arg;
int32 x;
int m= *(int *)arg;
int32 x;
my_atomic_rwlock_wrlock(&rwl);
x=my_atomic_add32(&b32, 1);
x= my_atomic_add32(&b32, 1);
my_atomic_rwlock_wrunlock(&rwl);
my_atomic_rwlock_wrlock(&rwl);
my_atomic_add32(&a32, x);
my_atomic_add32(&bad, x);
my_atomic_rwlock_wrunlock(&rwl);
for (; m ; m--)
{
my_atomic_rwlock_wrlock(&rwl);
x=my_atomic_swap32(&c32, x);
x= my_atomic_fas32(&c32, x);
my_atomic_rwlock_wrunlock(&rwl);
}
if (!x)
{
my_atomic_rwlock_wrlock(&rwl);
x=my_atomic_swap32(&c32, x);
x= my_atomic_fas32(&c32, x);
my_atomic_rwlock_wrunlock(&rwl);
}
my_atomic_rwlock_wrlock(&rwl);
my_atomic_add32(&a32, -x);
my_atomic_add32(&bad, -x);
my_atomic_rwlock_wrunlock(&rwl);
pthread_mutex_lock(&mutex);
N--;
if (!N) pthread_cond_signal(&cond);
if (!--running_threads) pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
return 0;
}
/*
same as test_atomic_add_handler, but my_atomic_add32 is emulated with
(slower) my_atomic_cas32
same as test_atomic_add, but my_atomic_add32 is emulated with
my_atomic_cas32 - notice that the slowdown is proportional to the
number of CPUs
*/
pthread_handler_t test_atomic_cas_handler(void *arg)
pthread_handler_t test_atomic_cas(void *arg)
{
int m=*(int *)arg, ok;
GCC_BUG_WORKAROUND int32 x,y;
for (x=((int)((long)(&m))); m ; m--)
int m= (*(int *)arg)/2, ok= 0;
GCC_BUG_WORKAROUND int32 x, y;
for (x= ((int)(intptr)(&m)); m ; m--)
{
my_atomic_rwlock_wrlock(&rwl);
y=my_atomic_load32(&a32);
y= my_atomic_load32(&bad);
my_atomic_rwlock_wrunlock(&rwl);
x=x*m+0x87654321;
x= (x*m+0x87654321) & INT_MAX32;
do {
my_atomic_rwlock_wrlock(&rwl);
ok=my_atomic_cas32(&a32, &y, y+x);
ok= my_atomic_cas32(&bad, &y, (uint32)y+x);
my_atomic_rwlock_wrunlock(&rwl);
} while (!ok);
} while (!ok) ;
do {
my_atomic_rwlock_wrlock(&rwl);
ok=my_atomic_cas32(&a32, &y, y-x);
ok= my_atomic_cas32(&bad, &y, y-x);
my_atomic_rwlock_wrunlock(&rwl);
} while (!ok);
} while (!ok) ;
}
pthread_mutex_lock(&mutex);
N--;
if (!N) pthread_cond_signal(&cond);
if (!--running_threads) pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
return 0;
}
void test_atomic(const char *test, pthread_handler handler, int n, int m)
{
pthread_t t;
ulonglong now=my_getsystime();
a32= 0;
b32= 0;
c32= 0;
diag("Testing %s with %d threads, %d iterations... ", test, n, m);
for (N=n ; n ; n--)
{
if (pthread_create(&t, &thr_attr, handler, &m) != 0)
{
diag("Could not create thread");
a32= 1;
goto err;
}
}
pthread_mutex_lock(&mutex);
while (N)
pthread_cond_wait(&cond, &mutex);
pthread_mutex_unlock(&mutex);
now=my_getsystime()-now;
err:
ok(a32 == 0, "tested %s in %g secs", test, ((double)now)/1e7);
}
int main()
void do_tests()
{
int err;
MY_INIT("my_atomic-t.c");
diag("N CPUs: %d", my_getncpus());
err= my_atomic_initialize();
plan(6);
plan(4);
ok(err == 0, "my_atomic_initialize() returned %d", err);
bad= my_atomic_initialize();
ok(!bad, "my_atomic_initialize() returned %d", bad);
pthread_attr_init(&thr_attr);
pthread_attr_setdetachstate(&thr_attr,PTHREAD_CREATE_DETACHED);
pthread_mutex_init(&mutex, 0);
pthread_cond_init(&cond, 0);
my_atomic_rwlock_init(&rwl);
#ifdef HPUX11
#define CYCLES 1000
#else
#define CYCLES 10000
#endif
#define THREADS 100
test_atomic("my_atomic_add32", test_atomic_add_handler, THREADS, CYCLES);
test_atomic("my_atomic_swap32", test_atomic_swap_handler, THREADS, CYCLES);
test_atomic("my_atomic_cas32", test_atomic_cas_handler, THREADS, CYCLES);
/*
workaround until we know why it crashes randomly on some machine
(BUG#22320).
*/
sleep(2);
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
pthread_attr_destroy(&thr_attr);
b32= c32= 0;
test_concurrently("my_atomic_add32", test_atomic_add, THREADS, CYCLES);
b32= c32= 0;
test_concurrently("my_atomic_fas32", test_atomic_fas, THREADS, CYCLES);
b32= c32= 0;
test_concurrently("my_atomic_cas32", test_atomic_cas, THREADS, CYCLES);
{
int64 b=0x1000200030004000LL;
a64=0;
my_atomic_add64(&a64, b);
ok(a64==b, "add64");
}
a64=0;
test_concurrently("my_atomic_add64", test_atomic_add64, THREADS, CYCLES);
my_atomic_rwlock_destroy(&rwl);
return exit_status();
}
/* Copyright (C) 2006-2008 MySQL AB, 2008 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <my_global.h>
#include <my_sys.h>
#include <my_atomic.h>
#include <tap.h>
volatile uint32 bad;
pthread_attr_t thr_attr;
pthread_mutex_t mutex;
pthread_cond_t cond;
uint running_threads;
void do_tests();
void test_concurrently(const char *test, pthread_handler handler, int n, int m)
{
pthread_t t;
ulonglong now= my_getsystime();
bad= 0;
diag("Testing %s with %d threads, %d iterations... ", test, n, m);
for (running_threads= n ; n ; n--)
{
if (pthread_create(&t, &thr_attr, handler, &m) != 0)
{
diag("Could not create thread");
abort();
}
}
pthread_mutex_lock(&mutex);
while (running_threads)
pthread_cond_wait(&cond, &mutex);
pthread_mutex_unlock(&mutex);
now= my_getsystime()-now;
ok(!bad, "tested %s in %g secs (%d)", test, ((double)now)/1e7, bad);
}
int main(int argc __attribute__((unused)), char **argv)
{
MY_INIT("thd_template");
if (argv[1] && *argv[1])
DBUG_SET_INITIAL(argv[1]);
pthread_mutex_init(&mutex, 0);
pthread_cond_init(&cond, 0);
pthread_attr_init(&thr_attr);
pthread_attr_setdetachstate(&thr_attr,PTHREAD_CREATE_DETACHED);
#ifdef MY_ATOMIC_MODE_RWLOCKS
#if defined(HPUX11) || defined(__POWERPC__) /* showed to be very slow (scheduler-related) */
#define CYCLES 300
#else
#define CYCLES 3000
#endif
#else
#define CYCLES 3000
#endif
#define THREADS 30
diag("N CPUs: %d, atomic ops: %s", my_getncpus(), MY_ATOMIC_MODE);
do_tests();
/*
workaround until we know why it crashes randomly on some machine
(BUG#22320).
*/
sleep(2);
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
pthread_attr_destroy(&thr_attr);
my_end(0);
return exit_status();
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment