Commit 7ee95409 authored by marko@hundin.mysql.fi's avatar marko@hundin.mysql.fi

InnoDB cleanup and possible bug-fix: Remove srv0que

parent 03ade8c5
...@@ -43,7 +43,7 @@ noinst_HEADERS = btr0btr.h btr0btr.ic btr0cur.h btr0cur.ic \ ...@@ -43,7 +43,7 @@ noinst_HEADERS = btr0btr.h btr0btr.ic btr0cur.h btr0cur.ic \
row0purge.ic row0row.h row0row.ic row0sel.h row0sel.ic \ row0purge.ic row0row.h row0row.ic row0sel.h row0sel.ic \
row0types.h row0uins.h row0uins.ic row0umod.h row0umod.ic \ row0types.h row0uins.h row0uins.ic row0umod.h row0umod.ic \
row0undo.h row0undo.ic row0upd.h row0upd.ic row0vers.h \ row0undo.h row0undo.ic row0upd.h row0upd.ic row0vers.h \
row0vers.ic srv0que.h srv0srv.h srv0srv.ic srv0start.h \ row0vers.ic srv0srv.h srv0srv.ic srv0start.h \
sync0arr.h sync0arr.ic sync0rw.h \ sync0arr.h sync0arr.ic sync0rw.h \
sync0rw.ic sync0sync.h sync0sync.ic sync0types.h \ sync0rw.ic sync0sync.h sync0sync.ic sync0types.h \
thr0loc.h thr0loc.ic trx0purge.h trx0purge.ic trx0rec.h \ thr0loc.h thr0loc.ic trx0purge.h trx0purge.ic trx0rec.h \
......
...@@ -152,17 +152,6 @@ que_run_threads( ...@@ -152,17 +152,6 @@ que_run_threads(
/*============*/ /*============*/
que_thr_t* thr); /* in: query thread which is run initially */ que_thr_t* thr); /* in: query thread which is run initially */
/************************************************************************** /**************************************************************************
After signal handling is finished, returns control to a query graph error
handling routine. (Currently, just returns the control to the root of the
graph so that the graph can communicate an error message to the client.) */
void
que_fork_error_handle(
/*==================*/
trx_t* trx, /* in: trx */
que_t* fork); /* in: query graph which was run before signal
handling started, NULL not allowed */
/**************************************************************************
Handles an SQL error noticed during query thread execution. At the moment, Handles an SQL error noticed during query thread execution. At the moment,
does nothing! */ does nothing! */
...@@ -181,18 +170,15 @@ a single worker thread to execute it. This function should be used to end ...@@ -181,18 +170,15 @@ a single worker thread to execute it. This function should be used to end
the wait state of a query thread waiting for a lock or a stored procedure the wait state of a query thread waiting for a lock or a stored procedure
completion. */ completion. */
void que_thr_t*
que_thr_end_wait( que_thr_end_wait(
/*=============*/ /*=============*/
que_thr_t* thr, /* in: query thread in the /* out: next query thread to run;
NULL if none */
que_thr_t* thr); /* in: query thread in the
QUE_THR_LOCK_WAIT, QUE_THR_LOCK_WAIT,
or QUE_THR_PROCEDURE_WAIT, or or QUE_THR_PROCEDURE_WAIT, or
QUE_THR_SIG_REPLY_WAIT state */ QUE_THR_SIG_REPLY_WAIT state */
que_thr_t** next_thr); /* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread */
/************************************************************************** /**************************************************************************
Same as que_thr_end_wait, but no parameter next_thr available. */ Same as que_thr_end_wait, but no parameter next_thr available. */
......
/******************************************************
Server query execution
(c) 1996 Innobase Oy
Created 6/5/1996 Heikki Tuuri
*******************************************************/
#ifndef srv0que_h
#define srv0que_h
#include "univ.i"
#include "que0types.h"
/**************************************************************************
Checks if there is work to do in the server task queue. If there is, the
thread starts processing a task. Before leaving, it again checks the task
queue and picks a new task if any exists. This is called by a SRV_WORKER
thread. */
void
srv_que_task_queue_check(void);
/*==========================*/
/**************************************************************************
Performs round-robin on the server tasks. This is called by a SRV_WORKER
thread every second or so. */
que_thr_t*
srv_que_round_robin(
/*================*/
/* out: the new (may be == thr) query thread
to run */
que_thr_t* thr); /* in: query thread */
/**************************************************************************
Enqueues a task to server task queue and releases a worker thread, if
there exists one suspended. */
void
srv_que_task_enqueue(
/*=================*/
que_thr_t* thr); /* in: query thread */
/**************************************************************************
Enqueues a task to server task queue and releases a worker thread, if
there exists one suspended. */
void
srv_que_task_enqueue_low(
/*=====================*/
que_thr_t* thr); /* in: query thread */
#endif
...@@ -91,16 +91,12 @@ trx_undo_rec_release( ...@@ -91,16 +91,12 @@ trx_undo_rec_release(
/************************************************************************* /*************************************************************************
Starts a rollback operation. */ Starts a rollback operation. */
void que_thr_t*
trx_rollback( trx_rollback(
/*=========*/ /*=========*/
/* out: next query thread to run */
trx_t* trx, /* in: transaction */ trx_t* trx, /* in: transaction */
trx_sig_t* sig, /* in: signal starting the rollback */ trx_sig_t* sig); /* in: signal starting the rollback */
que_thr_t** next_thr);/* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread */
/*********************************************************************** /***********************************************************************
Rollback or clean up transactions which have no user session. If the Rollback or clean up transactions which have no user session. If the
transaction already was committed, then we clean up a possible insert transaction already was committed, then we clean up a possible insert
...@@ -112,17 +108,12 @@ trx_rollback_or_clean_all_without_sess(void); ...@@ -112,17 +108,12 @@ trx_rollback_or_clean_all_without_sess(void);
/******************************************************************** /********************************************************************
Finishes a transaction rollback. */ Finishes a transaction rollback. */
void que_thr_t*
trx_finish_rollback_off_kernel( trx_finish_rollback_off_kernel(
/*===========================*/ /*===========================*/
/* out: next query thread to run */
que_t* graph, /* in: undo graph which can now be freed */ que_t* graph, /* in: undo graph which can now be freed */
trx_t* trx, /* in: transaction */ trx_t* trx); /* in: transaction */
que_thr_t** next_thr);/* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread; if this parameter is
NULL, it is ignored */
/******************************************************************** /********************************************************************
Builds an undo 'query' graph for a transaction. The actual rollback is Builds an undo 'query' graph for a transaction. The actual rollback is
performed by executing this query graph like a query subprocedure call. performed by executing this query graph like a query subprocedure call.
......
...@@ -194,9 +194,10 @@ trx_end_lock_wait( ...@@ -194,9 +194,10 @@ trx_end_lock_wait(
/******************************************************************** /********************************************************************
Sends a signal to a trx object. */ Sends a signal to a trx object. */
ibool que_thr_t*
trx_sig_send( trx_sig_send(
/*=========*/ /*=========*/
/* out: next query thread to run */
/* out: TRUE if the signal was /* out: TRUE if the signal was
successfully delivered */ successfully delivered */
trx_t* trx, /* in: trx handle */ trx_t* trx, /* in: trx handle */
...@@ -206,27 +207,17 @@ trx_sig_send( ...@@ -206,27 +207,17 @@ trx_sig_send(
que_thr_t* receiver_thr, /* in: query thread which wants the que_thr_t* receiver_thr, /* in: query thread which wants the
reply, or NULL; if type is reply, or NULL; if type is
TRX_SIG_END_WAIT, this must be NULL */ TRX_SIG_END_WAIT, this must be NULL */
trx_savept_t* savept, /* in: possible rollback savepoint, or trx_savept_t* savept); /* in: possible rollback savepoint, or
NULL */ NULL */
que_thr_t** next_thr); /* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread; if the parameter
is NULL, it is ignored */
/******************************************************************** /********************************************************************
Send the reply message when a signal in the queue of the trx has Send the reply message when a signal in the queue of the trx has
been handled. */ been handled. */
void que_thr_t*
trx_sig_reply( trx_sig_reply(
/*==========*/ /*==========*/
trx_sig_t* sig, /* in: signal */ /* out: next query thread to run */
que_thr_t** next_thr); /* in/out: next query thread to run; trx_sig_t* sig); /* in: signal */
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread */
/******************************************************************** /********************************************************************
Removes the signal object from a trx signal queue. */ Removes the signal object from a trx signal queue. */
...@@ -238,15 +229,11 @@ trx_sig_remove( ...@@ -238,15 +229,11 @@ trx_sig_remove(
/******************************************************************** /********************************************************************
Starts handling of a trx signal. */ Starts handling of a trx signal. */
void que_thr_t*
trx_sig_start_handle( trx_sig_start_handle(
/*=================*/ /*=================*/
trx_t* trx, /* in: trx handle */ /* out: next query thread to run, or NULL */
que_thr_t** next_thr); /* in/out: next query thread to run; trx_t* trx); /* in: trx handle */
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread */
/******************************************************************** /********************************************************************
Ends signal handling. If the session is in the error state, and Ends signal handling. If the session is in the error state, and
trx->graph_before_signal_handling != NULL, returns control to the error trx->graph_before_signal_handling != NULL, returns control to the error
......
...@@ -38,7 +38,6 @@ sess_try_close( ...@@ -38,7 +38,6 @@ sess_try_close(
/* The session handle. All fields are protected by the kernel mutex */ /* The session handle. All fields are protected by the kernel mutex */
struct sess_struct{ struct sess_struct{
ulint state; /* state of the session */
trx_t* trx; /* transaction object permanently trx_t* trx; /* transaction object permanently
assigned for the session: the assigned for the session: the
transaction instance designated by the transaction instance designated by the
...@@ -49,11 +48,6 @@ struct sess_struct{ ...@@ -49,11 +48,6 @@ struct sess_struct{
session */ session */
}; };
/* Session states */
#define SESS_ACTIVE 1
#define SESS_ERROR 2 /* session contains an error message
which has not yet been communicated
to the client */
#ifndef UNIV_NONINL #ifndef UNIV_NONINL
#include "usr0sess.ic" #include "usr0sess.ic"
#endif #endif
......
...@@ -12,7 +12,6 @@ Created 5/27/1996 Heikki Tuuri ...@@ -12,7 +12,6 @@ Created 5/27/1996 Heikki Tuuri
#include "que0que.ic" #include "que0que.ic"
#endif #endif
#include "srv0que.h"
#include "usr0sess.h" #include "usr0sess.h"
#include "trx0trx.h" #include "trx0trx.h"
#include "trx0roll.h" #include "trx0roll.h"
...@@ -175,19 +174,15 @@ a single worker thread to execute it. This function should be used to end ...@@ -175,19 +174,15 @@ a single worker thread to execute it. This function should be used to end
the wait state of a query thread waiting for a lock or a stored procedure the wait state of a query thread waiting for a lock or a stored procedure
completion. */ completion. */
void que_thr_t*
que_thr_end_wait( que_thr_end_wait(
/*=============*/ /*=============*/
que_thr_t* thr, /* in: query thread in the /* out: next query thread to run;
NULL if none */
que_thr_t* thr) /* in: query thread in the
QUE_THR_LOCK_WAIT, QUE_THR_LOCK_WAIT,
or QUE_THR_PROCEDURE_WAIT, or or QUE_THR_PROCEDURE_WAIT, or
QUE_THR_SIG_REPLY_WAIT state */ QUE_THR_SIG_REPLY_WAIT state */
que_thr_t** next_thr) /* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread; if NULL is passed
as the parameter, it is ignored */
{ {
ibool was_active; ibool was_active;
...@@ -195,6 +190,8 @@ que_thr_end_wait( ...@@ -195,6 +190,8 @@ que_thr_end_wait(
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
#endif /* UNIV_SYNC_DEBUG */ #endif /* UNIV_SYNC_DEBUG */
ut_ad(thr); ut_ad(thr);
ut_ad(next_thr);
ut_ad(*next_thr == NULL);
ut_ad((thr->state == QUE_THR_LOCK_WAIT) ut_ad((thr->state == QUE_THR_LOCK_WAIT)
|| (thr->state == QUE_THR_PROCEDURE_WAIT) || (thr->state == QUE_THR_PROCEDURE_WAIT)
|| (thr->state == QUE_THR_SIG_REPLY_WAIT)); || (thr->state == QUE_THR_SIG_REPLY_WAIT));
...@@ -206,17 +203,8 @@ que_thr_end_wait( ...@@ -206,17 +203,8 @@ que_thr_end_wait(
que_thr_move_to_run_state(thr); que_thr_move_to_run_state(thr);
if (was_active) { return(was_active ? NULL : thr);
}
return;
}
if (next_thr && *next_thr == NULL) {
*next_thr = thr;
} else {
srv_que_task_enqueue_low(thr);
}
}
/************************************************************************** /**************************************************************************
Same as que_thr_end_wait, but no parameter next_thr available. */ Same as que_thr_end_wait, but no parameter next_thr available. */
...@@ -253,8 +241,6 @@ que_thr_end_wait_no_next_thr( ...@@ -253,8 +241,6 @@ que_thr_end_wait_no_next_thr(
for the lock to be released: */ for the lock to be released: */
srv_release_mysql_thread_if_suspended(thr); srv_release_mysql_thread_if_suspended(thr);
/* srv_que_task_enqueue_low(thr); */
} }
/************************************************************************** /**************************************************************************
...@@ -355,48 +341,6 @@ que_fork_start_command( ...@@ -355,48 +341,6 @@ que_fork_start_command(
return(NULL); return(NULL);
} }
/**************************************************************************
After signal handling is finished, returns control to a query graph error
handling routine. (Currently, just returns the control to the root of the
graph so that the graph can communicate an error message to the client.) */
void
que_fork_error_handle(
/*==================*/
trx_t* trx __attribute__((unused)), /* in: trx */
que_t* fork) /* in: query graph which was run before signal
handling started, NULL not allowed */
{
que_thr_t* thr;
#ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex));
#endif /* UNIV_SYNC_DEBUG */
ut_ad(trx->sess->state == SESS_ERROR);
ut_ad(UT_LIST_GET_LEN(trx->reply_signals) == 0);
ut_ad(UT_LIST_GET_LEN(trx->wait_thrs) == 0);
thr = UT_LIST_GET_FIRST(fork->thrs);
while (thr != NULL) {
ut_ad(!thr->is_active);
ut_ad(thr->state != QUE_THR_SIG_REPLY_WAIT);
ut_ad(thr->state != QUE_THR_LOCK_WAIT);
thr->run_node = thr;
thr->prev_node = thr->child;
thr->state = QUE_THR_COMPLETED;
thr = UT_LIST_GET_NEXT(thrs, thr);
}
thr = UT_LIST_GET_FIRST(fork->thrs);
que_thr_move_to_run_state(thr);
srv_que_task_enqueue_low(thr);
}
/******************************************************************** /********************************************************************
Tests if all the query threads in the same fork have a given state. */ Tests if all the query threads in the same fork have a given state. */
UNIV_INLINE UNIV_INLINE
...@@ -765,22 +709,18 @@ this function may only be called from inside que_run_threads or ...@@ -765,22 +709,18 @@ this function may only be called from inside que_run_threads or
que_thr_check_if_switch! These restrictions exist to make the rollback code que_thr_check_if_switch! These restrictions exist to make the rollback code
easier to maintain. */ easier to maintain. */
static static
void que_thr_t*
que_thr_dec_refer_count( que_thr_dec_refer_count(
/*====================*/ /*====================*/
que_thr_t* thr, /* in: query thread */ /* out: next query thread to run */
que_thr_t** next_thr) /* in/out: next query thread to run; que_thr_t* thr) /* in: query thread */
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread */
{ {
que_fork_t* fork; que_fork_t* fork;
trx_t* trx; trx_t* trx;
sess_t* sess; sess_t* sess;
ulint fork_type; ulint fork_type;
ibool stopped; que_thr_t* next_thr = NULL;
fork = thr->common.parent; fork = thr->common.parent;
trx = thr->graph->trx; trx = thr->graph->trx;
sess = trx->sess; sess = trx->sess;
...@@ -791,9 +731,7 @@ que_thr_dec_refer_count( ...@@ -791,9 +731,7 @@ que_thr_dec_refer_count(
if (thr->state == QUE_THR_RUNNING) { if (thr->state == QUE_THR_RUNNING) {
stopped = que_thr_stop(thr); if (!que_thr_stop(thr)) {
if (!stopped) {
/* The reason for the thr suspension or wait was /* The reason for the thr suspension or wait was
already canceled before we came here: continue already canceled before we came here: continue
running the thread */ running the thread */
...@@ -801,15 +739,9 @@ que_thr_dec_refer_count( ...@@ -801,15 +739,9 @@ que_thr_dec_refer_count(
/* fputs("!!!!!!!! Wait already ended: continue thr\n", /* fputs("!!!!!!!! Wait already ended: continue thr\n",
stderr); */ stderr); */
if (next_thr && *next_thr == NULL) {
*next_thr = thr;
} else {
srv_que_task_enqueue_low(thr);
}
mutex_exit(&kernel_mutex); mutex_exit(&kernel_mutex);
return; return(thr);
} }
} }
...@@ -825,7 +757,7 @@ que_thr_dec_refer_count( ...@@ -825,7 +757,7 @@ que_thr_dec_refer_count(
mutex_exit(&kernel_mutex); mutex_exit(&kernel_mutex);
return; return(next_thr);
} }
fork_type = fork->fork_type; fork_type = fork->fork_type;
...@@ -841,7 +773,7 @@ que_thr_dec_refer_count( ...@@ -841,7 +773,7 @@ que_thr_dec_refer_count(
ut_ad(UT_LIST_GET_LEN(trx->signals) > 0); ut_ad(UT_LIST_GET_LEN(trx->signals) > 0);
ut_ad(trx->handling_signals == TRUE); ut_ad(trx->handling_signals == TRUE);
trx_finish_rollback_off_kernel(fork, trx, next_thr); next_thr = trx_finish_rollback_off_kernel(fork, trx);
} else if (fork_type == QUE_FORK_PURGE) { } else if (fork_type == QUE_FORK_PURGE) {
...@@ -863,7 +795,7 @@ que_thr_dec_refer_count( ...@@ -863,7 +795,7 @@ que_thr_dec_refer_count(
zero, then we start processing a signal; from it we may get zero, then we start processing a signal; from it we may get
a new query thread to run */ a new query thread to run */
trx_sig_start_handle(trx, next_thr); next_thr = trx_sig_start_handle(trx);
} }
if (trx->handling_signals && UT_LIST_GET_LEN(trx->signals) == 0) { if (trx->handling_signals && UT_LIST_GET_LEN(trx->signals) == 0) {
...@@ -872,6 +804,8 @@ que_thr_dec_refer_count( ...@@ -872,6 +804,8 @@ que_thr_dec_refer_count(
} }
mutex_exit(&kernel_mutex); mutex_exit(&kernel_mutex);
return(next_thr);
} }
/************************************************************************** /**************************************************************************
...@@ -1243,6 +1177,7 @@ que_run_threads( ...@@ -1243,6 +1177,7 @@ que_run_threads(
/*-------------------------*/ /*-------------------------*/
next_thr = que_thr_step(thr); next_thr = que_thr_step(thr);
/*-------------------------*/ /*-------------------------*/
ut_a(next_thr == thr || next_thr == NULL);
/* Test the effect on performance of adding extra mutex /* Test the effect on performance of adding extra mutex
reservations */ reservations */
...@@ -1259,7 +1194,7 @@ que_run_threads( ...@@ -1259,7 +1194,7 @@ que_run_threads(
loop_count++; loop_count++;
if (next_thr != thr) { if (next_thr != thr) {
que_thr_dec_refer_count(thr, &next_thr); next_thr = que_thr_dec_refer_count(thr);
if (next_thr == NULL) { if (next_thr == NULL) {
......
...@@ -19,6 +19,6 @@ include ../include/Makefile.i ...@@ -19,6 +19,6 @@ include ../include/Makefile.i
noinst_LIBRARIES = libsrv.a noinst_LIBRARIES = libsrv.a
libsrv_a_SOURCES = srv0srv.c srv0que.c srv0start.c libsrv_a_SOURCES = srv0srv.c srv0start.c
EXTRA_PROGRAMS = EXTRA_PROGRAMS =
/******************************************************
Server query execution
(c) 1996 Innobase Oy
Created 6/5/1996 Heikki Tuuri
*******************************************************/
#include "srv0que.h"
#include "srv0srv.h"
#include "sync0sync.h"
#include "os0thread.h"
#include "usr0sess.h"
#include "que0que.h"
/**************************************************************************
Checks if there is work to do in the server task queue. If there is, the
thread starts processing a task. Before leaving, it again checks the task
queue and picks a new task if any exists. This is called by a SRV_WORKER
thread. */
void
srv_que_task_queue_check(void)
/*==========================*/
{
que_thr_t* thr;
for (;;) {
mutex_enter(&kernel_mutex);
thr = UT_LIST_GET_FIRST(srv_sys->tasks);
if (thr == NULL) {
mutex_exit(&kernel_mutex);
return;
}
UT_LIST_REMOVE(queue, srv_sys->tasks, thr);
mutex_exit(&kernel_mutex);
que_run_threads(thr);
}
}
/**************************************************************************
Performs round-robin on the server tasks. This is called by a SRV_WORKER
thread every second or so. */
que_thr_t*
srv_que_round_robin(
/*================*/
/* out: the new (may be == thr) query thread
to run */
que_thr_t* thr) /* in: query thread */
{
que_thr_t* new_thr;
ut_ad(thr);
ut_ad(thr->state == QUE_THR_RUNNING);
mutex_enter(&kernel_mutex);
UT_LIST_ADD_LAST(queue, srv_sys->tasks, thr);
new_thr = UT_LIST_GET_FIRST(srv_sys->tasks);
mutex_exit(&kernel_mutex);
return(new_thr);
}
/**************************************************************************
Enqueues a task to server task queue and releases a worker thread, if there
is a suspended one. */
void
srv_que_task_enqueue_low(
/*=====================*/
que_thr_t* thr) /* in: query thread */
{
ut_ad(thr);
#ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex));
#endif /* UNIV_SYNC_DEBUG */
UT_LIST_ADD_LAST(queue, srv_sys->tasks, thr);
srv_release_threads(SRV_WORKER, 1);
}
/**************************************************************************
Enqueues a task to server task queue and releases a worker thread, if there
is a suspended one. */
void
srv_que_task_enqueue(
/*=================*/
que_thr_t* thr) /* in: query thread */
{
ut_ad(thr);
mutex_enter(&kernel_mutex);
srv_que_task_enqueue_low(thr);
mutex_exit(&kernel_mutex);
}
...@@ -34,7 +34,6 @@ Created 10/8/1995 Heikki Tuuri ...@@ -34,7 +34,6 @@ Created 10/8/1995 Heikki Tuuri
#include "sync0sync.h" #include "sync0sync.h"
#include "thr0loc.h" #include "thr0loc.h"
#include "que0que.h" #include "que0que.h"
#include "srv0que.h"
#include "log0recv.h" #include "log0recv.h"
#include "pars0pars.h" #include "pars0pars.h"
#include "usr0sess.h" #include "usr0sess.h"
......
...@@ -23,7 +23,6 @@ Created 3/26/1996 Heikki Tuuri ...@@ -23,7 +23,6 @@ Created 3/26/1996 Heikki Tuuri
#include "row0purge.h" #include "row0purge.h"
#include "row0upd.h" #include "row0upd.h"
#include "trx0rec.h" #include "trx0rec.h"
#include "srv0que.h"
#include "os0thread.h" #include "os0thread.h"
/* The global data structure coordinating a purge */ /* The global data structure coordinating a purge */
...@@ -1060,8 +1059,6 @@ trx_purge(void) ...@@ -1060,8 +1059,6 @@ trx_purge(void)
mutex_exit(&kernel_mutex); mutex_exit(&kernel_mutex);
/* srv_que_task_enqueue(thr2); */
if (srv_print_thread_releases) { if (srv_print_thread_releases) {
fputs("Starting purge\n", stderr); fputs("Starting purge\n", stderr);
......
...@@ -20,7 +20,6 @@ Created 3/26/1996 Heikki Tuuri ...@@ -20,7 +20,6 @@ Created 3/26/1996 Heikki Tuuri
#include "trx0rec.h" #include "trx0rec.h"
#include "que0que.h" #include "que0que.h"
#include "usr0sess.h" #include "usr0sess.h"
#include "srv0que.h"
#include "srv0start.h" #include "srv0start.h"
#include "row0undo.h" #include "row0undo.h"
#include "row0mysql.h" #include "row0mysql.h"
...@@ -932,21 +931,15 @@ trx_undo_rec_release( ...@@ -932,21 +931,15 @@ trx_undo_rec_release(
/************************************************************************* /*************************************************************************
Starts a rollback operation. */ Starts a rollback operation. */
void que_thr_t*
trx_rollback( trx_rollback(
/*=========*/ /*=========*/
/* out: next query thread to run */
trx_t* trx, /* in: transaction */ trx_t* trx, /* in: transaction */
trx_sig_t* sig, /* in: signal starting the rollback */ trx_sig_t* sig) /* in: signal starting the rollback */
que_thr_t** next_thr)/* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread; if the passed value is
NULL, the parameter is ignored */
{ {
que_t* roll_graph; que_t* roll_graph;
que_thr_t* thr; que_thr_t* thr;
/* que_thr_t* thr2; */
#ifdef UNIV_SYNC_DEBUG #ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
...@@ -988,18 +981,7 @@ trx_rollback( ...@@ -988,18 +981,7 @@ trx_rollback(
thr = que_fork_start_command(roll_graph); thr = que_fork_start_command(roll_graph);
ut_ad(thr); ut_ad(thr);
return(thr);
/* thr2 = que_fork_start_command(roll_graph);
ut_ad(thr2); */
if (next_thr && (*next_thr == NULL)) {
*next_thr = thr;
/* srv_que_task_enqueue_low(thr2); */
} else {
srv_que_task_enqueue_low(thr);
/* srv_que_task_enqueue_low(thr2); */
}
} }
/******************************************************************** /********************************************************************
...@@ -1071,17 +1053,14 @@ trx_finish_error_processing( ...@@ -1071,17 +1053,14 @@ trx_finish_error_processing(
/************************************************************************* /*************************************************************************
Finishes a partial rollback operation. */ Finishes a partial rollback operation. */
static static
void que_thr_t*
trx_finish_partial_rollback_off_kernel( trx_finish_partial_rollback_off_kernel(
/*===================================*/ /*===================================*/
trx_t* trx, /* in: transaction */ /* out: next query thread to run */
que_thr_t** next_thr)/* in/out: next query thread to run; trx_t* trx) /* in: transaction */
if the value which is passed in is a pointer
to a NULL pointer, then the calling function
can start running a new query thread; if this
parameter is NULL, it is ignored */
{ {
trx_sig_t* sig; trx_sig_t* sig;
que_thr_t* next_thr;
#ifdef UNIV_SYNC_DEBUG #ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
...@@ -1092,29 +1071,26 @@ trx_finish_partial_rollback_off_kernel( ...@@ -1092,29 +1071,26 @@ trx_finish_partial_rollback_off_kernel(
/* Remove the signal from the signal queue and send reply message /* Remove the signal from the signal queue and send reply message
to it */ to it */
trx_sig_reply(sig, next_thr); next_thr = trx_sig_reply(sig);
trx_sig_remove(trx, sig); trx_sig_remove(trx, sig);
trx->que_state = TRX_QUE_RUNNING; trx->que_state = TRX_QUE_RUNNING;
return(next_thr);
} }
/******************************************************************** /********************************************************************
Finishes a transaction rollback. */ Finishes a transaction rollback. */
void que_thr_t*
trx_finish_rollback_off_kernel( trx_finish_rollback_off_kernel(
/*===========================*/ /*===========================*/
/* out: next query thread to run */
que_t* graph, /* in: undo graph which can now be freed */ que_t* graph, /* in: undo graph which can now be freed */
trx_t* trx, /* in: transaction */ trx_t* trx) /* in: transaction */
que_thr_t** next_thr)/* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread; if this parameter is
NULL, it is ignored */
{ {
trx_sig_t* sig; trx_sig_t* sig;
trx_sig_t* next_sig; trx_sig_t* next_sig;
que_thr_t* next_thr;
#ifdef UNIV_SYNC_DEBUG #ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
...@@ -1129,15 +1105,13 @@ trx_finish_rollback_off_kernel( ...@@ -1129,15 +1105,13 @@ trx_finish_rollback_off_kernel(
if (sig->type == TRX_SIG_ROLLBACK_TO_SAVEPT) { if (sig->type == TRX_SIG_ROLLBACK_TO_SAVEPT) {
trx_finish_partial_rollback_off_kernel(trx, next_thr); return(trx_finish_partial_rollback_off_kernel(trx));
return;
} else if (sig->type == TRX_SIG_ERROR_OCCURRED) { } else if (sig->type == TRX_SIG_ERROR_OCCURRED) {
trx_finish_error_processing(trx); trx_finish_error_processing(trx);
return; return(NULL);
} }
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
...@@ -1153,19 +1127,23 @@ trx_finish_rollback_off_kernel( ...@@ -1153,19 +1127,23 @@ trx_finish_rollback_off_kernel(
send reply messages to them */ send reply messages to them */
trx->que_state = TRX_QUE_RUNNING; trx->que_state = TRX_QUE_RUNNING;
next_thr = NULL;
while (sig != NULL) { while (sig != NULL) {
next_sig = UT_LIST_GET_NEXT(signals, sig); next_sig = UT_LIST_GET_NEXT(signals, sig);
if (sig->type == TRX_SIG_TOTAL_ROLLBACK) { if (sig->type == TRX_SIG_TOTAL_ROLLBACK) {
trx_sig_reply(sig, next_thr); ut_a(next_thr == NULL);
next_thr = trx_sig_reply(sig);
trx_sig_remove(trx, sig); trx_sig_remove(trx, sig);
} }
sig = next_sig; sig = next_sig;
} }
return(next_thr);
} }
/************************************************************************* /*************************************************************************
...@@ -1198,7 +1176,6 @@ trx_rollback_step( ...@@ -1198,7 +1176,6 @@ trx_rollback_step(
que_thr_t* thr) /* in: query thread */ que_thr_t* thr) /* in: query thread */
{ {
roll_node_t* node; roll_node_t* node;
ibool success;
ulint sig_no; ulint sig_no;
trx_savept_t* savept; trx_savept_t* savept;
...@@ -1225,19 +1202,13 @@ trx_rollback_step( ...@@ -1225,19 +1202,13 @@ trx_rollback_step(
/* Send a rollback signal to the transaction */ /* Send a rollback signal to the transaction */
success = trx_sig_send(thr_get_trx(thr), trx_sig_send(thr_get_trx(thr), sig_no, TRX_SIG_SELF,
sig_no, TRX_SIG_SELF, thr, savept);
thr, savept, NULL);
thr->state = QUE_THR_SIG_REPLY_WAIT; thr->state = QUE_THR_SIG_REPLY_WAIT;
mutex_exit(&kernel_mutex); mutex_exit(&kernel_mutex);
if (!success) {
/* Error in delivering the rollback signal */
que_thr_handle_error(thr, DB_ERROR, NULL, 0);
}
return(NULL); return(NULL);
} }
......
...@@ -895,18 +895,15 @@ trx_assign_read_view( ...@@ -895,18 +895,15 @@ trx_assign_read_view(
/******************************************************************** /********************************************************************
Commits a transaction. NOTE that the kernel mutex is temporarily released. */ Commits a transaction. NOTE that the kernel mutex is temporarily released. */
static static
void que_thr_t*
trx_handle_commit_sig_off_kernel( trx_handle_commit_sig_off_kernel(
/*=============================*/ /*=============================*/
trx_t* trx, /* in: transaction */ /* out: next query thread to run */
que_thr_t** next_thr) /* in/out: next query thread to run; trx_t* trx) /* in: transaction */
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread */
{ {
trx_sig_t* sig; trx_sig_t* sig;
trx_sig_t* next_sig; trx_sig_t* next_sig;
que_thr_t* next_thr = NULL;
#ifdef UNIV_SYNC_DEBUG #ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
...@@ -928,7 +925,8 @@ trx_handle_commit_sig_off_kernel( ...@@ -928,7 +925,8 @@ trx_handle_commit_sig_off_kernel(
if (sig->type == TRX_SIG_COMMIT) { if (sig->type == TRX_SIG_COMMIT) {
trx_sig_reply(sig, next_thr); ut_a(next_thr == NULL);
next_thr = trx_sig_reply(sig);
trx_sig_remove(trx, sig); trx_sig_remove(trx, sig);
} }
...@@ -936,6 +934,8 @@ trx_handle_commit_sig_off_kernel( ...@@ -936,6 +934,8 @@ trx_handle_commit_sig_off_kernel(
} }
trx->que_state = TRX_QUE_RUNNING; trx->que_state = TRX_QUE_RUNNING;
return(next_thr);
} }
/*************************************************************** /***************************************************************
...@@ -997,39 +997,6 @@ trx_lock_wait_to_suspended( ...@@ -997,39 +997,6 @@ trx_lock_wait_to_suspended(
trx->que_state = TRX_QUE_RUNNING; trx->que_state = TRX_QUE_RUNNING;
} }
/***************************************************************
Moves the query threads in the sig reply wait list of trx to the SUSPENDED
state. */
static
void
trx_sig_reply_wait_to_suspended(
/*============================*/
trx_t* trx) /* in: transaction */
{
trx_sig_t* sig;
que_thr_t* thr;
#ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex));
#endif /* UNIV_SYNC_DEBUG */
sig = UT_LIST_GET_FIRST(trx->reply_signals);
while (sig != NULL) {
thr = sig->receiver;
ut_ad(thr->state == QUE_THR_SIG_REPLY_WAIT);
thr->state = QUE_THR_SUSPENDED;
sig->receiver = NULL;
UT_LIST_REMOVE(reply_signals, trx->reply_signals, sig);
sig = UT_LIST_GET_FIRST(trx->reply_signals);
}
}
/********************************************************************* /*********************************************************************
Checks the compatibility of a new signal with the other signals in the Checks the compatibility of a new signal with the other signals in the
queue. */ queue. */
...@@ -1109,11 +1076,10 @@ trx_sig_is_compatible( ...@@ -1109,11 +1076,10 @@ trx_sig_is_compatible(
/******************************************************************** /********************************************************************
Sends a signal to a trx object. */ Sends a signal to a trx object. */
ibool que_thr_t*
trx_sig_send( trx_sig_send(
/*=========*/ /*=========*/
/* out: TRUE if the signal was /* out: next query thread to run */
successfully delivered */
trx_t* trx, /* in: trx handle */ trx_t* trx, /* in: trx handle */
ulint type, /* in: signal type */ ulint type, /* in: signal type */
ulint sender, /* in: TRX_SIG_SELF or ulint sender, /* in: TRX_SIG_SELF or
...@@ -1121,14 +1087,8 @@ trx_sig_send( ...@@ -1121,14 +1087,8 @@ trx_sig_send(
que_thr_t* receiver_thr, /* in: query thread which wants the que_thr_t* receiver_thr, /* in: query thread which wants the
reply, or NULL; if type is reply, or NULL; if type is
TRX_SIG_END_WAIT, this must be NULL */ TRX_SIG_END_WAIT, this must be NULL */
trx_savept_t* savept, /* in: possible rollback savepoint, or trx_savept_t* savept) /* in: possible rollback savepoint, or
NULL */ NULL */
que_thr_t** next_thr) /* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread; if the parameter
is NULL, it is ignored */
{ {
trx_sig_t* sig; trx_sig_t* sig;
trx_t* receiver_trx; trx_t* receiver_trx;
...@@ -1138,14 +1098,7 @@ trx_sig_send( ...@@ -1138,14 +1098,7 @@ trx_sig_send(
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
#endif /* UNIV_SYNC_DEBUG */ #endif /* UNIV_SYNC_DEBUG */
if (!trx_sig_is_compatible(trx, type, sender)) { ut_a(trx_sig_is_compatible(trx, type, sender));
/* The signal is not compatible with the other signals in
the queue: do nothing */
ut_error;
return(FALSE);
}
/* Queue the signal object */ /* Queue the signal object */
...@@ -1179,11 +1132,6 @@ trx_sig_send( ...@@ -1179,11 +1132,6 @@ trx_sig_send(
sig); sig);
} }
if (trx->sess->state == SESS_ERROR) {
trx_sig_reply_wait_to_suspended(trx);
}
if ((sender != TRX_SIG_SELF) || (type == TRX_SIG_BREAK_EXECUTION)) { if ((sender != TRX_SIG_SELF) || (type == TRX_SIG_BREAK_EXECUTION)) {
/* The following call will add a TRX_SIG_ERROR_OCCURRED /* The following call will add a TRX_SIG_ERROR_OCCURRED
...@@ -1198,10 +1146,10 @@ trx_sig_send( ...@@ -1198,10 +1146,10 @@ trx_sig_send(
if (UT_LIST_GET_FIRST(trx->signals) == sig) { if (UT_LIST_GET_FIRST(trx->signals) == sig) {
trx_sig_start_handle(trx, next_thr); return(trx_sig_start_handle(trx));
} }
return(TRUE); return(NULL);
} }
/******************************************************************** /********************************************************************
...@@ -1223,27 +1171,18 @@ trx_end_signal_handling( ...@@ -1223,27 +1171,18 @@ trx_end_signal_handling(
trx->handling_signals = FALSE; trx->handling_signals = FALSE;
trx->graph = trx->graph_before_signal_handling; trx->graph = trx->graph_before_signal_handling;
if (trx->graph && (trx->sess->state == SESS_ERROR)) {
que_fork_error_handle(trx, trx->graph);
}
} }
/******************************************************************** /********************************************************************
Starts handling of a trx signal. */ Starts handling of a trx signal. */
void que_thr_t*
trx_sig_start_handle( trx_sig_start_handle(
/*=================*/ /*=================*/
trx_t* trx, /* in: trx handle */ /* out: next query thread to run, or NULL */
que_thr_t** next_thr) /* in/out: next query thread to run; trx_t* trx) /* in: trx handle */
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread; if the parameter
is NULL, it is ignored */
{ {
que_thr_t* next_thr = NULL;
trx_sig_t* sig; trx_sig_t* sig;
ulint type; ulint type;
loop: loop:
...@@ -1259,7 +1198,7 @@ trx_sig_start_handle( ...@@ -1259,7 +1198,7 @@ trx_sig_start_handle(
trx_end_signal_handling(trx); trx_end_signal_handling(trx);
return; return(next_thr);
} }
if (trx->conc_state == TRX_NOT_STARTED) { if (trx->conc_state == TRX_NOT_STARTED) {
...@@ -1275,23 +1214,13 @@ trx_sig_start_handle( ...@@ -1275,23 +1214,13 @@ trx_sig_start_handle(
trx_lock_wait_to_suspended(trx); trx_lock_wait_to_suspended(trx);
} }
/* If the session is in the error state and this trx has threads
waiting for reply from signals, moves these threads to the suspended
state, canceling wait reservations; note that if the transaction has
sent a commit or rollback signal to itself, and its session is not in
the error state, then nothing is done here. */
if (trx->sess->state == SESS_ERROR) {
trx_sig_reply_wait_to_suspended(trx);
}
/* If there are no running query threads, we can start processing of a /* If there are no running query threads, we can start processing of a
signal, otherwise we have to wait until all query threads of this signal, otherwise we have to wait until all query threads of this
transaction are aware of the arrival of the signal. */ transaction are aware of the arrival of the signal. */
if (trx->n_active_thrs > 0) { if (trx->n_active_thrs > 0) {
return; return(NULL);
} }
if (trx->handling_signals == FALSE) { if (trx->handling_signals == FALSE) {
...@@ -1305,30 +1234,19 @@ trx_sig_start_handle( ...@@ -1305,30 +1234,19 @@ trx_sig_start_handle(
if (type == TRX_SIG_COMMIT) { if (type == TRX_SIG_COMMIT) {
trx_handle_commit_sig_off_kernel(trx, next_thr); next_thr = trx_handle_commit_sig_off_kernel(trx);
} else if ((type == TRX_SIG_TOTAL_ROLLBACK) } else if ((type == TRX_SIG_TOTAL_ROLLBACK)
|| (type == TRX_SIG_ROLLBACK_TO_SAVEPT)) { || (type == TRX_SIG_ROLLBACK_TO_SAVEPT)
|| (type == TRX_SIG_ERROR_OCCURRED)) {
trx_rollback(trx, sig, next_thr);
/* No further signals can be handled until the rollback
completes, therefore we return */
return;
} else if (type == TRX_SIG_ERROR_OCCURRED) {
trx_rollback(trx, sig, next_thr);
/* No further signals can be handled until the rollback /* No further signals can be handled until the rollback
completes, therefore we return */ completes, therefore we return */
return; return(trx_rollback(trx, sig));
} else if (type == TRX_SIG_BREAK_EXECUTION) { } else if (type == TRX_SIG_BREAK_EXECUTION) {
trx_sig_reply(sig, next_thr); next_thr = trx_sig_reply(sig);
trx_sig_remove(trx, sig); trx_sig_remove(trx, sig);
} else { } else {
ut_error; ut_error;
...@@ -1341,17 +1259,14 @@ trx_sig_start_handle( ...@@ -1341,17 +1259,14 @@ trx_sig_start_handle(
Send the reply message when a signal in the queue of the trx has been Send the reply message when a signal in the queue of the trx has been
handled. */ handled. */
void que_thr_t*
trx_sig_reply( trx_sig_reply(
/*==========*/ /*==========*/
trx_sig_t* sig, /* in: signal */ /* out: next query thread to run */
que_thr_t** next_thr) /* in/out: next query thread to run; trx_sig_t* sig) /* in: signal */
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread */
{ {
trx_t* receiver_trx; trx_t* receiver_trx;
que_thr_t* next_thr = NULL;
ut_ad(sig); ut_ad(sig);
#ifdef UNIV_SYNC_DEBUG #ifdef UNIV_SYNC_DEBUG
...@@ -1365,13 +1280,13 @@ trx_sig_reply( ...@@ -1365,13 +1280,13 @@ trx_sig_reply(
UT_LIST_REMOVE(reply_signals, receiver_trx->reply_signals, UT_LIST_REMOVE(reply_signals, receiver_trx->reply_signals,
sig); sig);
ut_ad(receiver_trx->sess->state != SESS_ERROR); next_thr = que_thr_end_wait(sig->receiver);
que_thr_end_wait(sig->receiver, next_thr);
sig->receiver = NULL; sig->receiver = NULL;
} }
return(next_thr);
} }
/******************************************************************** /********************************************************************
...@@ -1427,7 +1342,6 @@ trx_commit_step( ...@@ -1427,7 +1342,6 @@ trx_commit_step(
{ {
commit_node_t* node; commit_node_t* node;
que_thr_t* next_thr; que_thr_t* next_thr;
ibool success;
node = thr->run_node; node = thr->run_node;
...@@ -1442,21 +1356,14 @@ trx_commit_step( ...@@ -1442,21 +1356,14 @@ trx_commit_step(
node->state = COMMIT_NODE_WAIT; node->state = COMMIT_NODE_WAIT;
next_thr = NULL;
thr->state = QUE_THR_SIG_REPLY_WAIT; thr->state = QUE_THR_SIG_REPLY_WAIT;
/* Send the commit signal to the transaction */ /* Send the commit signal to the transaction */
success = trx_sig_send(thr_get_trx(thr), TRX_SIG_COMMIT, next_thr = trx_sig_send(thr_get_trx(thr), TRX_SIG_COMMIT,
TRX_SIG_SELF, thr, NULL, &next_thr); TRX_SIG_SELF, thr, NULL);
mutex_exit(&kernel_mutex);
if (!success) { mutex_exit(&kernel_mutex);
/* Error in delivering the commit signal */
que_thr_handle_error(thr, DB_ERROR, NULL, 0);
}
return(next_thr); return(next_thr);
} }
......
...@@ -37,8 +37,6 @@ sess_open(void) ...@@ -37,8 +37,6 @@ sess_open(void)
#endif /* UNIV_SYNC_DEBUG */ #endif /* UNIV_SYNC_DEBUG */
sess = mem_alloc(sizeof(sess_t)); sess = mem_alloc(sizeof(sess_t));
sess->state = SESS_ACTIVE;
sess->trx = trx_create(sess); sess->trx = trx_create(sess);
UT_LIST_INIT(sess->graphs); UT_LIST_INIT(sess->graphs);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment