Commit c0413b74 authored by Alexey Botchkov's avatar Alexey Botchkov

Bug#54861 Additional connections not handled properly in mtr --embedded

        When in embedded-serve mode, mysqltest tried to run '--send' commands in the separate thread.
        That upsets some engines (InnoDB particularly) as the transaction has to be executed in the same
        thread completely. So i implemented some different approach. So we create one separate thread for
        each connection and execute all the queries of this connection inside it. Looks even simpler than it was
        for me.

per-file comments:
  client/mysqltest.cc
Bug#54861      Additional connections not handled properly in mtr --embedded
        Now the connection has one running connection_thread() attached. And sends all the
        query and read-result requests to it.
parent 90eef290
......@@ -248,10 +248,14 @@ struct st_connection
my_bool pending;
#ifdef EMBEDDED_LIBRARY
pthread_t tid;
const char *cur_query;
int cur_query_len;
pthread_mutex_t mutex;
pthread_cond_t cond;
int command, result;
pthread_mutex_t query_mutex;
pthread_cond_t query_cond;
pthread_mutex_t result_mutex;
pthread_cond_t result_cond;
int query_done;
#endif /*EMBEDDED_LIBRARY*/
};
......@@ -703,67 +707,148 @@ void handle_no_error(struct st_command*);
#ifdef EMBEDDED_LIBRARY
#define EMB_SEND_QUERY 1
#define EMB_READ_QUERY_RESULT 2
#define EMB_END_CONNECTION 3
/* attributes of the query thread */
pthread_attr_t cn_thd_attrib;
/*
send_one_query executes query in separate thread, which is
necessary in embedded library to run 'send' in proper way.
This implementation doesn't handle errors returned
by mysql_send_query. It's technically possible, though
I don't see where it is needed.
This procedure represents the connection and actually
runs queries when in the EMBEDDED-SERVER mode.
The run_query_normal() just sends request for running
mysql_send_query and mysql_read_query_result() here.
*/
pthread_handler_t send_one_query(void *arg)
pthread_handler_t connection_thread(void *arg)
{
struct st_connection *cn= (struct st_connection*)arg;
mysql_thread_init();
(void) mysql_send_query(&cn->mysql, cn->cur_query, cn->cur_query_len);
while (cn->command != EMB_END_CONNECTION)
{
if (!cn->command)
{
pthread_mutex_lock(&cn->query_mutex);
while (!cn->command)
pthread_cond_wait(&cn->query_cond, &cn->query_mutex);
pthread_mutex_unlock(&cn->query_mutex);
}
switch (cn->command)
{
case EMB_END_CONNECTION:
goto end_thread;
case EMB_SEND_QUERY:
cn->result= mysql_send_query(&cn->mysql, cn->cur_query, cn->cur_query_len);
break;
case EMB_READ_QUERY_RESULT:
cn->result= mysql_read_query_result(&cn->mysql);
break;
default:
DBUG_ASSERT(0);
}
cn->command= 0;
pthread_mutex_lock(&cn->result_mutex);
cn->query_done= 1;
pthread_cond_signal(&cn->result_cond);
pthread_mutex_unlock(&cn->result_mutex);
}
mysql_thread_end();
pthread_mutex_lock(&cn->mutex);
end_thread:
cn->query_done= 1;
pthread_cond_signal(&cn->cond);
pthread_mutex_unlock(&cn->mutex);
mysql_thread_end();
pthread_exit(0);
return 0;
}
static int do_send_query(struct st_connection *cn, const char *q, int q_len,
int flags)
static void wait_query_thread_done(struct st_connection *con)
{
DBUG_ASSERT(con->tid);
if (!con->query_done)
{
pthread_mutex_lock(&con->result_mutex);
while (!con->query_done)
pthread_cond_wait(&con->result_cond, &con->result_mutex);
pthread_mutex_unlock(&con->result_mutex);
}
}
static void signal_connection_thd(struct st_connection *cn, int command)
{
pthread_t tid;
DBUG_ASSERT(cn->tid);
cn->query_done= 0;
cn->command= command;
pthread_mutex_lock(&cn->query_mutex);
pthread_cond_signal(&cn->query_cond);
pthread_mutex_unlock(&cn->query_mutex);
}
if (flags & QUERY_REAP_FLAG)
return mysql_send_query(&cn->mysql, q, q_len);
if (pthread_mutex_init(&cn->mutex, NULL) ||
pthread_cond_init(&cn->cond, NULL))
die("Error in the thread library");
/*
Sometimes we try to execute queries when the connection is closed.
It's done to make sure it was closed completely.
So that if our connection is closed (cn->tid == 0), we just return
the mysql_send_query() result which is an error in this case.
*/
static int do_send_query(struct st_connection *cn, const char *q, int q_len)
{
if (!cn->tid)
return mysql_send_query(&cn->mysql, q, q_len);
cn->cur_query= q;
cn->cur_query_len= q_len;
cn->query_done= 0;
if (pthread_create(&tid, &cn_thd_attrib, send_one_query, (void*)cn))
die("Cannot start new thread for query");
signal_connection_thd(cn, EMB_SEND_QUERY);
return 0;
}
static void wait_query_thread_end(struct st_connection *con)
static int do_read_query_result(struct st_connection *cn)
{
if (!con->query_done)
{
pthread_mutex_lock(&con->mutex);
while (!con->query_done)
pthread_cond_wait(&con->cond, &con->mutex);
pthread_mutex_unlock(&con->mutex);
}
DBUG_ASSERT(cn->tid);
wait_query_thread_done(cn);
signal_connection_thd(cn, EMB_READ_QUERY_RESULT);
wait_query_thread_done(cn);
return cn->result;
}
static void emb_close_connection(struct st_connection *cn)
{
if (!cn->tid)
return;
wait_query_thread_done(cn);
signal_connection_thd(cn, EMB_END_CONNECTION);
pthread_join(cn->tid, NULL);
cn->tid= 0;
pthread_mutex_destroy(&cn->query_mutex);
pthread_cond_destroy(&cn->query_cond);
pthread_mutex_destroy(&cn->result_mutex);
pthread_cond_destroy(&cn->result_cond);
}
static void init_connection_thd(struct st_connection *cn)
{
cn->query_done= 1;
cn->command= 0;
if (pthread_mutex_init(&cn->query_mutex, NULL) ||
pthread_cond_init(&cn->query_cond, NULL) ||
pthread_mutex_init(&cn->result_mutex, NULL) ||
pthread_cond_init(&cn->result_cond, NULL) ||
pthread_create(&cn->tid, &cn_thd_attrib, connection_thread, (void*)cn))
die("Error in the thread library");
}
#else /*EMBEDDED_LIBRARY*/
#define do_send_query(cn,q,q_len,flags) mysql_send_query(&cn->mysql, q, q_len)
#define do_send_query(cn,q,q_len) mysql_send_query(&cn->mysql, q, q_len)
#define do_read_query_result(cn) mysql_read_query_result(&cn->mysql)
#endif /*EMBEDDED_LIBRARY*/
......@@ -1106,6 +1191,9 @@ void close_connections()
DBUG_ENTER("close_connections");
for (--next_con; next_con >= connections; --next_con)
{
#ifdef EMBEDDED_LIBRARY
emb_close_connection(next_con);
#endif
if (next_con->stmt)
mysql_stmt_close(next_con->stmt);
next_con->stmt= 0;
......@@ -4866,7 +4954,7 @@ void do_close_connection(struct st_command *command)
we need to check if the query's thread was finished and probably wait
(embedded-server specific)
*/
wait_query_thread_end(con);
emb_close_connection(con);
#endif /*EMBEDDED_LIBRARY*/
if (con->stmt)
mysql_stmt_close(con->stmt);
......@@ -5216,8 +5304,9 @@ void do_connect(struct st_command *command)
}
#ifdef EMBEDDED_LIBRARY
con_slot->query_done= 1;
#endif
init_connection_thd(con_slot);
#endif /*EMBEDDED_LIBRARY*/
if (!mysql_init(&con_slot->mysql))
die("Failed on mysql_init()");
......@@ -6766,21 +6855,13 @@ void run_query_normal(struct st_connection *cn, struct st_command *command,
/*
Send the query
*/
if (do_send_query(cn, query, query_len, flags))
if (do_send_query(cn, query, query_len))
{
handle_error(command, mysql_errno(mysql), mysql_error(mysql),
mysql_sqlstate(mysql), ds);
goto end;
}
}
#ifdef EMBEDDED_LIBRARY
/*
Here we handle 'reap' command, so we need to check if the
query's thread was finished and probably wait
*/
else if (flags & QUERY_REAP_FLAG)
wait_query_thread_end(cn);
#endif /*EMBEDDED_LIBRARY*/
if (!(flags & QUERY_REAP_FLAG))
{
cn->pending= TRUE;
......@@ -6793,7 +6874,7 @@ void run_query_normal(struct st_connection *cn, struct st_command *command,
When on first result set, call mysql_read_query_result to retrieve
answer to the query sent earlier
*/
if ((counter==0) && mysql_read_query_result(mysql))
if ((counter==0) && do_read_query_result(cn))
{
handle_error(command, mysql_errno(mysql), mysql_error(mysql),
mysql_sqlstate(mysql), ds);
......@@ -7968,6 +8049,9 @@ int main(int argc, char **argv)
ps_protocol_enabled= 1;
st_connection *con= connections;
#ifdef EMBEDDED_LIBRARY
init_connection_thd(con);
#endif /*EMBEDDED_LIBRARY*/
if (!( mysql_init(&con->mysql)))
die("Failed in mysql_init()");
if (opt_connect_timeout)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment