Commit 2533633b authored by Vladislav Vaintroub's avatar Vladislav Vaintroub

Simplify thread attach/detach. Use connection specific mysys_var, rather than...

Simplify thread attach/detach. Use connection specific mysys_var, rather than sharing worker thread's my_thread_var with THD.
parent 85fc94ec
...@@ -26,99 +26,105 @@ uint threadpool_max_threads; ...@@ -26,99 +26,105 @@ uint threadpool_max_threads;
uint threadpool_oversubscribe; uint threadpool_oversubscribe;
extern "C" pthread_key(struct st_my_thread_var*, THR_KEY_mysys);
/* /*
Attach/associate the connection with the OS thread, for command processing. Worker threads contexts, and THD contexts.
=====================================
Both worker threads and connections have their sets of thread local variables
At the moment it is mysys_var (which has e.g dbug my_error and similar
goodies inside), and PSI per-client structure.
Whenever query is executed following needs to be done:
1. Save worker thread context.
2. Change TLS variables to connection specific ones using thread_attach(THD*).
This function does some additional work , e.g setting up
thread_stack/thread_ends_here pointers.
3. Process query
4. Restore worker thread context.
Connection login and termination follows similar schema w.r.t saving and
restoring contexts.
For both worker thread, and for the connection, mysys variables are created
using my_thread_init() and freed with my_thread_end().
*/ */
static inline bool thread_attach(THD* thd, char *stack_start, PSI_thread **save_psi_thread) struct Worker_thread_context
{ {
DBUG_ENTER("thread_attach"); PSI_thread *psi_thread;
st_my_thread_var* mysys_var;
if (PSI_server) void save()
{ {
*save_psi_thread= PSI_server->get_thread(); psi_thread= PSI_server?PSI_server->get_thread():0;
PSI_server->set_thread(thd->event_scheduler.m_psi); mysys_var= (st_my_thread_var *)pthread_getspecific(THR_KEY_mysys);
} }
else
*save_psi_thread= NULL;
/*
We need to know the start of the stack so that we could check for
stack overruns.
*/
thd->thread_stack= stack_start;
/* Calls close_connection() on failure */ void restore()
if (setup_connection_thread_globals(thd))
{ {
DBUG_RETURN(TRUE); if (PSI_server)
PSI_server->set_thread(psi_thread);
pthread_setspecific(THR_KEY_mysys,mysys_var);
pthread_setspecific(THR_THD, 0);
pthread_setspecific(THR_MALLOC, 0);
} }
};
/* clear errors from processing the previous THD */
my_errno= 0;
thd->mysys_var->abort= 0;
#ifndef DBUG_OFF
if (thd->event_scheduler.set_explain)
DBUG_SET(thd->event_scheduler.dbug_explain);
#endif
DBUG_RETURN(FALSE);
}
/* /*
Detach/disassociate the connection with the OS thread. Attach/associate the connection with the OS thread,
*/ */
static inline void thread_detach(THD* thd, PSI_thread *restore_psi_thread) static inline bool thread_attach(THD* thd)
{ {
DBUG_ENTER("thread_detach"); pthread_setspecific(THR_KEY_mysys,thd->mysys_var);
mysql_mutex_lock(&thd->LOCK_thd_data); thd->thread_stack=(char*)&thd;
thd->mysys_var = NULL; thd->store_globals();
mysql_mutex_unlock(&thd->LOCK_thd_data);
#ifndef DBUG_OFF
/*
If during the session @@session.dbug was assigned, the
dbug options/state has been pushed. Check if this is the
case, to be able to restore the state when we attach this
logical connection to a physical thread.
*/
if (_db_is_pushed_())
{
thd->event_scheduler.set_explain= TRUE;
if (DBUG_EXPLAIN(thd->event_scheduler.dbug_explain, sizeof(thd->event_scheduler.dbug_explain)))
sql_print_error("thd_scheduler: DBUG_EXPLAIN buffer is too small");
}
/* DBUG_POP() is a no-op in case there is no session state */
DBUG_POP();
#endif
if (PSI_server) if (PSI_server)
PSI_server->set_thread(restore_psi_thread); PSI_server->set_thread(thd->event_scheduler.m_psi);
pthread_setspecific(THR_THD, NULL); return 0;
DBUG_VOID_RETURN;
} }
int threadpool_add_connection(THD *thd) int threadpool_add_connection(THD *thd)
{ {
int retval=1; int retval=1;
PSI_thread *psi_thread; Worker_thread_context worker_context;
#ifndef DBUG_OFF worker_context.save();
thd->event_scheduler.set_explain = 0;
#endif /*
thread_attach(thd, (char *)&thd, &psi_thread); Create a new connection context: mysys_thread_var and PSI thread
ulonglong now= microsecond_interval_timer(); Store them in thd->mysys_var and thd->scheduler.m_psi.
thd->prior_thr_create_utime= now; */
thd->start_utime= now;
thd->thr_create_utime= now; /* Use my_thread_init() to create new mysys_thread_var. */
pthread_setspecific(THR_KEY_mysys, 0);
my_thread_init();
thd->mysys_var= (st_my_thread_var *)pthread_getspecific(THR_KEY_mysys);
if (!thd->mysys_var)
{
/* Out of memory? */
worker_context.restore();
return 1;
}
/* Create new PSI thread for use with the THD. */
if (PSI_server) if (PSI_server)
{ {
thd->event_scheduler.m_psi = thd->event_scheduler.m_psi =
PSI_server->new_thread(key_thread_one_connection, thd, thd->thread_id); PSI_server->new_thread(key_thread_one_connection, thd, thd->thread_id);
PSI_server->set_thread(thd->event_scheduler.m_psi);
} }
/* Login. */
thread_attach(thd);
ulonglong now= microsecond_interval_timer();
thd->prior_thr_create_utime= now;
thd->start_utime= now;
thd->thr_create_utime= now;
if (setup_connection_thread_globals(thd) == 0) if (setup_connection_thread_globals(thd) == 0)
{ {
if (login_connection(thd) == 0) if (login_connection(thd) == 0)
...@@ -129,15 +135,19 @@ int threadpool_add_connection(THD *thd) ...@@ -129,15 +135,19 @@ int threadpool_add_connection(THD *thd)
} }
} }
thd->skip_wait_timeout= true; thd->skip_wait_timeout= true;
thread_detach(thd, psi_thread);
worker_context.restore();
return retval; return retval;
} }
void threadpool_remove_connection(THD *thd) void threadpool_remove_connection(THD *thd)
{ {
PSI_thread *save_psi_thread;
thread_attach(thd, (char *)&thd, &save_psi_thread); Worker_thread_context worker_context;
worker_context.save();
thread_attach(thd);
thd->killed= KILL_CONNECTION; thd->killed= KILL_CONNECTION;
thd->net.reading_or_writing= 0; thd->net.reading_or_writing= 0;
...@@ -152,17 +162,21 @@ void threadpool_remove_connection(THD *thd) ...@@ -152,17 +162,21 @@ void threadpool_remove_connection(THD *thd)
unlink_thd(thd); unlink_thd(thd);
mysql_mutex_unlock(&LOCK_thread_count); mysql_mutex_unlock(&LOCK_thread_count);
mysql_cond_broadcast(&COND_thread_count); mysql_cond_broadcast(&COND_thread_count);
DBUG_POP();
if (PSI_server) /* Free resources (thread_var and PSI connection specific struct)*/
PSI_server->delete_current_thread(); my_thread_end();
pthread_setspecific(THR_THD, NULL);
worker_context.restore();
} }
int threadpool_process_request(THD *thd) int threadpool_process_request(THD *thd)
{ {
int retval= 0; int retval= 0;
PSI_thread *psi_thread; Worker_thread_context worker_context;
thread_attach(thd, (char *)&thd, &psi_thread); worker_context.save();
thread_attach(thd);
if (thd->killed >= KILL_CONNECTION) if (thd->killed >= KILL_CONNECTION)
{ {
...@@ -170,7 +184,7 @@ int threadpool_process_request(THD *thd) ...@@ -170,7 +184,7 @@ int threadpool_process_request(THD *thd)
kill flag can be set have been killed by kill flag can be set have been killed by
timeout handler or by a KILL command timeout handler or by a KILL command
*/ */
thread_detach(thd, psi_thread); worker_context.restore();
return 1; return 1;
} }
...@@ -199,9 +213,10 @@ int threadpool_process_request(THD *thd) ...@@ -199,9 +213,10 @@ int threadpool_process_request(THD *thd)
break; break;
} }
} }
thread_detach(thd, psi_thread);
if (!retval) if (!retval)
thd->net.reading_or_writing= 1; thd->net.reading_or_writing= 1;
worker_context.restore();
return retval; return retval;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment