Commit 09a7a73b authored by unknown's avatar unknown

push for trnman review

(lockmanager still fails unit tests)


BitKeeper/deleted/.del-Makefile.am~4375ae3d4de2bdf0:
  Delete: unittest/maria/Makefile.am
configure.in:
  silence up configure warnings, don't generate unittest/maria/Makefile
include/atomic/nolock.h:
  s/LOCK/LOCK_prefix/
include/atomic/x86-gcc.h:
  s/LOCK/LOCK_prefix/
include/atomic/x86-msvc.h:
  s/LOCK/LOCK_prefix/
include/lf.h:
  pin asserts, renames
include/my_atomic.h:
  move cleanup
include/my_bit.h:
  s/uint/uint32/
mysys/lf_dynarray.c:
  style fixes, split for() in two, remove if()s
mysys/lf_hash.c:
  renames, minor fixes
mysys/my_atomic.c:
  run-time assert -> compile-time assert
storage/maria/Makefile.am:
  lockman here
storage/maria/unittest/Makefile.am:
  new unit tests
storage/maria/unittest/trnman-t.c:
  lots of changes
storage/maria/lockman.c:
  many changes:
  second meaning of "blocker"
  portability: s/gettimeofday/my_getsystime/
  move mutex/cond out of LOCK_OWNER - it creates a race condition
  that will be fixed in a separate changeset
  increment lm->count for every element, not only for distinct ones -
  because we cannot decrease it for distinct elements only :(
storage/maria/lockman.h:
  move mutex/cond out of LOCK_OWNER
storage/maria/trnman.c:
  move mutex/cond out of LOCK_OWNER
  atomic-ops to access short_trid_to_trn[]
storage/maria/trnman.h:
  move mutex/cond out of LOCK_OWNER
storage/maria/unittest/lockman-t.c:
  unit stress test
parent f9f4583f
......@@ -2260,9 +2260,9 @@ AC_ARG_WITH(man,
if test "$with_man" = "yes"
then
man_dirs="man"
man1_files=`ls -1 $srcdir/man/*.1 | sed -e 's;^.*man/;;'`
man1_files=`ls -1 $srcdir/man/*.1 2>/dev/null| sed -e 's;^.*man/;;'`
man1_files=`echo $man1_files`
man8_files=`ls -1 $srcdir/man/*.8 | sed -e 's;^.*man/;;'`
man8_files=`ls -1 $srcdir/man/*.8 2>/dev/null| sed -e 's;^.*man/;;'`
man8_files=`echo $man8_files`
else
man_dirs=""
......@@ -2481,7 +2481,7 @@ AC_SUBST(MAKE_BINARY_DISTRIBUTION_OPTIONS)
# Output results
AC_CONFIG_FILES(Makefile extra/Makefile mysys/Makefile dnl
unittest/Makefile unittest/mytap/Makefile unittest/mytap/t/Makefile dnl
unittest/mysys/Makefile unittest/examples/Makefile unittest/maria/Makefile dnl
unittest/mysys/Makefile unittest/examples/Makefile dnl
strings/Makefile regex/Makefile storage/Makefile dnl
man/Makefile BUILD/Makefile vio/Makefile dnl
libmysql/Makefile client/Makefile dnl
......
......@@ -17,9 +17,9 @@
#if defined(__i386__) || defined(_M_IX86) || defined(__x86_64__)
# ifdef MY_ATOMIC_MODE_DUMMY
# define LOCK ""
# define LOCK_prefix ""
# else
# define LOCK "lock"
# define LOCK_prefix "lock"
# endif
# ifdef __GNUC__
......
......@@ -22,15 +22,15 @@
#ifdef __x86_64__
# ifdef MY_ATOMIC_NO_XADD
# define MY_ATOMIC_MODE "gcc-amd64" LOCK "-no-xadd"
# define MY_ATOMIC_MODE "gcc-amd64" LOCK_prefix "-no-xadd"
# else
# define MY_ATOMIC_MODE "gcc-amd64" LOCK
# define MY_ATOMIC_MODE "gcc-amd64" LOCK_prefix
# endif
#else
# ifdef MY_ATOMIC_NO_XADD
# define MY_ATOMIC_MODE "gcc-x86" LOCK "-no-xadd"
# define MY_ATOMIC_MODE "gcc-x86" LOCK_prefix "-no-xadd"
# else
# define MY_ATOMIC_MODE "gcc-x86" LOCK
# define MY_ATOMIC_MODE "gcc-x86" LOCK_prefix
# endif
#endif
......@@ -41,12 +41,12 @@
#ifndef MY_ATOMIC_NO_XADD
#define make_atomic_add_body(S) \
asm volatile (LOCK "; xadd %0, %1;" : "+r" (v) , "+m" (*a))
asm volatile (LOCK_prefix "; xadd %0, %1;" : "+r" (v) , "+m" (*a))
#endif
#define make_atomic_swap_body(S) \
asm volatile ("; xchg %0, %1;" : "+r" (v) , "+m" (*a))
asm volatile ("xchg %0, %1;" : "+r" (v) , "+m" (*a))
#define make_atomic_cas_body(S) \
asm volatile (LOCK "; cmpxchg %3, %0; setz %2;" \
asm volatile (LOCK_prefix "; cmpxchg %3, %0; setz %2;" \
: "+m" (*a), "+a" (*cmp), "=q" (ret): "r" (set))
#ifdef MY_ATOMIC_MODE_DUMMY
......@@ -55,11 +55,11 @@
#else
/*
Actually 32-bit reads/writes are always atomic on x86
But we add LOCK here anyway to force memory barriers
But we add LOCK_prefix here anyway to force memory barriers
*/
#define make_atomic_load_body(S) \
ret=0; \
asm volatile (LOCK "; cmpxchg %2, %0" \
asm volatile (LOCK_prefix "; cmpxchg %2, %0" \
: "+m" (*a), "+a" (ret): "r" (ret))
#define make_atomic_store_body(S) \
asm volatile ("; xchg %0, %1;" : "+m" (*a), "+r" (v))
......
......@@ -26,19 +26,19 @@
#ifndef _atomic_h_cleanup_
#define _atomic_h_cleanup_ "atomic/x86-msvc.h"
#define MY_ATOMIC_MODE "msvc-x86" LOCK
#define MY_ATOMIC_MODE "msvc-x86" LOCK_prefix
#define make_atomic_add_body(S) \
_asm { \
_asm mov reg_ ## S, v \
_asm LOCK xadd *a, reg_ ## S \
_asm LOCK_prefix xadd *a, reg_ ## S \
_asm movzx v, reg_ ## S \
}
#define make_atomic_cas_body(S) \
_asm { \
_asm mov areg_ ## S, *cmp \
_asm mov reg2_ ## S, set \
_asm LOCK cmpxchg *a, reg2_ ## S \
_asm LOCK_prefix cmpxchg *a, reg2_ ## S \
_asm mov *cmp, areg_ ## S \
_asm setz al \
_asm movzx ret, al \
......@@ -56,13 +56,13 @@
#else
/*
Actually 32-bit reads/writes are always atomic on x86
But we add LOCK here anyway to force memory barriers
But we add LOCK_prefix here anyway to force memory barriers
*/
#define make_atomic_load_body(S) \
_asm { \
_asm mov areg_ ## S, 0 \
_asm mov reg2_ ## S, areg_ ## S \
_asm LOCK cmpxchg *a, reg2_ ## S \
_asm LOCK_prefix cmpxchg *a, reg2_ ## S \
_asm mov ret, areg_ ## S \
}
#define make_atomic_store_body(S) \
......
......@@ -88,8 +88,8 @@ nolock_wrap(lf_dynarray_iterate, int,
pin manager for memory allocator
*/
#define LF_PINBOX_PINS 3
#define LF_PURGATORY_SIZE 11
#define LF_PINBOX_PINS 4
#define LF_PURGATORY_SIZE 10
typedef void lf_pinbox_free_func(void *, void *);
......@@ -112,9 +112,9 @@ typedef struct {
-sizeof(void *)*(LF_PINBOX_PINS+LF_PURGATORY_SIZE+1)];
} LF_PINS;
#define lf_lock_by_pins(PINS) \
#define lf_rwlock_by_pins(PINS) \
my_atomic_rwlock_wrlock(&(PINS)->pinbox->pinstack.lock)
#define lf_unlock_by_pins(PINS) \
#define lf_rwunlock_by_pins(PINS) \
my_atomic_rwlock_wrunlock(&(PINS)->pinbox->pinstack.lock)
/*
......@@ -139,11 +139,13 @@ typedef struct {
#define _lf_unpin(PINS, PIN) _lf_pin(PINS, PIN, NULL)
#define lf_pin(PINS, PIN, ADDR) \
do { \
lf_lock_by_pins(PINS); \
lf_rwlock_by_pins(PINS); \
_lf_pin(PINS, PIN, ADDR); \
lf_unlock_by_pins(PINS); \
lf_rwunlock_by_pins(PINS); \
} while (0)
#define lf_unpin(PINS, PIN) lf_pin(PINS, PIN, NULL)
#define _lf_assert_pin(PINS, PIN) assert((PINS)->pin[PIN] != 0)
#define _lf_assert_unpin(PINS, PIN) assert((PINS)->pin[PIN]==0)
void lf_pinbox_init(LF_PINBOX *pinbox, lf_pinbox_free_func *free_func,
void * free_func_arg);
......
......@@ -118,6 +118,11 @@ make_atomic_swap(16)
make_atomic_swap(32)
make_atomic_swap(ptr)
#ifdef _atomic_h_cleanup_
#include _atomic_h_cleanup_
#undef _atomic_h_cleanup_
#endif
#undef make_atomic_add
#undef make_atomic_cas
#undef make_atomic_load
......@@ -130,11 +135,6 @@ make_atomic_swap(ptr)
#undef make_atomic_swap_body
#undef intptr
#ifdef _atomic_h_cleanup_
#include _atomic_h_cleanup_
#undef _atomic_h_cleanup_
#endif
#ifndef LF_BACKOFF
#define LF_BACKOFF (1)
#endif
......
......@@ -88,7 +88,7 @@ STATIC_INLINE uint32 my_clear_highest_bit(uint32 v)
return v & w;
}
STATIC_INLINE uint32 my_reverse_bits(uint key)
STATIC_INLINE uint32 my_reverse_bits(uint32 key)
{
return
(_my_bits_reverse_table[ key & 255] << 24) |
......@@ -101,7 +101,7 @@ STATIC_INLINE uint32 my_reverse_bits(uint key)
extern uint my_bit_log2(ulong value);
extern uint32 my_round_up_to_next_power(uint32 v);
uint32 my_clear_highest_bit(uint32 v);
uint32 my_reverse_bits(uint key);
uint32 my_reverse_bits(uint32 key);
extern uint my_count_bits(ulonglong v);
extern uint my_count_bits_ushort(ushort v);
#endif
......@@ -38,7 +38,7 @@
void lf_dynarray_init(LF_DYNARRAY *array, uint element_size)
{
bzero(array, sizeof(*array));
array->size_of_element=element_size;
array->size_of_element= element_size;
my_atomic_rwlock_init(&array->lock);
}
......@@ -49,7 +49,7 @@ static void recursive_free(void **alloc, int level)
if (level)
{
int i;
for (i=0; i < LF_DYNARRAY_LEVEL_LENGTH; i++)
for (i= 0; i < LF_DYNARRAY_LEVEL_LENGTH; i++)
recursive_free(alloc[i], level-1);
my_free((void *)alloc, MYF(0));
}
......@@ -60,13 +60,13 @@ static void recursive_free(void **alloc, int level)
void lf_dynarray_destroy(LF_DYNARRAY *array)
{
int i;
for (i=0; i < LF_DYNARRAY_LEVELS; i++)
for (i= 0; i < LF_DYNARRAY_LEVELS; i++)
recursive_free(array->level[i], i);
my_atomic_rwlock_destroy(&array->lock);
bzero(array, sizeof(*array));
}
static const int dynarray_idxes_in_level[LF_DYNARRAY_LEVELS]=
static const int dynarray_idxes_in_prev_level[LF_DYNARRAY_LEVELS]=
{
0, /* +1 here to to avoid -1's below */
LF_DYNARRAY_LEVEL_LENGTH,
......@@ -77,41 +77,32 @@ static const int dynarray_idxes_in_level[LF_DYNARRAY_LEVELS]=
void *_lf_dynarray_lvalue(LF_DYNARRAY *array, uint idx)
{
void * ptr, * volatile * ptr_ptr=0;
void * ptr, * volatile * ptr_ptr= 0;
int i;
for (i=3; i > 0; i--)
for (i= 3; idx < dynarray_idxes_in_prev_level[i]; i--) /* no-op */;
ptr_ptr= &array->level[i];
idx-= dynarray_idxes_in_prev_level[i];
for (; i > 0; i--)
{
if (ptr_ptr || idx >= dynarray_idxes_in_level[i])
if (!(ptr= *ptr_ptr))
{
if (!ptr_ptr)
{
ptr_ptr=&array->level[i];
idx-= dynarray_idxes_in_level[i];
}
ptr=*ptr_ptr;
if (!ptr)
{
void *alloc=my_malloc(LF_DYNARRAY_LEVEL_LENGTH * sizeof(void *),
MYF(MY_WME|MY_ZEROFILL));
if (!alloc)
return(NULL);
if (my_atomic_casptr(ptr_ptr, &ptr, alloc))
ptr= alloc;
else
my_free(alloc, MYF(0));
}
ptr_ptr=((void **)ptr) + idx / dynarray_idxes_in_level[i];
idx%= dynarray_idxes_in_level[i];
void *alloc= my_malloc(LF_DYNARRAY_LEVEL_LENGTH * sizeof(void *),
MYF(MY_WME|MY_ZEROFILL));
if (!alloc)
return(NULL);
if (my_atomic_casptr(ptr_ptr, &ptr, alloc))
ptr= alloc;
else
my_free(alloc, MYF(0));
}
ptr_ptr= ((void **)ptr) + idx / dynarray_idxes_in_prev_level[i];
idx%= dynarray_idxes_in_prev_level[i];
}
if (!ptr_ptr)
ptr_ptr=&array->level[0];
ptr=*ptr_ptr;
if (!ptr)
if (!(ptr= *ptr_ptr))
{
void *alloc, *data;
alloc=my_malloc(LF_DYNARRAY_LEVEL_LENGTH * array->size_of_element +
alloc= my_malloc(LF_DYNARRAY_LEVEL_LENGTH * array->size_of_element +
max(array->size_of_element, sizeof(void *)),
MYF(MY_WME|MY_ZEROFILL));
if (!alloc)
......@@ -123,7 +114,7 @@ void *_lf_dynarray_lvalue(LF_DYNARRAY *array, uint idx)
if (mod)
data+= array->size_of_element - mod;
}
((void **)data)[-1]=alloc; /* free() will need the original pointer */
((void **)data)[-1]= alloc; /* free() will need the original pointer */
if (my_atomic_casptr(ptr_ptr, &ptr, data))
ptr= data;
else
......@@ -134,29 +125,20 @@ void *_lf_dynarray_lvalue(LF_DYNARRAY *array, uint idx)
void *_lf_dynarray_value(LF_DYNARRAY *array, uint idx)
{
void * ptr, * volatile * ptr_ptr=0;
void * ptr, * volatile * ptr_ptr= 0;
int i;
for (i=3; i > 0; i--)
for (i= 3; idx < dynarray_idxes_in_prev_level[i]; i--) /* no-op */;
ptr_ptr= &array->level[i];
idx-= dynarray_idxes_in_prev_level[i];
for (; i > 0; i--)
{
if (ptr_ptr || idx >= dynarray_idxes_in_level[i])
{
if (!ptr_ptr)
{
ptr_ptr=&array->level[i];
idx-= dynarray_idxes_in_level[i];
}
ptr=*ptr_ptr;
if (!ptr)
return(NULL);
ptr_ptr=((void **)ptr) + idx / dynarray_idxes_in_level[i];
idx %= dynarray_idxes_in_level[i];
}
if (!(ptr= *ptr_ptr))
return(NULL);
ptr_ptr= ((void **)ptr) + idx / dynarray_idxes_in_prev_level[i];
idx %= dynarray_idxes_in_prev_level[i];
}
if (!ptr_ptr)
ptr_ptr=&array->level[0];
ptr=*ptr_ptr;
if (!ptr)
if (!(ptr= *ptr_ptr))
return(NULL);
return ptr + array->size_of_element * idx;
}
......@@ -169,8 +151,8 @@ static int recursive_iterate(LF_DYNARRAY *array, void *ptr, int level,
return 0;
if (!level)
return func(ptr, arg);
for (i=0; i < LF_DYNARRAY_LEVEL_LENGTH; i++)
if ((res=recursive_iterate(array, ((void **)ptr)[i], level-1, func, arg)))
for (i= 0; i < LF_DYNARRAY_LEVEL_LENGTH; i++)
if ((res= recursive_iterate(array, ((void **)ptr)[i], level-1, func, arg)))
return res;
return 0;
}
......@@ -178,8 +160,8 @@ static int recursive_iterate(LF_DYNARRAY *array, void *ptr, int level,
int _lf_dynarray_iterate(LF_DYNARRAY *array, lf_dynarray_func func, void *arg)
{
int i, res;
for (i=0; i < LF_DYNARRAY_LEVELS; i++)
if ((res=recursive_iterate(array, array->level[i], i, func, arg)))
for (i= 0; i < LF_DYNARRAY_LEVELS; i++)
if ((res= recursive_iterate(array, array->level[i], i, func, arg)))
return res;
return 0;
}
......
......@@ -19,6 +19,7 @@
TODO
try to get rid of dummy nodes ?
for non-unique hash, count only _distinct_ values
*/
#include <my_global.h>
#include <my_sys.h>
......@@ -51,7 +52,7 @@ typedef struct {
cursor is positioned in either case
pins[0..2] are used, they are NOT removed on return
*/
static int lfind(intptr volatile *head, uint32 hashnr,
static int lfind(LF_SLIST * volatile *head, uint32 hashnr,
const uchar *key, uint keylen, CURSOR *cursor, LF_PINS *pins)
{
uint32 cur_hashnr;
......@@ -60,7 +61,7 @@ static int lfind(intptr volatile *head, uint32 hashnr,
intptr link;
retry:
cursor->prev=head;
cursor->prev=(intptr *)head;
do {
cursor->curr=PTR(*cursor->prev);
_lf_pin(pins,1,cursor->curr);
......@@ -112,7 +113,7 @@ static int lfind(intptr volatile *head, uint32 hashnr,
/*
RETURN
0 - inserted
not 0 - a pointer to a conflict
not 0 - a pointer to a conflict (not pinned and thus unusable)
NOTE
it uses pins[0..2], on return all pins are removed.
......@@ -125,17 +126,17 @@ static LF_SLIST *linsert(LF_SLIST * volatile *head, LF_SLIST *node,
do
{
if (lfind((intptr*)head, node->hashnr, node->key, node->keylen,
if (lfind(head, node->hashnr, node->key, node->keylen,
&cursor, pins) &&
(flags & LF_HASH_UNIQUE))
res=0;
res=0; /* duplicate found */
else
{
node->link=(intptr)cursor.curr;
assert(node->link != (intptr)node);
assert(cursor.prev != &node->link);
if (my_atomic_casptr((void **)cursor.prev, (void **)&cursor.curr, node))
res=1;
res=1; /* inserted ok */
}
} while (res == -1);
_lf_unpin(pins, 0);
......@@ -159,7 +160,7 @@ static int ldelete(LF_SLIST * volatile *head, uint32 hashnr,
do
{
if (!lfind((intptr *)head, hashnr, key, keylen, &cursor, pins))
if (!lfind(head, hashnr, key, keylen, &cursor, pins))
res= 1;
else
if (my_atomic_casptr((void **)&(cursor.curr->link),
......@@ -169,7 +170,7 @@ static int ldelete(LF_SLIST * volatile *head, uint32 hashnr,
(void **)&cursor.curr, cursor.next))
_lf_alloc_free(pins, cursor.curr);
else
lfind((intptr *)head, hashnr, key, keylen, &cursor, pins);
lfind(head, hashnr, key, keylen, &cursor, pins);
res= 0;
}
} while (res == -1);
......@@ -191,7 +192,7 @@ static LF_SLIST *lsearch(LF_SLIST * volatile *head, uint32 hashnr,
const uchar *key, uint keylen, LF_PINS *pins)
{
CURSOR cursor;
int res=lfind((intptr *)head, hashnr, key, keylen, &cursor, pins);
int res=lfind(head, hashnr, key, keylen, &cursor, pins);
if (res) _lf_pin(pins, 2, cursor.curr);
_lf_unpin(pins, 0);
_lf_unpin(pins, 1);
......@@ -214,7 +215,7 @@ static inline uint calc_hash(LF_HASH *hash, const uchar *key, uint keylen)
return nr1 & INT_MAX32;
}
#define MAX_LOAD 1
#define MAX_LOAD 1.0
static void initialize_bucket(LF_HASH *, LF_SLIST * volatile*, uint, LF_PINS *);
void lf_hash_init(LF_HASH *hash, uint element_size, uint flags,
......@@ -262,7 +263,7 @@ int lf_hash_insert(LF_HASH *hash, LF_PINS *pins, const void *data)
uint csize, bucket, hashnr;
LF_SLIST *node, * volatile *el;
lf_lock_by_pins(pins);
lf_rwlock_by_pins(pins);
node=(LF_SLIST *)_lf_alloc_new(pins);
memcpy(node+1, data, hash->element_size);
node->key= hash_key(hash, (uchar *)(node+1), &node->keylen);
......@@ -275,13 +276,13 @@ int lf_hash_insert(LF_HASH *hash, LF_PINS *pins, const void *data)
if (linsert(el, node, pins, hash->flags))
{
_lf_alloc_free(pins, node);
lf_unlock_by_pins(pins);
lf_rwunlock_by_pins(pins);
return 1;
}
csize= hash->size;
if ((my_atomic_add32(&hash->count, 1)+1.0) / csize > MAX_LOAD)
my_atomic_cas32(&hash->size, &csize, csize*2);
lf_unlock_by_pins(pins);
lf_rwunlock_by_pins(pins);
return 0;
}
......@@ -298,17 +299,17 @@ int lf_hash_delete(LF_HASH *hash, LF_PINS *pins, const void *key, uint keylen)
uint bucket, hashnr=calc_hash(hash, (uchar *)key, keylen);
bucket= hashnr % hash->size;
lf_lock_by_pins(pins);
lf_rwlock_by_pins(pins);
el=_lf_dynarray_lvalue(&hash->array, bucket);
if (*el == NULL)
initialize_bucket(hash, el, bucket, pins);
if (ldelete(el, my_reverse_bits(hashnr) | 1, (uchar *)key, keylen, pins))
{
lf_unlock_by_pins(pins);
lf_rwunlock_by_pins(pins);
return 1;
}
my_atomic_add32(&hash->count, -1);
lf_unlock_by_pins(pins);
lf_rwunlock_by_pins(pins);
return 0;
}
......@@ -322,13 +323,13 @@ void *lf_hash_search(LF_HASH *hash, LF_PINS *pins, const void *key, uint keylen)
uint bucket, hashnr=calc_hash(hash, (uchar *)key, keylen);
bucket= hashnr % hash->size;
lf_lock_by_pins(pins);
lf_rwlock_by_pins(pins);
el=_lf_dynarray_lvalue(&hash->array, bucket);
if (*el == NULL)
initialize_bucket(hash, el, bucket, pins);
found= lsearch(el, my_reverse_bits(hashnr) | 1, (uchar *)key, keylen, pins);
lf_unlock_by_pins(pins);
return found+1;
lf_rwunlock_by_pins(pins);
return found ? found+1 : 0;
}
static char *dummy_key="";
......@@ -347,7 +348,7 @@ static void initialize_bucket(LF_HASH *hash, LF_SLIST * volatile *node,
dummy->keylen=0;
if ((cur= linsert(el, dummy, pins, 0)))
{
_lf_alloc_free(pins, dummy);
my_free((void *)dummy, MYF(0));
dummy= cur;
}
my_atomic_casptr((void **)node, (void **)&tmp, dummy);
......
......@@ -35,7 +35,7 @@
*/
int my_atomic_initialize()
{
DBUG_ASSERT(sizeof(intptr) == sizeof(void *));
char assert_the_size[sizeof(intptr) == sizeof(void *) ? 1 : -1];
/* currently the only thing worth checking is SMP/UP issue */
#ifdef MY_ATOMIC_MODE_DUMMY
return my_getncpus() == 1 ? MY_ATOMIC_OK : MY_ATOMIC_NOT_1CPU;
......
......@@ -53,7 +53,7 @@ maria_pack_LDADD= @CLIENT_EXTRA_LDFLAGS@ libmaria.a \
noinst_PROGRAMS = ma_test1 ma_test2 ma_test3 ma_rt_test ma_sp_test
noinst_HEADERS = maria_def.h ma_rt_index.h ma_rt_key.h ma_rt_mbr.h \
ma_sp_defs.h ma_fulltext.h ma_ftdefs.h ma_ft_test1.h \
ma_ft_eval.h trxman.h \
ma_ft_eval.h trnman.h lockman.h \
ma_control_file.h ha_maria.h
ma_test1_DEPENDENCIES= $(LIBRARIES)
ma_test1_LDADD= @CLIENT_EXTRA_LDFLAGS@ libmaria.a \
......@@ -108,7 +108,7 @@ libmaria_a_SOURCES = ma_init.c ma_open.c ma_extra.c ma_info.c ma_rkey.c \
ma_keycache.c ma_preload.c ma_ft_parser.c \
ma_ft_update.c ma_ft_boolean_search.c \
ma_ft_nlq_search.c ft_maria.c ma_sort.c \
ha_maria.cc trxman.c \
ha_maria.cc trnman.c lockman.c \
ma_rt_index.c ma_rt_key.c ma_rt_mbr.c ma_rt_split.c \
ma_sp_key.c ma_control_file.c
CLEANFILES = test?.MA? FT?.MA? isam.log ma_test_all ma_rt_test.MA? sp_test.MA?
......
This diff is collapsed.
/* Copyright (C) 2000 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef _lockman_h
#define _lockman_h
/*
N - "no lock", not a lock, used sometimes to simplify the code
S - Shared
X - eXclusive
IS - Intention Shared
IX - Intention eXclusive
SIX - Shared + Intention eXclusive
LS - Loose Shared
LX - Loose eXclusive
SLX - Shared + Loose eXclusive
LSIX - Loose Shared + Intention eXclusive
*/
enum lock_type { N, S, X, IS, IX, SIX, LS, LX, SLX, LSIX };
struct lockman_lock;
typedef struct st_lock_owner LOCK_OWNER;
struct st_lock_owner {
LF_PINS *pins;
struct lockman_lock *all_locks;
LOCK_OWNER *waiting_for;
pthread_cond_t *cond; /* transactions waiting for this, wait on 'cond' */
pthread_mutex_t *mutex; /* mutex is required to use 'cond' */
uint16 loid;
};
typedef LOCK_OWNER *loid_to_lo_func(uint16);
typedef struct {
LF_DYNARRAY array; /* hash itself */
LF_ALLOCATOR alloc; /* allocator for elements */
int32 volatile size; /* size of array */
int32 volatile count; /* number of elements in the hash */
uint lock_timeout;
loid_to_lo_func *loid_to_lo;
} LOCKMAN;
enum lockman_getlock_result {
DIDNT_GET_THE_LOCK=0, GOT_THE_LOCK,
GOT_THE_LOCK_NEED_TO_LOCK_A_SUBRESOURCE,
GOT_THE_LOCK_NEED_TO_INSTANT_LOCK_A_SUBRESOURCE
};
void lockman_init(LOCKMAN *, loid_to_lo_func *, uint);
void lockman_destroy(LOCKMAN *);
enum lockman_getlock_result lockman_getlock(LOCKMAN *lm, LOCK_OWNER *lo,
uint64 resource,
enum lock_type lock);
int lockman_release_locks(LOCKMAN *, LOCK_OWNER *);
#ifdef EXTRA_DEBUG
void print_lockhash(LOCKMAN *lm);
#endif
#endif
/* Copyright (C) 2000 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <my_global.h>
#include <my_sys.h>
#include <lf.h>
#include "trnman.h"
uint trnman_active_transactions, trnman_allocated_transactions;
static TRN active_list_min, active_list_max,
committed_list_min, committed_list_max, *pool;
static pthread_mutex_t LOCK_trn_list;
static TrID global_trid_generator;
static LF_HASH trid_to_trn;
static LOCKMAN maria_lockman;
static TRN **short_trid_to_trn;
static my_atomic_rwlock_t LOCK_short_trid_to_trn, LOCK_pool;
static byte *trn_get_hash_key(const byte *trn,uint* len, my_bool unused)
{
*len= sizeof(TrID);
return (byte *) & ((*((TRN **)trn))->trid);
}
static LOCK_OWNER *trnman_short_trid_to_TRN(uint16 short_trid)
{
TRN *trn;
my_atomic_rwlock_rdlock(&LOCK_short_trid_to_trn);
trn= my_atomic_loadptr((void **)&short_trid_to_trn[short_trid]);
my_atomic_rwlock_rdunlock(&LOCK_short_trid_to_trn);
return (LOCK_OWNER *)trn;
}
int trnman_init()
{
pthread_mutex_init(&LOCK_trn_list, MY_MUTEX_INIT_FAST);
active_list_max.trid= active_list_min.trid= 0;
active_list_max.min_read_from= ~0;
active_list_max.next= active_list_min.prev= 0;
active_list_max.prev= &active_list_min;
active_list_min.next= &active_list_max;
trnman_active_transactions= 0;
trnman_allocated_transactions= 0;
committed_list_max.commit_trid= ~0;
committed_list_max.next= committed_list_min.prev= 0;
committed_list_max.prev= &committed_list_min;
committed_list_min.next= &committed_list_max;
pool= 0;
global_trid_generator= 0; /* set later by recovery code */
lf_hash_init(&trid_to_trn, sizeof(TRN*), LF_HASH_UNIQUE,
0, 0, trn_get_hash_key, 0);
my_atomic_rwlock_init(&LOCK_short_trid_to_trn);
my_atomic_rwlock_init(&LOCK_pool);
short_trid_to_trn= (TRN **)my_malloc(SHORT_TRID_MAX*sizeof(TRN*),
MYF(MY_WME|MY_ZEROFILL));
if (!short_trid_to_trn)
return 1;
short_trid_to_trn--; /* min short_trid is 1 */
lockman_init(&maria_lockman, &trnman_short_trid_to_TRN, 10000);
return 0;
}
int trnman_destroy()
{
DBUG_ASSERT(trid_to_trn.count == 0);
DBUG_ASSERT(trnman_active_transactions == 0);
DBUG_ASSERT(active_list_max.prev == &active_list_min);
DBUG_ASSERT(active_list_min.next == &active_list_max);
DBUG_ASSERT(committed_list_max.prev == &committed_list_min);
DBUG_ASSERT(committed_list_min.next == &committed_list_max);
while (pool)
{
TRN *trn= pool;
pool= pool->next;
DBUG_ASSERT(trn->locks.mutex == 0);
DBUG_ASSERT(trn->locks.cond == 0);
my_free((void *)trn, MYF(0));
}
lf_hash_destroy(&trid_to_trn);
pthread_mutex_destroy(&LOCK_trn_list);
my_atomic_rwlock_destroy(&LOCK_short_trid_to_trn);
my_atomic_rwlock_destroy(&LOCK_pool);
my_free((void *)(short_trid_to_trn+1), MYF(0));
lockman_destroy(&maria_lockman);
}
static TrID new_trid()
{
DBUG_ASSERT(global_trid_generator < 0xffffffffffffLL);
safe_mutex_assert_owner(&LOCK_trn_list);
return ++global_trid_generator;
}
static void set_short_trid(TRN *trn)
{
int i= (global_trid_generator + (intptr)trn) * 312089 % SHORT_TRID_MAX;
my_atomic_rwlock_wrlock(&LOCK_short_trid_to_trn);
for ( ; ; i= i % SHORT_TRID_MAX + 1) /* the range is [1..SHORT_TRID_MAX] */
{
void *tmp= NULL;
if (short_trid_to_trn[i] == NULL &&
my_atomic_casptr((void **)&short_trid_to_trn[i], &tmp, trn))
break;
}
my_atomic_rwlock_wrunlock(&LOCK_short_trid_to_trn);
trn->locks.loid= i;
}
TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond)
{
TRN *trn;
/*
see trnman_end_trn to see why we need a mutex here
and as we have a mutex, we can as well do everything
under it - allocating a TRN, incrementing trnman_active_transactions,
setting trn->min_read_from.
Note that all the above is fast. generating short_trid may be slow,
as it involves scanning a big array - so it's still done
outside of the mutex.
*/
pthread_mutex_lock(&LOCK_trn_list);
trnman_active_transactions++;
trn= pool;
my_atomic_rwlock_wrlock(&LOCK_pool);
while (trn && !my_atomic_casptr((void **)&pool, (void **)&trn,
(void *)trn->next))
/* no-op */;
my_atomic_rwlock_wrunlock(&LOCK_pool);
if (!trn)
{
trn= (TRN *)my_malloc(sizeof(TRN), MYF(MY_WME));
if (!trn)
{
pthread_mutex_unlock(&LOCK_trn_list);
return 0;
}
trnman_allocated_transactions++;
}
trn->min_read_from= active_list_min.next->trid;
trn->trid= new_trid();
trn->locks.loid= 0;
trn->next= &active_list_max;
trn->prev= active_list_max.prev;
active_list_max.prev= trn->prev->next= trn;
pthread_mutex_unlock(&LOCK_trn_list);
trn->pins= lf_hash_get_pins(&trid_to_trn);
if (!trn->min_read_from)
trn->min_read_from= trn->trid;
trn->locks.mutex= mutex;
trn->locks.cond= cond;
trn->commit_trid= 0;
trn->locks.waiting_for= 0;
trn->locks.all_locks= 0;
trn->locks.pins= lf_alloc_get_pins(&maria_lockman.alloc);
set_short_trid(trn); /* this must be the last! */
return trn;
}
/*
remove a trn from the active list,
move to committed list,
set commit_trid
TODO
integrate with log manager. That means:
a common "commit" mutex - forcing the log and setting commit_trid
must be done atomically (QQ how the heck it could be done with
group commit ???) XXX - why did I think it must be done atomically ?
trid_to_trn, active_list_*, and committed_list_* can be
updated asyncronously.
*/
void trnman_end_trn(TRN *trn, my_bool commit)
{
int res;
TRN *free_me= 0;
LF_PINS *pins= trn->pins;
pthread_mutex_lock(&LOCK_trn_list);
trn->next->prev= trn->prev;
trn->prev->next= trn->next;
if (trn->prev == &active_list_min)
{
TRN *t;
for (t= committed_list_min.next;
t->commit_trid < active_list_min.next->min_read_from;
t= t->next) /* no-op */;
if (t != committed_list_min.next)
{
free_me= committed_list_min.next;
committed_list_min.next= t;
t->prev->next= 0;
t->prev= &committed_list_min;
}
}
if (commit && active_list_min.next != &active_list_max)
{
trn->commit_trid= global_trid_generator;
trn->next= &committed_list_max;
trn->prev= committed_list_max.prev;
committed_list_max.prev= trn->prev->next= trn;
res= lf_hash_insert(&trid_to_trn, pins, &trn);
DBUG_ASSERT(res == 0);
}
else
{
trn->next= free_me;
free_me= trn;
}
trnman_active_transactions--;
pthread_mutex_unlock(&LOCK_trn_list);
lockman_release_locks(&maria_lockman, &trn->locks);
trn->locks.mutex= 0;
trn->locks.cond= 0;
my_atomic_rwlock_rdlock(&LOCK_short_trid_to_trn);
my_atomic_storeptr((void **)&short_trid_to_trn[trn->locks.loid], 0);
my_atomic_rwlock_rdunlock(&LOCK_short_trid_to_trn);
while (free_me) // XXX send them to the purge thread
{
int res;
TRN *t= free_me;
free_me= free_me->next;
res= lf_hash_delete(&trid_to_trn, pins, &t->trid, sizeof(TrID));
trnman_free_trn(t);
}
lf_hash_put_pins(pins);
lf_pinbox_put_pins(trn->locks.pins);
}
/*
free a trn (add to the pool, that is)
note - we can never really free() a TRN if there's at least one
other running transaction - see, e.g., how lock waits are implemented
in lockman.c
*/
void trnman_free_trn(TRN *trn)
{
TRN *tmp= pool;
my_atomic_rwlock_wrlock(&LOCK_pool);
do
{
/*
without volatile cast gcc-3.4.4 moved the assignment
down after the loop at -O2
*/
*(TRN * volatile *)&(trn->next)= tmp;
} while (!my_atomic_casptr((void **)&pool, (void **)&tmp, trn));
my_atomic_rwlock_wrunlock(&LOCK_pool);
}
/*
NOTE
here we access the hash in a lock-free manner.
It's safe, a 'found' TRN can never be freed/reused before we access it.
In fact, it cannot be freed before 'trn' ends, because a 'found' TRN
can only be removed from the hash when:
found->commit_trid < ALL (trn->min_read_from)
that is, at least
found->commit_trid < trn->min_read_from
but
found->trid >= trn->min_read_from
and
found->commit_trid > found->trid
*/
my_bool trnman_can_read_from(TRN *trn, TrID trid)
{
TRN **found;
my_bool can;
LF_REQUIRE_PINS(3);
if (trid < trn->min_read_from)
return TRUE;
if (trid > trn->trid)
return FALSE;
found= lf_hash_search(&trid_to_trn, trn->pins, &trid, sizeof(trid));
if (!found)
return FALSE; /* not in the hash of committed transactions = cannot read*/
can= (*found)->commit_trid < trn->trid;
lf_unpin(trn->pins, 2);
return can;
}
/* Copyright (C) 2000 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef _trnman_h
#define _trnman_h
#include "lockman.h"
typedef uint64 TrID; /* our TrID is 6 bytes */
typedef struct st_transaction TRN;
struct st_transaction
{
LOCK_OWNER locks;
LF_PINS *pins;
TrID trid, min_read_from, commit_trid;
TRN *next, *prev;
/* Note! if locks.loid is 0, trn is NOT initialized */
};
#define SHORT_TRID_MAX 65535
extern uint trnman_active_transactions, trnman_allocated_transactions;
int trnman_init(void);
int trnman_destroy(void);
TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond);
void trnman_end_trn(TRN *trn, my_bool commit);
#define trnman_commit_trn(T) trnman_end_trn(T, TRUE)
#define trnman_abort_trn(T) trnman_end_trn(T, FALSE)
void trnman_free_trn(TRN *trn);
my_bool trnman_can_read_from(TRN *trn, TrID trid);
#endif
#include <my_global.h>
#include <my_sys.h>
#include <lf.h>
#include "trxman.h"
TRX active_list_min, active_list_max,
committed_list_min, committed_list_max, *pool;
pthread_mutex_t LOCK_trx_list;
uint trxman_active_transactions, trxman_allocated_transactions;
TrID global_trid_generator;
TRX **short_id_to_trx;
my_atomic_rwlock_t LOCK_short_id_to_trx;
LF_HASH trid_to_trx;
static byte *trx_get_hash_key(const byte *trx,uint* len, my_bool unused)
{
*len= sizeof(TrID);
return (byte *) & ((*((TRX **)trx))->trid);
}
int trxman_init()
{
pthread_mutex_init(&LOCK_trx_list, MY_MUTEX_INIT_FAST);
active_list_max.trid= active_list_min.trid= 0;
active_list_max.min_read_from=~0;
active_list_max.next= active_list_min.prev= 0;
active_list_max.prev= &active_list_min;
active_list_min.next= &active_list_max;
trxman_active_transactions= 0;
trxman_allocated_transactions= 0;
committed_list_max.commit_trid= ~0;
committed_list_max.next= committed_list_min.prev= 0;
committed_list_max.prev= &committed_list_min;
committed_list_min.next= &committed_list_max;
pool=0;
global_trid_generator=0; /* set later by recovery code */
lf_hash_init(&trid_to_trx, sizeof(TRX*), LF_HASH_UNIQUE,
0, 0, trx_get_hash_key, 0);
my_atomic_rwlock_init(&LOCK_short_id_to_trx);
short_id_to_trx=(TRX **)my_malloc(SHORT_ID_MAX*sizeof(TRX*),
MYF(MY_WME|MY_ZEROFILL));
if (!short_id_to_trx)
return 1;
short_id_to_trx--; /* min short_id is 1 */
return 0;
}
int trxman_destroy()
{
DBUG_ASSERT(trid_to_trx.count == 0);
DBUG_ASSERT(trxman_active_transactions == 0);
DBUG_ASSERT(active_list_max.prev == &active_list_min);
DBUG_ASSERT(active_list_min.next == &active_list_max);
DBUG_ASSERT(committed_list_max.prev == &committed_list_min);
DBUG_ASSERT(committed_list_min.next == &committed_list_max);
while (pool)
{
TRX *tmp=pool->next;
my_free((void *)pool, MYF(0));
pool=tmp;
}
lf_hash_destroy(&trid_to_trx);
pthread_mutex_destroy(&LOCK_trx_list);
my_atomic_rwlock_destroy(&LOCK_short_id_to_trx);
my_free((void *)(short_id_to_trx+1), MYF(0));
}
static TrID new_trid()
{
DBUG_ASSERT(global_trid_generator < 0xffffffffffffLL);
safe_mutex_assert_owner(&LOCK_trx_list);
return ++global_trid_generator;
}
static void set_short_id(TRX *trx)
{
int i= (global_trid_generator + (intptr)trx) * 312089 % SHORT_ID_MAX;
my_atomic_rwlock_wrlock(&LOCK_short_id_to_trx);
for ( ; ; i= i % SHORT_ID_MAX + 1) /* the range is [1..SHORT_ID_MAX] */
{
void *tmp=NULL;
if (short_id_to_trx[i] == NULL &&
my_atomic_casptr((void **)&short_id_to_trx[i], &tmp, trx))
break;
}
my_atomic_rwlock_wrunlock(&LOCK_short_id_to_trx);
trx->short_id= i;
}
TRX *trxman_new_trx()
{
TRX *trx;
my_atomic_add32(&trxman_active_transactions, 1);
/*
see trxman_end_trx to see why we need a mutex here
and as we have a mutex, we can as well do everything
under it - allocating a TRX, incrementing trxman_active_transactions,
setting trx->min_read_from.
Note that all the above is fast. generating short_id may be slow,
as it involves scanning a big array - so it's still done
outside of the mutex.
*/
pthread_mutex_lock(&LOCK_trx_list);
trx=pool;
while (trx && !my_atomic_casptr((void **)&pool, (void **)&trx, trx->next))
/* no-op */;
if (!trx)
{
trx=(TRX *)my_malloc(sizeof(TRX), MYF(MY_WME));
trxman_allocated_transactions++;
}
if (!trx)
return 0;
trx->min_read_from= active_list_min.next->trid;
trx->trid= new_trid();
trx->short_id= 0;
trx->next= &active_list_max;
trx->prev= active_list_max.prev;
active_list_max.prev= trx->prev->next= trx;
pthread_mutex_unlock(&LOCK_trx_list);
trx->pins=lf_hash_get_pins(&trid_to_trx);
if (!trx->min_read_from)
trx->min_read_from= trx->trid;
trx->commit_trid=0;
set_short_id(trx); /* this must be the last! */
return trx;
}
/*
remove a trx from the active list,
move to committed list,
set commit_trid
TODO
integrate with lock manager, log manager. That means:
a common "commit" mutex - forcing the log and setting commit_trid
must be done atomically (QQ how the heck it could be done with
group commit ???)
trid_to_trx, active_list_*, and committed_list_* can be
updated asyncronously.
*/
void trxman_end_trx(TRX *trx, my_bool commit)
{
int res;
TRX *free_me= 0;
LF_PINS *pins= trx->pins;
pthread_mutex_lock(&LOCK_trx_list);
trx->next->prev= trx->prev;
trx->prev->next= trx->next;
if (trx->prev == &active_list_min)
{
TRX *t;
for (t= committed_list_min.next;
t->commit_trid < active_list_min.next->min_read_from;
t= t->next) /* no-op */;
if (t != committed_list_min.next)
{
free_me= committed_list_min.next;
committed_list_min.next= t;
t->prev->next=0;
t->prev= &committed_list_min;
}
}
my_atomic_rwlock_wrlock(&LOCK_short_id_to_trx);
my_atomic_storeptr((void **)&short_id_to_trx[trx->short_id], 0);
my_atomic_rwlock_wrunlock(&LOCK_short_id_to_trx);
if (commit && active_list_min.next != &active_list_max)
{
trx->commit_trid= global_trid_generator;
trx->next= &committed_list_max;
trx->prev= committed_list_max.prev;
committed_list_max.prev= trx->prev->next= trx;
res= lf_hash_insert(&trid_to_trx, pins, &trx);
DBUG_ASSERT(res == 0);
}
else
{
trx->next=free_me;
free_me=trx;
}
pthread_mutex_unlock(&LOCK_trx_list);
my_atomic_add32(&trxman_active_transactions, -1);
while (free_me)
{
int res;
TRX *t= free_me;
free_me= free_me->next;
res= lf_hash_delete(&trid_to_trx, pins, &t->trid, sizeof(TrID));
trxman_free_trx(t);
}
lf_hash_put_pins(pins);
}
/* free a trx (add to the pool, that is */
void trxman_free_trx(TRX *trx)
{
TRX *tmp=pool;
do
{
trx->next=tmp;
} while (!my_atomic_casptr((void **)&pool, (void **)&tmp, trx));
}
my_bool trx_can_read_from(TRX *trx, TrID trid)
{
TRX *found;
my_bool can;
if (trid < trx->min_read_from)
return TRUE;
if (trid > trx->trid)
return FALSE;
found= lf_hash_search(&trid_to_trx, trx->pins, &trid, sizeof(trid));
if (!found)
return FALSE; /* not in the hash = cannot read */
can= found->commit_trid < trx->trid;
lf_unpin(trx->pins, 2);
return can;
}
typedef uint64 TrID; /* our TrID is 6 bytes */
typedef struct st_transaction
{
TrID trid, min_read_from, commit_trid;
struct st_transaction *next, *prev;
/* Note! if short_id is 0, trx is NOT initialized */
uint16 short_id;
LF_PINS *pins;
} TRX;
#define SHORT_ID_MAX 65535
extern uint trxman_active_transactions, trxman_allocated_transactions;
extern TRX **short_id_to_trx;
extern my_atomic_rwlock_t LOCK_short_id_to_trx;
int trxman_init();
int trxman_end();
TRX *trxman_new_trx();
void trxman_end_trx(TRX *trx, my_bool commit);
#define trxman_commit_trx(T) trxman_end_trx(T, TRUE)
#define trxman_abort_trx(T) trxman_end_trx(T, FALSE)
void trxman_free_trx(TRX *trx);
my_bool trx_can_read_from(TRX *trx, TrID trid);
......@@ -25,5 +25,5 @@ LDADD= $(top_builddir)/unittest/mytap/libmytap.a \
$(top_builddir)/mysys/libmysys.a \
$(top_builddir)/dbug/libdbug.a \
$(top_builddir)/strings/libmystrings.a @ZLIB_LIBS@
noinst_PROGRAMS = ma_control_file-t
noinst_PROGRAMS = ma_control_file-t trnman-t lockman-t
CLEANFILES = maria_control
/* Copyright (C) 2006 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
//#define EXTRA_VERBOSE
#include <tap.h>
#include <my_global.h>
#include <my_sys.h>
#include <my_atomic.h>
#include <lf.h>
#include "../lockman.h"
#define Nlos 10
LOCK_OWNER loarray[Nlos];
pthread_mutex_t mutexes[Nlos];
pthread_cond_t conds[Nlos];
LOCKMAN lockman;
#ifndef EXTRA_VERBOSE
#define print_lockhash(X) /* no-op */
#define DIAG(X) /* no-op */
#else
#define DIAG(X) diag X
#endif
LOCK_OWNER *loid2lo(uint16 loid)
{
return loarray+loid-1;
}
#define unlock_all(O) diag("lo" #O "> release all locks"); \
lockman_release_locks(&lockman, loid2lo(O));print_lockhash(&lockman)
#define test_lock(O, R, L, S, RES) \
ok(lockman_getlock(&lockman, loid2lo(O), R, L) == RES, \
"lo" #O "> " S " lock resource " #R " with " #L "-lock"); \
print_lockhash(&lockman)
#define lock_ok_a(O,R,L) test_lock(O,R,L,"",GOT_THE_LOCK)
#define lock_ok_i(O,R,L) test_lock(O,R,L,"",GOT_THE_LOCK_NEED_TO_LOCK_A_SUBRESOURCE)
#define lock_ok_l(O,R,L) test_lock(O,R,L,"",GOT_THE_LOCK_NEED_TO_INSTANT_LOCK_A_SUBRESOURCE)
#define lock_conflict(O,R,L) test_lock(O,R,L,"cannot ",DIDNT_GET_THE_LOCK); \
unlock_all(O)
void test_lockman_simple()
{
/* simple */
lock_ok_a(1, 1, S);
lock_ok_i(2, 2, IS);
lock_ok_i(1, 2, IX);
/* lock escalation */
lock_ok_a(1, 1, X);
lock_ok_i(2, 2, IX);
/* failures */
lock_conflict(2,1,X); /* this removes all locks of lo2 */
lock_ok_a(1,2,S);
lock_ok_a(1,2,IS);
lock_ok_a(1,2,LS);
lock_ok_i(1,3,IX);
lock_ok_a(2,3,LS);
lock_ok_i(1,3,IX);
lock_ok_l(2,3,IS);
lockman_release_locks(&lockman, loid2lo(1));
lockman_release_locks(&lockman, loid2lo(2));
}
pthread_attr_t rt_attr;
pthread_mutex_t rt_mutex;
pthread_cond_t rt_cond;
int rt_num_threads;
int litmus;
void run_test(const char *test, pthread_handler handler, int n, int m)
{
pthread_t t;
ulonglong now= my_getsystime();
litmus= 0;
diag("Testing %s with %d threads, %d iterations... ", test, n, m);
for (rt_num_threads= n ; n ; n--)
pthread_create(&t, &rt_attr, handler, &m);
pthread_mutex_lock(&rt_mutex);
while (rt_num_threads)
pthread_cond_wait(&rt_cond, &rt_mutex);
pthread_mutex_unlock(&rt_mutex);
now= my_getsystime()-now;
ok(litmus == 0, "tested %s in %g secs (%d)", test, ((double)now)/1e7, litmus);
}
int thread_number= 0, timeouts=0;
#define Nrows 1000
#define Ntables 10
#define TABLE_LOCK_RATIO 10
enum lock_type lock_array[6]={S,X,LS,LX,IS,IX};
char *lock2str[6]={"S","X","LS","LX","IS","IX"};
char *res2str[6]={
"DIDN'T GET THE LOCK",
"GOT THE LOCK",
"GOT THE LOCK NEED TO LOCK A SUBRESOURCE",
"GOT THE LOCK NEED TO INSTANT LOCK A SUBRESOURCE"};
pthread_handler_t test_lockman(void *arg)
{
int m= (*(int *)arg);
uint x, loid, row, table, res, locklevel, timeout= 0;
LOCK_OWNER *lo;
pthread_mutex_lock(&rt_mutex);
loid= ++thread_number;
pthread_mutex_unlock(&rt_mutex);
lo= loid2lo(loid);
for (x= ((int)(intptr)(&m)); m > 0; m--)
{
x= (x*3628273133 + 1500450271) % 9576890767; /* three prime numbers */
row= x % Nrows + Ntables;
table= row % Ntables;
locklevel= (x/Nrows) & 3;
if ((x/Nrows/4) % TABLE_LOCK_RATIO == 0)
{ /* table lock */
res= lockman_getlock(&lockman, lo, table, lock_array[locklevel]);
DIAG(("loid=%2d, table %d lock %s, res=%s", loid, table, lock2str[locklevel], res2str[res]));
if (res == DIDNT_GET_THE_LOCK)
{
lockman_release_locks(&lockman, lo);
DIAG(("loid=%2d, release all locks", loid));
timeout++;
continue;
}
DBUG_ASSERT(res == GOT_THE_LOCK);
}
else
{ /* row lock */
locklevel&= 1;
res= lockman_getlock(&lockman, lo, table, lock_array[locklevel + 4]);
DIAG(("loid=%2d, row %d lock %s, res=%s", loid, row, lock2str[locklevel+4], res2str[res]));
switch (res)
{
case DIDNT_GET_THE_LOCK:
lockman_release_locks(&lockman, lo);
DIAG(("loid=%2d, release all locks", loid));
timeout++;
continue;
case GOT_THE_LOCK:
continue;
case GOT_THE_LOCK_NEED_TO_INSTANT_LOCK_A_SUBRESOURCE:
/* not implemented, so take a regular lock */
case GOT_THE_LOCK_NEED_TO_LOCK_A_SUBRESOURCE:
res= lockman_getlock(&lockman, lo, row, lock_array[locklevel]);
DIAG(("loid=%2d, ROW %d lock %s, res=%s", loid, row, lock2str[locklevel], res2str[res]));
if (res == DIDNT_GET_THE_LOCK)
{
lockman_release_locks(&lockman, lo);
DIAG(("loid=%2d, release all locks", loid));
timeout++;
continue;
}
DBUG_ASSERT(res == GOT_THE_LOCK);
continue;
default:
DBUG_ASSERT(0);
}
}
}
lockman_release_locks(&lockman, lo);
pthread_mutex_lock(&rt_mutex);
rt_num_threads--;
timeouts+= timeout;
if (!rt_num_threads)
{
pthread_cond_signal(&rt_cond);
diag("number of timeouts: %d", timeouts);
}
pthread_mutex_unlock(&rt_mutex);
return 0;
}
int main()
{
int i;
my_init();
plan(14);
if (my_atomic_initialize())
return exit_status();
pthread_attr_init(&rt_attr);
pthread_attr_setdetachstate(&rt_attr,PTHREAD_CREATE_DETACHED);
pthread_mutex_init(&rt_mutex, 0);
pthread_cond_init(&rt_cond, 0);
lockman_init(&lockman, &loid2lo, 50);
for (i= 0; i < Nlos; i++)
{
loarray[i].pins= lf_alloc_get_pins(&lockman.alloc);
loarray[i].all_locks= 0;
loarray[i].waiting_for= 0;
pthread_mutex_init(&mutexes[i], MY_MUTEX_INIT_FAST);
pthread_cond_init (&conds[i], 0);
loarray[i].mutex= &mutexes[i];
loarray[i].cond= &conds[i];
loarray[i].loid= i+1;
}
test_lockman_simple();
#define CYCLES 100
#define THREADS Nlos /* don't change this line */
run_test("lockman", test_lockman, THREADS,CYCLES);
for (i= 0; i < Nlos; i++)
{
lockman_release_locks(&lockman, &loarray[i]);
pthread_mutex_destroy(loarray[i].mutex);
pthread_cond_destroy(loarray[i].cond);
lf_pinbox_put_pins(loarray[i].pins);
}
lockman_destroy(&lockman);
pthread_mutex_destroy(&rt_mutex);
pthread_cond_destroy(&rt_cond);
pthread_attr_destroy(&rt_attr);
my_end(0);
return exit_status();
}
......@@ -20,7 +20,7 @@
#include <my_sys.h>
#include <my_atomic.h>
#include <lf.h>
#include "../../storage/maria/trxman.h"
#include "../trnman.h"
pthread_attr_t rt_attr;
pthread_mutex_t rt_mutex;
......@@ -29,61 +29,48 @@ int rt_num_threads;
int litmus;
/* template for a test: the goal is to have litmus==0 if the test passed
#define ITER nnn
pthread_handler_t test_XXXXXXXX(void *arg)
{
int m=(*(int *)arg)/ITER, x;
for (x=((int)(intptr)(&m)); m ; m--)
{
// do something with litmus
}
// do something more with litmus
pthread_mutex_lock(&rt_mutex);
rt_num_threads--;
if (!rt_num_threads)
{
diag("whatever diagnostics we want", blabla, foobar);
pthread_cond_signal(&rt_cond);
}
pthread_mutex_unlock(&rt_mutex);
return 0;
}
#undef ITER
*/
/*
create and end (commit or rollback) transactions randomly
*/
#define MAX_ITER 100
pthread_handler_t test_trxman(void *arg)
pthread_handler_t test_trnman(void *arg)
{
int m=(*(int *)arg);
int m= (*(int *)arg);
uint x, y, i, j, n;
TRX *trx[MAX_ITER];
TRN *trn[MAX_ITER];
pthread_mutex_t mutexes[MAX_ITER];
pthread_cond_t conds[MAX_ITER];
for (i=0; i < MAX_ITER; i++)
{
pthread_mutex_init(&mutexes[i], MY_MUTEX_INIT_FAST);
pthread_cond_init(&conds[i], 0);
}
for (x=((int)(intptr)(&m)); m > 0; )
for (x= ((int)(intptr)(&m)); m > 0; )
{
y= x= (x*3628273133 + 1500450271) % 9576890767; /* three prime numbers */
m-= n= x % MAX_ITER;
for (i=0; i < n; i++)
trx[i]=trxman_new_trx();
for (i=0; i < n; i++)
for (i= 0; i < n; i++)
trn[i]= trnman_new_trn(&mutexes[i], &conds[i]);
for (i= 0; i < n; i++)
{
y=(y*19 + 7) % 31;
trxman_end_trx(trx[i], y & 1);
y= (y*19 + 7) % 31;
trnman_end_trn(trn[i], y & 1);
}
}
for (i=0; i < MAX_ITER; i++)
{
pthread_mutex_destroy(&mutexes[i]);
pthread_cond_destroy(&conds[i]);
}
pthread_mutex_lock(&rt_mutex);
rt_num_threads--;
if (!rt_num_threads)
pthread_cond_signal(&rt_cond);
pthread_mutex_unlock(&rt_mutex);
return 0;
}
#undef MAX_ITER
......@@ -91,30 +78,75 @@ pthread_handler_t test_trxman(void *arg)
void run_test(const char *test, pthread_handler handler, int n, int m)
{
pthread_t t;
ulonglong now=my_getsystime();
ulonglong now= my_getsystime();
litmus= 0;
diag("Testing %s with %d threads, %d iterations... ", test, n, m);
for (rt_num_threads=n ; n ; n--)
for (rt_num_threads= n ; n ; n--)
pthread_create(&t, &rt_attr, handler, &m);
pthread_mutex_lock(&rt_mutex);
while (rt_num_threads)
pthread_cond_wait(&rt_cond, &rt_mutex);
pthread_mutex_unlock(&rt_mutex);
now=my_getsystime()-now;
now= my_getsystime()-now;
ok(litmus == 0, "tested %s in %g secs (%d)", test, ((double)now)/1e7, litmus);
}
#define ok_read_from(T1, T2, RES) \
i=trnman_can_read_from(trn[T1], trid[T2]); \
ok(i == RES, "trn" #T1 " %s read from trn" #T2, i ? "can" : "cannot")
#define start_transaction(T) \
trn[T]= trnman_new_trn(&mutexes[T], &conds[T]); \
trid[T]= trn[T]->trid
#define commit(T) trnman_commit_trn(trn[T])
#define abort(T) trnman_abort_trn(trn[T])
#define Ntrns 4
void test_trnman_read_from()
{
TRN *trn[Ntrns];
TrID trid[Ntrns];
pthread_mutex_t mutexes[Ntrns];
pthread_cond_t conds[Ntrns];
int i;
for (i=0; i < Ntrns; i++)
{
pthread_mutex_init(&mutexes[i], MY_MUTEX_INIT_FAST);
pthread_cond_init(&conds[i], 0);
}
start_transaction(0); /* start trn1 */
start_transaction(1); /* start trn2 */
ok_read_from(1,0,0);
commit(0); /* commit trn1 */
start_transaction(2); /* start trn4 */
abort(2); /* abort trn4 */
start_transaction(3); /* start trn5 */
ok_read_from(3,0,1);
ok_read_from(3,1,0);
ok_read_from(3,2,0);
commit(1); /* commit trn2 */
ok_read_from(3,1,0);
commit(3); /* commit trn5 */
for (i=0; i < Ntrns; i++)
{
pthread_mutex_destroy(&mutexes[i]);
pthread_cond_destroy(&conds[i]);
}
}
int main()
{
plan(1);
my_init();
plan(6);
if (my_atomic_initialize())
return exit_status();
my_init();
pthread_attr_init(&rt_attr);
pthread_attr_setdetachstate(&rt_attr,PTHREAD_CREATE_DETACHED);
pthread_mutex_init(&rt_mutex, 0);
......@@ -123,10 +155,18 @@ int main()
#define CYCLES 10000
#define THREADS 10
trxman_init();
run_test("trxman", test_trxman, THREADS,CYCLES);
trxman_destroy();
diag("mallocs: %d\n", trxman_allocated_transactions);
trnman_init();
test_trnman_read_from();
run_test("trnman", test_trnman, THREADS,CYCLES);
diag("mallocs: %d", trnman_allocated_transactions);
{
ulonglong now= my_getsystime();
trnman_destroy();
now= my_getsystime()-now;
diag("trnman_destroy: %g", ((double)now)/1e7);
}
pthread_mutex_destroy(&rt_mutex);
pthread_cond_destroy(&rt_cond);
......
AM_CPPFLAGS = @ZLIB_INCLUDES@ -I$(top_builddir)/include
AM_CPPFLAGS += -I$(top_srcdir)/include -I$(top_srcdir)/unittest/mytap
LDADD = $(top_builddir)/unittest/mytap/libmytap.a \
$(top_builddir)/storage/maria/libmaria.a \
$(top_builddir)/mysys/libmysys.a \
$(top_builddir)/dbug/libdbug.a \
$(top_builddir)/strings/libmystrings.a
noinst_PROGRAMS = trxman-t
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment