Commit d62ea85e authored by Oleksandr Byelkin's avatar Oleksandr Byelkin

MDEV-9059: protocol: bundle first command with the authentication packet

parent 2ba1616e
......@@ -280,6 +280,8 @@ enum enum_indicator_type
#define MARIADB_CLIENT_COM_MULTI (1ULL << 33)
/* support of array binding */
#define MARIADB_CLIENT_STMT_BULK_OPERATIONS (1ULL << 34)
/* support bundle first command with the authentication packet */
#define MARIADB_CLIENT_COM_IN_AUTH (1ULL << 35)
#ifdef HAVE_COMPRESS
#define CAN_CLIENT_COMPRESS CLIENT_COMPRESS
......@@ -320,7 +322,8 @@ enum enum_indicator_type
CLIENT_DEPRECATE_EOF |\
CLIENT_CONNECT_ATTRS |\
MARIADB_CLIENT_COM_MULTI |\
MARIADB_CLIENT_STMT_BULK_OPERATIONS)
MARIADB_CLIENT_STMT_BULK_OPERATIONS |\
MARIADB_CLIENT_COM_IN_AUTH)
/*
To be added later:
......
......@@ -573,12 +573,17 @@ void Protocol::end_statement()
break;
case Diagnostics_area::DA_OK:
case Diagnostics_area::DA_OK_BULK:
/*
We skipping flush if it is packet processing, or there is bundle in
auth packet command.
*/
error= send_ok(thd->server_status,
thd->get_stmt_da()->statement_warn_count(),
thd->get_stmt_da()->affected_rows(),
thd->get_stmt_da()->last_insert_id(),
thd->get_stmt_da()->message(),
thd->get_stmt_da()->skip_flush());
(thd->get_stmt_da()->skip_flush() ||
(thd->bundle_command.str != NULL)));
break;
case Diagnostics_area::DA_DISABLED:
break;
......
......@@ -12403,38 +12403,78 @@ static bool find_mpvio_user(MPVIO_EXT *mpvio)
}
static bool
read_client_connect_attrs(char **ptr, char *end, CHARSET_INFO *from_cs)
read_bundle_length (size_t *length, char **ptr, char *end)
{
ulonglong length;
char *ptr_save= *ptr;
/* not enough bytes to hold the length */
if (ptr_save >= end)
return true;
length= safe_net_field_length_ll((uchar **) ptr, end - ptr_save);
*length= safe_net_field_length_ll((uchar **) ptr, end - ptr_save);
/* cannot even read the length */
if (*ptr == NULL)
return true;
/* length says there're more data than can fit into the packet */
if (*ptr + length > end)
if (*ptr + *length > end)
return true;
return false;
}
static bool
read_client_connect_attrs(char **ptr, char *end, CHARSET_INFO *from_cs)
{
size_t length;
if (read_bundle_length(&length, ptr, end))
return true;
/* impose an artificial length limit of 64k */
if (length > 65535)
return true;
#ifdef HAVE_PSI_THREAD_INTERFACE
if (PSI_THREAD_CALL(set_thread_connect_attrs)(*ptr, (size_t)length, from_cs) &&
current_thd->variables.log_warnings)
sql_print_warning("Connection attributes of length %llu were truncated",
length);
#endif
*ptr+= length;
return false;
}
static LEX_STRING
read_client_bundle_com(char **ptr, char *end)
{
LEX_STRING res= {0, packet_error};
if (read_bundle_length(&res.length, ptr, end))
return res;
if (!res.length)
return res;
/* do_command add \0 to the end so we need allocate more */
res.str= (char *)my_malloc(res.length + 1, MYF(MY_WME));
if (likely(res.str))
{
memcpy(res.str, *ptr, res.length);
*ptr+= res.length;
}
else
{
*ptr+= res.length;
res.length= packet_error;
}
return res;
}
#endif
/* the packet format is described in send_change_user_packet() */
......@@ -12817,6 +12857,19 @@ static ulong parse_client_handshake_packet(MPVIO_EXT *mpvio,
mpvio->auth_info.thd->charset()))
return packet_error;
if (thd->client_capabilities & MARIADB_CLIENT_COM_IN_AUTH)
{
thd->bundle_command=
read_client_bundle_com(&next_field,
((char *)net->read_pos) + pkt_len);
if (thd->bundle_command.length == packet_error)
{
thd->bundle_command.str= NULL;
thd->bundle_command.length= 0;
return packet_error;
}
}
/*
if the acl_user needs a different plugin to authenticate
(specified in GRANT ... AUTHENTICATED VIA plugin_name ..)
......
......@@ -952,6 +952,8 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
prepare_derived_at_open= FALSE;
create_tmp_table_for_derived= FALSE;
save_prep_leaf_list= FALSE;
bundle_command.str= NULL;
bundle_command.length= 0;
/* Restore THR_THD */
set_current_thd(old_THR_THD);
inc_thread_count();
......@@ -1364,6 +1366,8 @@ void THD::init(void)
#endif //EMBEDDED_LIBRARY
apc_target.init(&LOCK_thd_data);
bundle_command.str= NULL;
bundle_command.length= 0;
DBUG_VOID_RETURN;
}
......@@ -1486,6 +1490,13 @@ void THD::cleanup(void)
DBUG_ENTER("THD::cleanup");
DBUG_ASSERT(cleanup_done == 0);
if (bundle_command.str)
{
my_free(bundle_command.str);
bundle_command.str= 0;
bundle_command.length= 0;
}
set_killed(KILL_CONNECTION);
#ifdef ENABLE_WHEN_BINLOG_WILL_BE_ABLE_TO_PREPARE
if (transaction.xid_state.xa_state == XA_PREPARED)
......
......@@ -4559,7 +4559,6 @@ class THD :public Statement,
mysql_mutex_unlock(&LOCK_thread_count);
}
uint get_net_wait_timeout()
{
if (in_active_multi_stmt_transaction())
......@@ -4612,6 +4611,9 @@ class THD :public Statement,
LOG_SLOW_DISABLE_ADMIN);
query_plan_flags|= QPLAN_ADMIN;
}
/* Auth packet bundle packet */
LEX_STRING bundle_command;
};
inline void add_to_active_threads(THD *thd)
......
......@@ -1358,6 +1358,8 @@ void do_handle_one_connection(CONNECT *connect)
{
ulonglong thr_create_utime= microsecond_interval_timer();
THD *thd;
bool close_conn= false;
if (connect->scheduler->init_new_connection_thread() ||
!(thd= connect->create_thd(NULL)))
{
......@@ -1410,13 +1412,34 @@ void do_handle_one_connection(CONNECT *connect)
{
create_user= FALSE;
goto end_thread;
}
}
while (thd_is_connection_alive(thd))
if (thd->bundle_command.str)
{
thd->bundle_command.str[thd->bundle_command.length]= '\0'; /* safety */
enum enum_server_command command=
fetch_command(thd, thd->bundle_command.str);
close_conn= dispatch_command(command, thd, thd->bundle_command.str + 1,
(uint) (thd->bundle_command.length - 1),
FALSE, FALSE);
mysql_audit_release(thd);
if (do_command(thd))
break;
my_free(thd->bundle_command.str);
thd->bundle_command.str= 0;
thd->bundle_command.length= 0;
}
if (!close_conn)
{
while (thd_is_connection_alive(thd))
{
DBUG_ASSERT(thd->bundle_command.str == NULL);
mysql_audit_release(thd);
if (do_command(thd))
break;
}
}
end_connection(thd);
......
......@@ -1138,7 +1138,7 @@ void cleanup_items(Item *item)
DBUG_VOID_RETURN;
}
static enum enum_server_command fetch_command(THD *thd, char *packet)
enum enum_server_command fetch_command(THD *thd, char *packet)
{
enum enum_server_command
command= (enum enum_server_command) (uchar) packet[0];
......@@ -1342,25 +1342,6 @@ bool do_command(THD *thd)
command= fetch_command(thd, packet);
#ifdef WITH_WSREP
/*
Bail out if DB snapshot has not been installed.
*/
if (!(server_command_flags[command] & CF_SKIP_WSREP_CHECK) &&
!wsrep_node_is_ready(thd))
{
thd->protocol->end_statement();
/* Performance Schema Interface instrumentation end. */
MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da());
thd->m_statement_psi= NULL;
thd->m_digest= NULL;
return_value= FALSE;
goto out;
}
#endif
/* Restore read timeout value */
my_net_set_read_timeout(net, thd->variables.net_read_timeout);
......@@ -1549,6 +1530,24 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
command_name[command].str :
"<?>")));
bool drop_more_results= 0;
#ifdef WITH_WSREP
/*
Bail out if DB snapshot has not been installed.
*/
if (!(server_command_flags[command] & CF_SKIP_WSREP_CHECK) &&
!wsrep_node_is_ready(thd))
{
thd->protocol->end_statement();
/* Performance Schema Interface instrumentation end. */
MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da());
thd->m_statement_psi= NULL;
thd->m_digest= NULL;
DBUG_RETURN (FALSE);
}
#endif
if (!is_com_multi)
inc_thread_running();
......
......@@ -102,6 +102,7 @@ pthread_handler_t handle_bootstrap(void *arg);
int mysql_execute_command(THD *thd);
bool do_command(THD *thd);
void do_handle_bootstrap(THD *thd);
enum enum_server_command fetch_command(THD *thd, char *packet);
bool dispatch_command(enum enum_server_command command, THD *thd,
char* packet, uint packet_length,
bool is_com_multi, bool is_next_command);
......
......@@ -23,6 +23,7 @@
#include <sql_audit.h>
#include <debug_sync.h>
#include <threadpool.h>
#include "sql_parse.h"
/* Threadpool parameters */
......@@ -46,7 +47,6 @@ static int threadpool_process_request(THD *thd);
static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data);
extern "C" pthread_key(struct st_my_thread_var*, THR_KEY_mysys);
extern bool do_command(THD*);
static inline TP_connection *get_TP_connection(THD *thd)
{
......@@ -222,6 +222,7 @@ void tp_callback(TP_connection *c)
static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
{
THD *thd= NULL;
bool error= false;
/*
Create a new connection context: mysys_thread_var and PSI thread
......@@ -283,6 +284,26 @@ static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
thd->skip_wait_timeout= true;
set_thd_idle(thd);
if (thd && thd->bundle_command.str)
{
thd->bundle_command.str[thd->bundle_command.length]= '\0'; /* safety */
enum enum_server_command command=
fetch_command(thd, thd->bundle_command.str);
/* it is not a real error, just QUIT */
error= dispatch_command(command, thd, thd->bundle_command.str + 1,
(uint) (thd->bundle_command.length - 1),
FALSE, FALSE);
net_flush(&thd->net);
mysql_audit_release(thd);
}
if (error)
{
threadpool_remove_connection(thd);
thd= NULL;
}
return thd;
end:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment