Commit 9423c79b authored by Sergei Golubchik's avatar Sergei Golubchik

maria: deadlock detection when waiting on unique key (useless until we can rollback)

include/my_pthread.h:
  cleanup
include/waiting_threads.h:
  header guard
mysys/waiting_threads.c:
  bug - kill strategy were not applied to deadlocks of length 1.
  cast timeout to ulonglong.
storage/maria/ma_static.c:
  declare WT_RESOURCE_TYPE ma_rc_dup_unique
storage/maria/ma_write.c:
  deadlock detection when waiting on unique key (useless until we can rollback)
storage/maria/maria_def.h:
  deadlock detection when waiting on unique key (useless until we can rollback)
storage/maria/trnman.c:
  use deadlock detector.
  protect state transitions of a TRN with a mutex.
  trnman_trid_to_trn() function.
storage/maria/trnman.h:
  trnman_trid_to_trn() function
  protect state transitions of a TRN with a mutex
  use deadlock detector.
storage/maria/trnman_public.h:
  trnman_trid_to_trn()
parent 72ffd154
......@@ -437,9 +437,10 @@ int my_pthread_mutex_trylock(pthread_mutex_t *mutex);
#ifndef set_timespec_time_nsec
#define set_timespec_time_nsec(ABSTIME,TIME,NSEC) do { \
ulonglong now= (TIME) + (NSEC/100); \
ulonglong nsec= (NSEC); \
ulonglong now= (TIME) + (nsec/100); \
(ABSTIME).TV_sec= (now / ULL(10000000)); \
(ABSTIME).TV_nsec= (now % ULL(10000000) * 100 + ((NSEC) % 100)); \
(ABSTIME).TV_nsec= (now % ULL(10000000) * 100 + (nsec % 100)); \
} while(0)
#endif /* !set_timespec_time_nsec */
......
......@@ -13,6 +13,9 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef _waiting_threads_h
#define _waiting_threads_h
#include <my_global.h>
#include <my_sys.h>
#include <lf.h>
......@@ -152,3 +155,4 @@ void wt_thd_release(WT_THD *, WT_RESOURCE_ID *);
#define wt_thd_release_all(THD) wt_thd_release((THD), 0)
int wt_resource_id_memcmp(void *, void *);
#endif
......@@ -227,6 +227,20 @@ struct deadlock_arg {
WT_RESOURCE *rc;
};
static void change_victim(WT_THD* found, struct deadlock_arg *arg)
{
if (found->weight < arg->victim->weight)
{
if (arg->victim != arg->thd)
{
rc_unlock(arg->victim->waiting_for); /* release the previous victim */
DBUG_ASSERT(arg->rc == found->waiting_for);
}
arg->victim= found;
arg->rc= 0;
}
}
/*
loop detection in a wait-for graph with a limited search depth.
*/
......@@ -294,16 +308,8 @@ static int deadlock_search(struct deadlock_arg *arg, WT_THD *blocker,
break;
case WT_DEADLOCK:
ret= WT_DEADLOCK;
if (cursor->weight < arg->victim->weight)
{
if (arg->victim != arg->thd)
{
rc_unlock(arg->victim->waiting_for); /* release the previous victim */
DBUG_ASSERT(arg->rc == cursor->waiting_for);
}
arg->victim= cursor;
}
else if (arg->rc)
change_victim(cursor, arg);
if (arg->rc)
rc_unlock(arg->rc);
goto end;
case WT_OK:
......@@ -329,13 +335,15 @@ static int deadlock(WT_THD *thd, WT_THD *blocker, uint depth,
int ret;
DBUG_ENTER("deadlock");
ret= deadlock_search(&arg, blocker, depth);
if (arg.rc)
rc_unlock(arg.rc);
if (ret == WT_DEPTH_EXCEEDED)
{
increment_cycle_stats(WT_CYCLE_STATS, max_depth);
ret= WT_OK;
}
if (ret == WT_DEADLOCK && depth)
change_victim(blocker, &arg);
if (arg.rc)
rc_unlock(arg.rc);
if (ret == WT_DEADLOCK && arg.victim != thd)
{
DBUG_PRINT("wt", ("killing %s", arg.victim->name));
......@@ -570,7 +578,7 @@ int wt_thd_cond_timedwait(WT_THD *thd, pthread_mutex_t *mutex)
ret= WT_OK;
rc_unlock(rc);
set_timespec_time_nsec(timeout, starttime, wt_timeout_short*1000);
set_timespec_time_nsec(timeout, starttime, wt_timeout_short*ULL(1000));
if (ret == WT_TIMEOUT)
ret= pthread_cond_timedwait(&rc->cond, mutex, &timeout);
if (ret == WT_TIMEOUT)
......@@ -579,7 +587,7 @@ int wt_thd_cond_timedwait(WT_THD *thd, pthread_mutex_t *mutex)
ret= WT_DEADLOCK;
else if (wt_timeout_long > wt_timeout_short)
{
set_timespec_time_nsec(timeout, starttime, wt_timeout_long*1000);
set_timespec_time_nsec(timeout, starttime, wt_timeout_long*ULL(1000));
if (!thd->killed)
ret= pthread_cond_timedwait(&rc->cond, mutex, &timeout);
}
......
......@@ -88,7 +88,7 @@ my_bool _ma_setup_live_state(MARIA_HA *info)
It's enough to compare trids here (instead of calling
tranman_can_read_from) as history->trid is a commit_trid
*/
while (trn->trid < history->trid)
while (trn->trid < history->trid && history->trid != ~(TrID)0)
history= history->next;
pthread_mutex_unlock(&share->intern_lock);
/* The current item can't be deleted as it's the first one visible for us */
......
......@@ -64,6 +64,9 @@ HASH maria_stored_state;
*/
TRN dummy_transaction_object;
/* a WT_RESOURCE_TYPE for transactions waiting on a unique key conflict */
WT_RESOURCE_TYPE ma_rc_dup_unique={ wt_resource_id_memcmp, 0};
/* Enough for comparing if number is zero */
uchar maria_zero_string[]= {0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0};
......
......@@ -180,16 +180,48 @@ int maria_write(MARIA_HA *info, uchar *record)
}
else
{
if (keyinfo->ck_insert(info,
while (keyinfo->ck_insert(info,
(*keyinfo->make_key)(info, &int_key, i,
buff, record, filepos,
info->trn->trid)))
{
TRN *blocker=trnman_trid_to_trn(info->trn, info->dup_key_trid);
DBUG_PRINT("error",("Got error: %d on write",my_errno));
/*
if blocker TRN was not found, it means that the conflicting
transaction was committed long time ago. It could not be
aborted, as it would have to wait on the key tree lock
to remove the conflicting key it has inserted.
*/
if (local_lock_tree)
rw_unlock(&keyinfo->root_lock);
DBUG_PRINT("error",("Got error: %d on write",my_errno));
if (!blocker)
goto err;
if (blocker->commit_trid != ~(TrID)0)
{ /* committed, albeit recently */
pthread_mutex_unlock(& blocker->state_lock);
goto err;
}
{ /* running. now we wait */
WT_RESOURCE_ID rc;
int res;
rc.type= &ma_rc_dup_unique;
rc.value.ptr= blocker; /* TODO savepoint id when we'll have them */
res= wt_thd_will_wait_for(& info->trn->wt, & blocker->wt, & rc);
if (res != WT_OK)
{
pthread_mutex_unlock(& blocker->state_lock);
goto err;
}
res=wt_thd_cond_timedwait(& info->trn->wt, & blocker->state_lock);
pthread_mutex_unlock(& blocker->state_lock);
if (res != WT_OK)
goto err;
}
if (local_lock_tree)
rw_wrlock(&keyinfo->root_lock);
}
}
/* The above changed info->lastkey2. Inform maria_rnext_same(). */
......@@ -597,9 +629,22 @@ static int w_search(register MARIA_HA *info, uint32 comp_flag, MARIA_KEY *key,
else /* not HA_FULLTEXT, normal HA_NOSAME key */
{
DBUG_PRINT("warning", ("Duplicate key"));
/*
FIXME
When the index will support true versioning - with multiple
identical values in the UNIQUE index, invisible to each other -
the following should be changed to "continue inserting keys, at the
end (of the row or statement) wait". Until it's done we cannot properly
support deadlock timeouts.
*/
/*
transaction that has inserted the conflicting key is in progress.
wait for it to be committed or aborted.
*/
info->dup_key_trid= _ma_trid_from_key(&tmp_key);
info->dup_key_pos= dup_key_pos;
my_afree((uchar*) temp_buff);
my_errno=HA_ERR_FOUND_DUPP_KEY;
my_errno= HA_ERR_FOUND_DUPP_KEY;
DBUG_RETURN(-1);
}
}
......
......@@ -29,6 +29,7 @@
#include "ma_loghandler.h"
#include "ma_control_file.h"
#include "ma_state.h"
#include <waiting_threads.h>
/* For testing recovery */
#ifdef TO_BE_REMOVED
......@@ -499,6 +500,7 @@ struct st_maria_handler
ulong last_loop; /* last used counter */
MARIA_RECORD_POS save_lastpos;
MARIA_RECORD_POS dup_key_pos;
TrID dup_key_trid;
my_off_t pos; /* Intern variable */
my_off_t last_keypage; /* Last key page read */
my_off_t last_search_keypage; /* Last keypage when searching */
......@@ -759,6 +761,7 @@ extern char *maria_data_root;
extern uchar maria_zero_string[];
extern my_bool maria_inited, maria_in_ha_maria;
extern HASH maria_stored_state;
extern WT_RESOURCE_TYPE ma_rc_dup_unique;
/* This is used by _ma_calc_xxx_key_length och _ma_store_key */
typedef struct st_maria_s_param
......@@ -782,7 +785,6 @@ typedef struct st_pinned_page
my_bool changed;
} MARIA_PINNED_PAGE;
/* Prototypes for intern functions */
extern int _ma_read_dynamic_record(MARIA_HA *, uchar *, MARIA_RECORD_POS);
extern int _ma_read_rnd_dynamic_record(MARIA_HA *, uchar *, MARIA_RECORD_POS,
......
......@@ -46,7 +46,7 @@ static TRN *pool;
/* a hash for committed transactions that maps trid to a TRN structure */
static LF_HASH trid_to_trn;
/* an array that maps short_trid of an active transaction to a TRN structure */
/* an array that maps short_id of an active transaction to a TRN structure */
static TRN **short_trid_to_active_trn;
/* locks for short_trid_to_active_trn and pool */
......@@ -114,11 +114,13 @@ int trnman_init(TrID initial_trid)
{
DBUG_ENTER("trnman_init");
wt_init(); /* FIXME this should be done in the server, not in the engine! */
short_trid_to_active_trn= (TRN **)my_malloc(SHORT_TRID_MAX*sizeof(TRN*),
MYF(MY_WME|MY_ZEROFILL));
if (unlikely(!short_trid_to_active_trn))
DBUG_RETURN(1);
short_trid_to_active_trn--; /* min short_trid is 1 */
short_trid_to_active_trn--; /* min short_id is 1 */
/*
Initialize lists.
......@@ -179,6 +181,8 @@ void trnman_destroy()
{
TRN *trn= pool;
pool= pool->next;
pthread_mutex_destroy(&trn->state_lock);
wt_thd_destroy(&trn->wt);
my_free((void *)trn, MYF(0));
}
lf_hash_destroy(&trid_to_trn);
......@@ -188,6 +192,9 @@ void trnman_destroy()
my_atomic_rwlock_destroy(&LOCK_pool);
my_free((void *)(short_trid_to_active_trn+1), MYF(0));
short_trid_to_active_trn= NULL;
wt_end();
DBUG_VOID_RETURN;
}
......@@ -206,11 +213,13 @@ static TrID new_trid()
DBUG_RETURN(++global_trid_generator);
}
static void set_short_trid(TRN *trn)
static uint get_short_trid(TRN *trn)
{
int i= (int) ((global_trid_generator + (intptr)trn) * 312089 %
SHORT_TRID_MAX + 1);
for ( ; !trn->short_id ; i= 1)
uint res=0;
for ( ; !res ; i= 1)
{
my_atomic_rwlock_wrlock(&LOCK_short_trid_to_trn);
for ( ; i <= SHORT_TRID_MAX; i++) /* the range is [1..SHORT_TRID_MAX] */
......@@ -219,12 +228,13 @@ static void set_short_trid(TRN *trn)
if (short_trid_to_active_trn[i] == NULL &&
my_atomic_casptr((void **)&short_trid_to_active_trn[i], &tmp, trn))
{
trn->short_id= i;
res= i;
break;
}
}
my_atomic_rwlock_wrunlock(&LOCK_short_trid_to_trn);
}
return res;
}
/*
......@@ -243,7 +253,7 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond)
we have a mutex, to do simple things under it - allocate a TRN,
increment trnman_active_transactions, set trn->min_read_from.
Note that all the above is fast. generating short_trid may be slow,
Note that all the above is fast. generating short_id may be slow,
as it involves scanning a large array - so it's done outside of the
mutex.
*/
......@@ -280,6 +290,8 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond)
return 0;
}
trnman_allocated_transactions++;
pthread_mutex_init(&trn->state_lock, MY_MUTEX_INIT_FAST);
wt_thd_init(&trn->wt);
}
trn->pins= lf_hash_get_pins(&trid_to_trn);
if (!trn->pins)
......@@ -293,7 +305,6 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond)
trn->min_read_from= active_list_min.next->trid;
trn->trid= new_trid();
trn->short_id= 0;
trn->next= &active_list_max;
trn->prev= active_list_max.prev;
......@@ -320,7 +331,9 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond)
only after the following function TRN is considered initialized,
so it must be done the last
*/
set_short_trid(trn);
pthread_mutex_lock(&trn->state_lock);
trn->short_id= get_short_trid(trn);
pthread_mutex_unlock(&trn->state_lock);
res= lf_hash_insert(&trid_to_trn, trn->pins, &trn);
DBUG_ASSERT(res <= 0);
......@@ -364,6 +377,7 @@ my_bool trnman_end_trn(TRN *trn, my_bool commit)
/* if a rollback, all UNDO records should have been executed */
DBUG_ASSERT(commit || trn->undo_lsn == 0);
DBUG_PRINT("info", ("pthread_mutex_lock LOCK_trn_list"));
pthread_mutex_lock(&LOCK_trn_list);
/* remove from active list */
......@@ -402,7 +416,11 @@ my_bool trnman_end_trn(TRN *trn, my_bool commit)
*/
if (commit && active_list_min.next != &active_list_max)
{
pthread_mutex_lock(&trn->state_lock);
trn->commit_trid= global_trid_generator;
wt_thd_release_all(& trn->wt);
pthread_mutex_unlock(&trn->state_lock);
trn->next= &committed_list_max;
trn->prev= committed_list_max.prev;
trnman_committed_transactions++;
......@@ -436,11 +454,14 @@ my_bool trnman_end_trn(TRN *trn, my_bool commit)
TRN *t= free_me;
free_me= free_me->next;
/*
ignore OOM here. it's harmless, and there's nothing we could do, anyway
*/
/* ignore OOM. it's harmless, and we can do nothing here anyway */
(void)lf_hash_delete(&trid_to_trn, pins, &t->trid, sizeof(TrID));
pthread_mutex_lock(&trn->state_lock);
trn->short_id= 0;
wt_thd_release_all(& trn->wt);
pthread_mutex_unlock(&trn->state_lock);
trnman_free_trn(t);
}
......@@ -533,6 +554,33 @@ int trnman_can_read_from(TRN *trn, TrID trid)
return can;
}
TRN *trnman_trid_to_trn(TRN *trn, TrID trid)
{
TRN **found;
LF_REQUIRE_PINS(3);
if (trid < trn->min_read_from)
return 0; /* it's committed eons ago */
found= lf_hash_search(&trid_to_trn, trn->pins, &trid, sizeof(trid));
if (found == NULL || found == MY_ERRPTR)
return 0; /* no luck */
/* we've found something */
pthread_mutex_lock(&(*found)->state_lock);
if ((*found)->short_id == 0)
{
pthread_mutex_unlock(&(*found)->state_lock);
lf_hash_search_unpin(trn->pins);
return 0; /* but it was a ghost */
}
lf_hash_search_unpin(trn->pins);
/* Gotcha! */
return *found; /* note that TRN is returned locked !!! */
}
/* TODO: the stubs below are waiting for savepoints to be implemented */
void trnman_new_statement(TRN *trn __attribute__ ((unused)))
......
......@@ -21,19 +21,32 @@ C_MODE_START
#include <lf.h>
#include "trnman_public.h"
#include "ma_loghandler_lsn.h"
#include <waiting_threads.h>
/*
trid - 6 uchar transaction identifier. Assigned when a transaction
is created. Transaction can always be identified by its trid,
even after transaction has ended.
short_trid - 2-byte transaction identifier, identifies a running
short_id - 2-byte transaction identifier, identifies a running
transaction, is reassigned when transaction ends.
when short_id is 0, TRN is not initialized, for all practical purposes
it could be considered unused.
when commit_trid is ~(TrID)0 the transaction is running, otherwise it's
committed.
state_lock mutex protects the state of a TRN, that is whether a TRN
is committed/running/unused. Meaning that modifications of short_id and
commit_trid happen under this mutex.
*/
struct st_transaction
{
LF_PINS *pins;
WT_THD wt;
pthread_mutex_t state_lock;
void *used_tables; /* Tables used by transaction */
TRN *next, *prev;
TrID trid, min_read_from, commit_trid;
......@@ -41,7 +54,6 @@ struct st_transaction
LSN_WITH_FLAGS first_undo_lsn;
uint locked_tables;
uint16 short_id;
/* Note! if short_id is 0, trn is NOT initialized */
};
#define TRANSACTION_LOGGED_LONG_ID ULL(0x8000000000000000)
......
......@@ -45,6 +45,7 @@ my_bool trnman_end_trn(TRN *trn, my_bool commit);
#define trnman_rollback_trn(T) trnman_end_trn(T, FALSE)
void trnman_free_trn(TRN *trn);
int trnman_can_read_from(TRN *trn, TrID trid);
TRN *trnman_trid_to_trn(TRN *trn, TrID trid);
void trnman_new_statement(TRN *trn);
void trnman_rollback_statement(TRN *trn);
my_bool trnman_collect_transactions(LEX_STRING *str_act, LEX_STRING *str_com,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment