Commit b01c4261 authored by Alexey Botchkov's avatar Alexey Botchkov

MDEV-19275 Provide SQL service to plugins.

Protocol_local fixed so it can be used now.
Some Protocol:: methods made virtual so they can adapt.
as well as net_ok and net_send_error functions.
execute_sql_string function is exported to the plugins.
To be changed with the mysql_use_result.
parent 68cba091
......@@ -491,7 +491,7 @@ int emb_load_querycache_result(THD *thd, Querycache_stream *src)
*prev_row= NULL;
data->embedded_info->prev_ptr= prev_row;
return_ok:
net_send_eof(thd, thd->server_status,
thd->protocol->net_send_eof(thd, thd->server_status,
thd->get_stmt_da()->current_statement_warn_count());
DBUG_RETURN(0);
err:
......
......@@ -81,4 +81,3 @@ class Querycache_stream
uint emb_count_querycache_size(THD *thd);
int emb_load_querycache_result(THD *thd, Querycache_stream *src);
void emb_store_querycache_result(Querycache_stream *dst, THD* thd);
bool net_send_eof(THD *thd, uint server_status, uint total_warn_count);
......@@ -1139,7 +1139,7 @@ bool Protocol::send_result_set_metadata(List<Item> *list, uint flags)
for (uint pos= 0 ; (item= it++); pos++)
{
if (prot.store_field_metadata(thd, item, pos))
if (prot.store_item_metadata(thd, item, pos))
goto err;
}
......@@ -1254,8 +1254,7 @@ bool Protocol_binary::write()
@retval FALSE Success
*/
bool
net_send_ok(THD *thd,
bool Protocol::net_send_ok(THD *thd,
uint server_status, uint statement_warn_count,
ulonglong affected_rows, ulonglong id, const char *message,
bool, bool)
......@@ -1290,7 +1289,7 @@ net_send_ok(THD *thd,
*/
bool
net_send_eof(THD *thd, uint server_status, uint statement_warn_count)
Protocol::net_send_eof(THD *thd, uint server_status, uint statement_warn_count)
{
bool error= write_eof_packet(thd, server_status, statement_warn_count);
thd->cur_data= 0;
......@@ -1298,7 +1297,7 @@ net_send_eof(THD *thd, uint server_status, uint statement_warn_count)
}
bool net_send_error_packet(THD *thd, uint sql_errno, const char *err,
bool Protocol::net_send_error_packet(THD *thd, uint sql_errno, const char *err,
const char *sqlstate)
{
uint error;
......
......@@ -1087,11 +1087,6 @@ unsigned int STDCALL mysql_field_count(MYSQL *mysql)
return mysql->field_count;
}
my_ulonglong STDCALL mysql_affected_rows(MYSQL *mysql)
{
return mysql->affected_rows;
}
my_ulonglong STDCALL mysql_insert_id(MYSQL *mysql)
{
return mysql->insert_id;
......
......@@ -3735,6 +3735,12 @@ static MYSQL_RES * cli_use_result(MYSQL *mysql)
}
my_ulonglong STDCALL mysql_affected_rows(MYSQL *mysql)
{
return mysql->affected_rows;
}
/**************************************************************************
Return next row of the query results
**************************************************************************/
......
......@@ -2553,7 +2553,7 @@ void close_connection(THD *thd, uint sql_errno)
if (sql_errno)
{
net_send_error(thd, sql_errno, ER_DEFAULT(sql_errno), NULL);
thd->protocol->net_send_error(thd, sql_errno, ER_DEFAULT(sql_errno), NULL);
thd->print_aborted_warning(lvl, ER_DEFAULT(sql_errno));
}
else
......
......@@ -32,13 +32,6 @@
#include <stdarg.h>
static const unsigned int PACKET_BUFFER_EXTRA_ALLOC= 1024;
/* Declared non-static only because of the embedded library. */
bool net_send_error_packet(THD *, uint, const char *, const char *);
/* Declared non-static only because of the embedded library. */
bool net_send_ok(THD *, uint, uint, ulonglong, ulonglong, const char *,
bool, bool);
/* Declared non-static only because of the embedded library. */
bool net_send_eof(THD *thd, uint server_status, uint statement_warn_count);
#ifndef EMBEDDED_LIBRARY
static bool write_eof_packet(THD *, NET *, uint, uint);
#endif
......@@ -152,11 +145,11 @@ bool Protocol_binary::net_store_data_cs(const uchar *from, size_t length,
@retval TRUE An error occurred and the message wasn't sent properly
*/
bool net_send_error(THD *thd, uint sql_errno, const char *err,
bool Protocol::net_send_error(THD *thd, uint sql_errno, const char *err,
const char* sqlstate)
{
bool error;
DBUG_ENTER("net_send_error");
DBUG_ENTER("Protocol::net_send_error");
DBUG_ASSERT(!thd->spcont);
DBUG_ASSERT(sql_errno);
......@@ -214,10 +207,10 @@ bool net_send_error(THD *thd, uint sql_errno, const char *err,
#ifndef EMBEDDED_LIBRARY
bool
net_send_ok(THD *thd,
Protocol::net_send_ok(THD *thd,
uint server_status, uint statement_warn_count,
ulonglong affected_rows, ulonglong id, const char *message,
bool is_eof,
ulonglong affected_rows, ulonglong id,
const char *message, bool is_eof,
bool skip_flush)
{
NET *net= &thd->net;
......@@ -226,7 +219,7 @@ net_send_ok(THD *thd,
bool state_changed= false;
bool error= FALSE;
DBUG_ENTER("net_send_ok");
DBUG_ENTER("Protocol::net_send_ok");
if (! net->vio) // hack for re-parsing queries
{
......@@ -329,11 +322,11 @@ static uchar eof_buff[1]= { (uchar) 254 }; /* Marker for end of fields */
*/
bool
net_send_eof(THD *thd, uint server_status, uint statement_warn_count)
Protocol::net_send_eof(THD *thd, uint server_status, uint statement_warn_count)
{
NET *net= &thd->net;
bool error= FALSE;
DBUG_ENTER("net_send_eof");
DBUG_ENTER("Protocol::net_send_eof");
/*
Check if client understand new format packets (OK instead of EOF)
......@@ -420,7 +413,7 @@ static bool write_eof_packet(THD *thd, NET *net,
@retval TRUE An error occurred and the messages wasn't sent properly
*/
bool net_send_error_packet(THD *thd, uint sql_errno, const char *err,
bool Protocol::net_send_error_packet(THD *thd, uint sql_errno, const char *err,
const char* sqlstate)
{
......@@ -434,7 +427,7 @@ bool net_send_error_packet(THD *thd, uint sql_errno, const char *err,
char buff[2+1+SQLSTATE_LENGTH+MYSQL_ERRMSG_SIZE], *pos;
my_bool ret;
uint8 save_compress;
DBUG_ENTER("send_error_packet");
DBUG_ENTER("Protocol::send_error_packet");
if (net->vio == 0)
{
......@@ -963,7 +956,7 @@ bool Protocol::send_result_set_metadata(List<Item> *list, uint flags)
for (uint pos= 0; (item=it++); pos++)
{
prot.prepare_for_resend();
if (prot.store_field_metadata(thd, item, pos))
if (prot.store_item_metadata(thd, item, pos))
goto err;
if (prot.write())
DBUG_RETURN(1);
......@@ -1043,7 +1036,7 @@ bool Protocol::write()
#endif /* EMBEDDED_LIBRARY */
bool Protocol_text::store_field_metadata(THD *thd, Item *item, uint pos)
bool Protocol_text::store_item_metadata(THD *thd, Item *item, uint pos)
{
Send_field field(thd, item);
return store_field_metadata(thd, field, item->charset_for_protocol(), pos);
......
......@@ -50,14 +50,13 @@ class Protocol
}
#endif
uint field_count;
#ifndef EMBEDDED_LIBRARY
bool net_store_data(const uchar *from, size_t length);
bool net_store_data_cs(const uchar *from, size_t length,
CHARSET_INFO *fromcs, CHARSET_INFO *tocs);
#else
virtual bool net_store_data(const uchar *from, size_t length);
virtual bool net_store_data_cs(const uchar *from, size_t length,
CHARSET_INFO *fromcs, CHARSET_INFO *tocs);
virtual bool net_send_ok(THD *, uint, uint, ulonglong, ulonglong, const char *,
bool, bool);
virtual bool net_send_error_packet(THD *, uint, const char *, const char *);
#ifdef EMBEDDED_LIBRARY
char **next_field;
MYSQL_FIELD *next_mysql_field;
MEM_ROOT *alloc;
......@@ -181,6 +180,9 @@ class Protocol
};
virtual enum enum_protocol_type type()= 0;
virtual bool net_send_eof(THD *thd, uint server_status, uint statement_warn_count);
bool net_send_error(THD *thd, uint sql_errno, const char *err,
const char* sqlstate);
void end_statement();
friend int send_answer_1(Protocol *protocol, String *s1, String *s2,
......@@ -191,7 +193,7 @@ class Protocol
/** Class used for the old (MySQL 4.0 protocol). */
class Protocol_text final :public Protocol
class Protocol_text :public Protocol
{
StringBuffer<FLOATING_POINT_BUFFER> buffer;
bool store_numeric_string_aux(const char *from, size_t length);
......@@ -226,10 +228,10 @@ class Protocol_text final :public Protocol
#ifdef EMBEDDED_LIBRARY
void remove_last_row() override;
#endif
bool store_field_metadata(const THD *thd, const Send_field &field,
virtual bool store_field_metadata(const THD *thd, const Send_field &field,
CHARSET_INFO *charset_for_protocol,
uint pos);
bool store_field_metadata(THD *thd, Item *item, uint pos);
bool store_item_metadata(THD *thd, Item *item, uint pos);
bool store_field_metadata_for_list_fields(const THD *thd, Field *field,
const TABLE_LIST *table_list,
uint pos);
......@@ -321,8 +323,6 @@ class Protocol_discard final : public Protocol
void send_warning(THD *thd, uint sql_errno, const char *err=0);
bool net_send_error(THD *thd, uint sql_errno, const char *err,
const char* sqlstate);
void net_send_progress_packet(THD *thd);
uchar *net_store_data(uchar *to,const uchar *from, size_t length);
uchar *net_store_data(uchar *to,int32 from);
......
......@@ -254,35 +254,36 @@ void mysql_audit_notify_connection_disconnect(THD *thd, int errcode)
}
static inline
void mysql_audit_notify_connection_change_user(THD *thd)
void mysql_audit_notify_connection_change_user(THD *thd,
const Security_context *old_ctx)
{
if (mysql_audit_connection_enabled())
{
const Security_context *sctx= thd->security_ctx;
mysql_event_connection event;
event.event_subclass= MYSQL_AUDIT_CONNECTION_CHANGE_USER;
event.status= thd->get_stmt_da()->is_error() ?
thd->get_stmt_da()->sql_errno() : 0;
event.thread_id= (unsigned long)thd->thread_id;
event.user= sctx->user;
event.user_length= safe_strlen_uint(sctx->user);
event.priv_user= sctx->priv_user;
event.priv_user_length= strlen_uint(sctx->priv_user);
event.external_user= sctx->external_user;
event.external_user_length= safe_strlen_uint(sctx->external_user);
event.proxy_user= sctx->proxy_user;
event.proxy_user_length= strlen_uint(sctx->proxy_user);
event.host= sctx->host;
event.host_length= safe_strlen_uint(sctx->host);
event.ip= sctx->ip;
event.ip_length= safe_strlen_uint(sctx->ip);
event.user= old_ctx->user;
event.user_length= safe_strlen_uint(old_ctx->user);
event.priv_user= old_ctx->priv_user;
event.priv_user_length= strlen_uint(old_ctx->priv_user);
event.external_user= old_ctx->external_user;
event.external_user_length= safe_strlen_uint(old_ctx->external_user);
event.proxy_user= old_ctx->proxy_user;
event.proxy_user_length= strlen_uint(old_ctx->proxy_user);
event.host= old_ctx->host;
event.host_length= safe_strlen_uint(old_ctx->host);
event.ip= old_ctx->ip;
event.ip_length= safe_strlen_uint(old_ctx->ip);
event.database= thd->db;
mysql_audit_notify(thd, MYSQL_AUDIT_CONNECTION_CLASS, &event);
}
}
static inline
void mysql_audit_external_lock_ex(THD *thd, my_thread_id thread_id,
const char *user, const char *host, const char *ip, query_id_t query_id,
......
......@@ -348,6 +348,12 @@ void thd_clear_errors(THD *thd)
}
extern "C" unsigned long long thd_query_id(const MYSQL_THD thd)
{
return((unsigned long long)thd->query_id);
}
/**
Get thread attributes for connection threads
......@@ -4983,6 +4989,55 @@ extern "C" size_t thd_query_safe(MYSQL_THD thd, char *buf, size_t buflen)
}
extern "C" const char *thd_user_name(MYSQL_THD thd)
{
if (!thd->security_ctx)
return 0;
return thd->security_ctx->user;
}
extern "C" const char *thd_client_host(MYSQL_THD thd)
{
if (!thd->security_ctx)
return 0;
return thd->security_ctx->host;
}
extern "C" const char *thd_client_ip(MYSQL_THD thd)
{
if (!thd->security_ctx)
return 0;
return thd->security_ctx->ip;
}
extern "C" LEX_CSTRING *thd_current_db(MYSQL_THD thd)
{
return &thd->db;
}
extern "C" int thd_current_status(MYSQL_THD thd)
{
Diagnostics_area *da= thd->get_stmt_da();
if (!da)
return 0;
return da->is_error() ? da->sql_errno() : 0;
}
extern "C" enum enum_server_command thd_current_command(MYSQL_THD thd)
{
return thd->get_command();
}
extern "C" int thd_slave_thread(const MYSQL_THD thd)
{
return(thd->slave_thread);
......
......@@ -195,7 +195,14 @@ extern char empty_c_string[1];
extern MYSQL_PLUGIN_IMPORT const char **errmesg;
extern "C" LEX_STRING * thd_query_string (MYSQL_THD thd);
extern "C" unsigned long long thd_query_id(const MYSQL_THD thd);
extern "C" size_t thd_query_safe(MYSQL_THD thd, char *buf, size_t buflen);
extern "C" const char *thd_user_name(MYSQL_THD thd);
extern "C" const char *thd_client_host(MYSQL_THD thd);
extern "C" const char *thd_client_ip(MYSQL_THD thd);
extern "C" LEX_CSTRING *thd_current_db(MYSQL_THD thd);
extern "C" int thd_current_status(MYSQL_THD thd);
extern "C" enum enum_server_command thd_current_command(MYSQL_THD thd);
/**
@class CSET_STRING
......
......@@ -1494,7 +1494,7 @@ void CONNECT::close_with_error(uint sql_errno,
if (thd)
{
if (sql_errno)
net_send_error(thd, sql_errno, message, NULL);
thd->protocol->net_send_error(thd, sql_errno, message, NULL);
close_connection(thd, close_error);
delete thd;
set_current_thd(0);
......
......@@ -1748,7 +1748,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
else
auth_rc= acl_authenticate(thd, packet_length);
mysql_audit_notify_connection_change_user(thd);
mysql_audit_notify_connection_change_user(thd, &save_security_ctx);
if (auth_rc)
{
/* Free user if allocated by acl_authenticate */
......
......@@ -243,61 +243,6 @@ class Execute_sql_statement: public Server_runnable
class Ed_connection;
/**
Protocol_local: a helper class to intercept the result
of the data written to the network.
*/
class Protocol_local :public Protocol
{
public:
Protocol_local(THD *thd, Ed_connection *ed_connection);
~Protocol_local() { free_root(&m_rset_root, MYF(0)); }
protected:
virtual void prepare_for_resend();
virtual bool write();
virtual bool store_null();
virtual bool store_tiny(longlong from);
virtual bool store_short(longlong from);
virtual bool store_long(longlong from);
virtual bool store_longlong(longlong from, bool unsigned_flag);
virtual bool store_decimal(const my_decimal *);
virtual bool store_str(const char *from, size_t length,
CHARSET_INFO *fromcs, CHARSET_INFO *tocs);
virtual bool store(MYSQL_TIME *time, int decimals);
virtual bool store_date(MYSQL_TIME *time);
virtual bool store_time(MYSQL_TIME *time, int decimals);
virtual bool store_float(float value, uint32 decimals);
virtual bool store_double(double value, uint32 decimals);
virtual bool store(Field *field);
virtual bool send_result_set_metadata(List<Item> *list, uint flags);
virtual bool send_out_parameters(List<Item_param> *sp_params);
#ifdef EMBEDDED_LIBRARY
void remove_last_row();
#endif
virtual enum enum_protocol_type type() { return PROTOCOL_LOCAL; };
virtual bool send_ok(uint server_status, uint statement_warn_count,
ulonglong affected_rows, ulonglong last_insert_id,
const char *message, bool skip_flush);
virtual bool send_eof(uint server_status, uint statement_warn_count);
virtual bool send_error(uint sql_errno, const char *err_msg, const char* sqlstate);
private:
bool store_string(const char *str, size_t length,
CHARSET_INFO *src_cs, CHARSET_INFO *dst_cs);
bool store_column(const void *data, size_t length);
void opt_add_row_to_rset();
private:
Ed_connection *m_connection;
MEM_ROOT m_rset_root;
List<Ed_row> *m_rset;
size_t m_column_count;
Ed_column *m_current_row;
Ed_column *m_current_column;
};
/******************************************************************************
Implementation
......@@ -3792,6 +3737,7 @@ Execute_sql_statement::execute_server_code(THD *thd)
end:
thd->lex->restore_set_statement_var();
delete_explain_query(thd->lex);
lex_end(thd->lex);
return error;
......@@ -5035,12 +4981,12 @@ Ed_connection::free_old_result()
*/
bool
Ed_connection::execute_direct(LEX_STRING sql_text)
Ed_connection::execute_direct(Protocol *p, LEX_STRING sql_text)
{
Execute_sql_statement execute_sql_statement(sql_text);
DBUG_PRINT("ed_query", ("%s", sql_text.str));
return execute_direct(&execute_sql_statement);
return execute_direct(p, &execute_sql_statement);
}
......@@ -5057,10 +5003,9 @@ Ed_connection::execute_direct(LEX_STRING sql_text)
@param server_runnable A code fragment to execute.
*/
bool Ed_connection::execute_direct(Server_runnable *server_runnable)
bool Ed_connection::execute_direct(Protocol *p, Server_runnable *server_runnable)
{
bool rc= FALSE;
Protocol_local protocol_local(m_thd, this);
Prepared_statement stmt(m_thd);
Protocol *save_protocol= m_thd->protocol;
Diagnostics_area *save_diagnostics_area= m_thd->get_stmt_da();
......@@ -5069,7 +5014,7 @@ bool Ed_connection::execute_direct(Server_runnable *server_runnable)
free_old_result(); /* Delete all data from previous execution, if any */
m_thd->protocol= &protocol_local;
m_thd->protocol= p;
m_thd->set_stmt_da(&m_diagnostics_area);
rc= stmt.execute_server_runnable(server_runnable);
......@@ -5156,340 +5101,741 @@ Ed_connection::store_result_set()
return ed_result_set;
}
/*************************************************************************
* Protocol_local
**************************************************************************/
/*
MENT-56
Protocol_local and service_sql for plugins to enable 'local' SQL query execution.
*/
#ifndef EMBEDDED_LIBRARY
// This part is mostly copied from libmysqld/lib_sql.cc
// TODO: get rid of code duplications
#include <mysql.h>
#include "../libmysqld/embedded_priv.h"
Protocol_local::Protocol_local(THD *thd, Ed_connection *ed_connection)
:Protocol(thd),
m_connection(ed_connection),
m_rset(NULL),
m_column_count(0),
m_current_row(NULL),
m_current_column(NULL)
class Protocol_local : public Protocol_text
{
clear_alloc_root(&m_rset_root);
}
public:
struct st_mysql_data *cur_data;
struct st_mysql_data *first_data;
struct st_mysql_data **data_tail;
void clear_data_list();
struct st_mysql_data *alloc_new_dataset();
char **next_field;
MYSQL_FIELD *next_mysql_field;
MEM_ROOT *alloc;
Protocol_local(THD *thd_arg, ulong prealloc= 0) :
Protocol_text(thd_arg, prealloc),
cur_data(0), first_data(0), data_tail(&first_data)
{}
/**
Called between two result set rows.
protected:
bool net_store_data(const uchar *from, size_t length);
bool net_store_data_cs(const uchar *from, size_t length,
CHARSET_INFO *fromcs, CHARSET_INFO *tocs);
bool net_send_eof(THD *thd, uint server_status, uint statement_warn_count);
bool net_send_ok(THD *, uint, uint, ulonglong, ulonglong, const char *,
bool, bool);
bool net_send_error_packet(THD *, uint, const char *, const char *);
bool begin_dataset();
bool begin_dataset(THD *thd, uint numfields);
bool write();
bool flush();
bool store_field_metadata(const THD *thd, const Send_field &field,
CHARSET_INFO *charset_for_protocol,
uint pos);
bool send_result_set_metadata(List<Item> *list, uint flags);
void remove_last_row();
bool store_null();
void prepare_for_resend();
bool send_list_fields(List<Field> *list, const TABLE_LIST *table_list);
Prepare structures to fill result set rows.
Unfortunately, we can't return an error here. If memory allocation
fails, we'll have to return an error later. And so is done
in methods such as @sa store_column().
*/
enum enum_protocol_type type() { return PROTOCOL_LOCAL; };
};
void Protocol_local::prepare_for_resend()
static
bool
write_eof_packet_local(THD *thd,
Protocol_local *p, uint server_status, uint statement_warn_count)
{
DBUG_ASSERT(alloc_root_inited(&m_rset_root));
opt_add_row_to_rset();
/* Start a new row. */
m_current_row= (Ed_column *) alloc_root(&m_rset_root,
sizeof(Ed_column) * m_column_count);
m_current_column= m_current_row;
// if (!thd->mysql) // bootstrap file handling
// return FALSE;
/*
The following test should never be true, but it's better to do it
because if 'is_fatal_error' is set the server is not going to execute
other queries (see the if test in dispatch_command / COM_QUERY)
*/
if (thd->is_fatal_error)
thd->server_status&= ~SERVER_MORE_RESULTS_EXISTS;
p->cur_data->embedded_info->server_status= server_status;
/*
Don't send warn count during SP execution, as the warn_list
is cleared between substatements, and mysqltest gets confused
*/
p->cur_data->embedded_info->warning_count=
(thd->spcont ? 0 : MY_MIN(statement_warn_count, 65535));
return FALSE;
}
/**
In "real" protocols this is called to finish a result set row.
Unused in the local implementation.
*/
bool Protocol_local::write()
MYSQL_DATA *Protocol_local::alloc_new_dataset()
{
return FALSE;
MYSQL_DATA *data;
struct embedded_query_result *emb_data;
if (!my_multi_malloc(PSI_INSTRUMENT_ME, MYF(MY_WME | MY_ZEROFILL),
&data, sizeof(*data),
&emb_data, sizeof(*emb_data),
NULL))
return NULL;
emb_data->prev_ptr= &data->data;
cur_data= data;
*data_tail= data;
data_tail= &emb_data->next;
data->embedded_info= emb_data;
return data;
}
/**
A helper function to add the current row to the current result
set. Called in @sa prepare_for_resend(), when a new row is started,
and in send_eof(), when the result set is finished.
*/
void Protocol_local::opt_add_row_to_rset()
static char *dup_str_aux(MEM_ROOT *root, const char *from, uint length,
CHARSET_INFO *fromcs, CHARSET_INFO *tocs)
{
if (m_current_row)
uint32 dummy32;
uint dummy_err;
char *result;
/* 'tocs' is set 0 when client issues SET character_set_results=NULL */
if (tocs && String::needs_conversion(0, fromcs, tocs, &dummy32))
{
/* Add the old row to the result set */
Ed_row *ed_row= new (&m_rset_root) Ed_row(m_current_row, m_column_count);
if (ed_row)
m_rset->push_back(ed_row, &m_rset_root);
uint new_len= (tocs->mbmaxlen * length) / fromcs->mbminlen + 1;
result= (char *)alloc_root(root, new_len);
length= copy_and_convert(result, new_len,
tocs, from, length, fromcs, &dummy_err);
}
else
{
result= (char *)alloc_root(root, length + 1);
memcpy(result, from, length);
}
result[length]= 0;
return result;
}
/**
Add a NULL column to the current row.
*/
static char *dup_str_aux(MEM_ROOT *root, const LEX_CSTRING &from,
CHARSET_INFO *fromcs, CHARSET_INFO *tocs)
{
return dup_str_aux(root, from.str, (uint) from.length, fromcs, tocs);
}
bool Protocol_local::store_null()
bool Protocol_local::net_store_data(const uchar *from, size_t length)
{
if (m_current_column == NULL)
return TRUE; /* prepare_for_resend() failed to allocate memory. */
char *field_buf;
// if (!thd->mysql) // bootstrap file handling
// return FALSE;
bzero(m_current_column, sizeof(*m_current_column));
++m_current_column;
if (!(field_buf= (char*) alloc_root(alloc, length + sizeof(uint) + 1)))
return TRUE;
*(uint *)field_buf= (uint) length;
*next_field= field_buf + sizeof(uint);
memcpy((uchar*) *next_field, from, length);
(*next_field)[length]= 0;
if (next_mysql_field->max_length < length)
next_mysql_field->max_length= (unsigned long) length;
++next_field;
++next_mysql_field;
return FALSE;
}
bool Protocol_local::net_store_data_cs(const uchar *from, size_t length,
CHARSET_INFO *from_cs, CHARSET_INFO *to_cs)
{
uint conv_length= (uint) (to_cs->mbmaxlen * length / from_cs->mbminlen);
uint dummy_error;
char *field_buf;
// if (!thd->mysql) // bootstrap file handling
// return false;
if (!(field_buf= (char*) alloc_root(alloc, conv_length + sizeof(uint) + 1)))
return true;
*next_field= field_buf + sizeof(uint);
length= copy_and_convert(*next_field, conv_length, to_cs,
(const char*) from, length, from_cs, &dummy_error);
*(uint *) field_buf= (uint) length;
(*next_field)[length]= 0;
if (next_mysql_field->max_length < length)
next_mysql_field->max_length= (unsigned long) length;
++next_field;
++next_mysql_field;
return false;
}
/**
A helper method to add any column to the current row
in its binary form.
Embedded library implementation of OK response.
This function is used by the server to write 'OK' packet to
the "network" when the server is compiled as an embedded library.
Since there is no network in the embedded configuration,
a different implementation is necessary.
Instead of marshalling response parameters to a network representation
and then writing it to the socket, here we simply copy the data to the
corresponding client-side connection structures.
Allocates memory for the data in the result set memory root.
@sa Server implementation of net_send_ok in protocol.cc for
description of the arguments.
@return
@retval TRUE An error occurred
@retval FALSE Success
*/
bool Protocol_local::store_column(const void *data, size_t length)
bool
Protocol_local::net_send_ok(THD *thd,
uint server_status, uint statement_warn_count,
ulonglong affected_rows, ulonglong id, const char *message, bool, bool)
{
if (m_current_column == NULL)
return TRUE; /* prepare_for_resend() failed to allocate memory. */
/*
alloc_root() automatically aligns memory, so we don't need to
do any extra alignment if we're pointing to, say, an integer.
*/
m_current_column->str= (char*) memdup_root(&m_rset_root,
data,
length + 1 /* Safety */);
if (! m_current_column->str)
return TRUE;
m_current_column->str[length]= '\0'; /* Safety */
m_current_column->length= length;
++m_current_column;
return FALSE;
DBUG_ENTER("emb_net_send_ok");
MYSQL_DATA *data;
// MYSQL *mysql= thd->mysql;
// if (!mysql) // bootstrap file handling
// DBUG_RETURN(FALSE);
if (!(data= alloc_new_dataset()))
DBUG_RETURN(TRUE);
data->embedded_info->affected_rows= affected_rows;
data->embedded_info->insert_id= id;
if (message)
strmake_buf(data->embedded_info->info, message);
bool error= write_eof_packet_local(thd, this,
server_status, statement_warn_count);
cur_data= 0;
DBUG_RETURN(error);
}
/**
Store a string value in a result set column, optionally
having converted it to character_set_results.
Embedded library implementation of EOF response.
@sa net_send_ok
@return
@retval TRUE An error occurred
@retval FALSE Success
*/
bool
Protocol_local::store_string(const char *str, size_t length,
CHARSET_INFO *src_cs, CHARSET_INFO *dst_cs)
Protocol_local::net_send_eof(THD *thd, uint server_status,
uint statement_warn_count)
{
/* Store with conversion */
uint error_unused;
if (needs_conversion(src_cs, dst_cs))
{
if (unlikely(convert->copy(str, length, src_cs, dst_cs, &error_unused)))
return TRUE;
str= convert->ptr();
length= convert->length();
}
return store_column(str, length);
bool error= write_eof_packet_local(thd, this, server_status,
statement_warn_count);
cur_data= 0;
return error;
}
/** Store a tiny int as is (1 byte) in a result set column. */
bool Protocol_local::net_send_error_packet(THD *thd, uint sql_errno,
const char *err, const char *sqlstate)
{
uint error;
char converted_err[MYSQL_ERRMSG_SIZE];
MYSQL_DATA *data= cur_data;
struct embedded_query_result *ei;
// if (!thd->mysql) // bootstrap file handling
// {
// fprintf(stderr, "ERROR: %d %s\n", sql_errno, err);
// return TRUE;
// }
if (!data)
data= alloc_new_dataset();
ei= data->embedded_info;
ei->last_errno= sql_errno;
convert_error_message(converted_err, sizeof(converted_err),
thd->variables.character_set_results,
err, strlen(err),
system_charset_info, &error);
/* Converted error message is always null-terminated. */
strmake_buf(ei->info, converted_err);
strmov(ei->sqlstate, sqlstate);
ei->server_status= thd->server_status;
cur_data= 0;
return FALSE;
}
bool Protocol_local::store_tiny(longlong value)
bool Protocol_local::begin_dataset()
{
char v= (char) value;
return store_column(&v, 1);
MYSQL_DATA *data= alloc_new_dataset();
if (!data)
return 1;
alloc= &data->alloc;
/* Assume rowlength < 8192 */
init_alloc_root(PSI_INSTRUMENT_ME, alloc, 8192, 0, MYF(0));
alloc->min_malloc= sizeof(MYSQL_ROWS);
return 0;
}
/** Store a short as is (2 bytes, host order) in a result set column. */
bool Protocol_local::store_short(longlong value)
bool Protocol_local::begin_dataset(THD *thd, uint numfields)
{
int16 v= (int16) value;
return store_column(&v, 2);
if (begin_dataset())
return true;
MYSQL_DATA *data= cur_data;
data->fields= field_count= numfields;
if (!(data->embedded_info->fields_list=
(MYSQL_FIELD*)alloc_root(&data->alloc, sizeof(MYSQL_FIELD)*field_count)))
return true;
return false;
}
/** Store a "long" as is (4 bytes, host order) in a result set column. */
bool Protocol_local::store_long(longlong value)
bool Protocol_local::write()
{
int32 v= (int32) value;
return store_column(&v, 4);
}
// if (!thd->mysql) // bootstrap file handling
// return false;
*next_field= 0;
return false;
}
/** Store a "longlong" as is (8 bytes, host order) in a result set column. */
bool Protocol_local::store_longlong(longlong value, bool unsigned_flag)
bool Protocol_local::flush()
{
int64 v= (int64) value;
return store_column(&v, 8);
return 0;
}
/** Store a decimal in string format in a result set column */
bool Protocol_local::store_decimal(const my_decimal *value)
bool Protocol_local::store_field_metadata(const THD * thd,
const Send_field &server_field,
CHARSET_INFO *charset_for_protocol,
uint pos)
{
DBUG_ASSERT(0); // This method is not used yet
StringBuffer<DECIMAL_MAX_STR_LENGTH> str;
return value->to_string(&str) ? store_column(str.ptr(), str.length()) : true;
}
CHARSET_INFO *cs= system_charset_info;
CHARSET_INFO *thd_cs= thd->variables.character_set_results;
MYSQL_DATA *data= cur_data;
MEM_ROOT *field_alloc= &data->alloc;
MYSQL_FIELD *client_field= &cur_data->embedded_info->fields_list[pos];
DBUG_ASSERT(server_field.is_sane());
client_field->db= dup_str_aux(field_alloc, server_field.db_name,
cs, thd_cs);
client_field->table= dup_str_aux(field_alloc, server_field.table_name,
cs, thd_cs);
client_field->name= dup_str_aux(field_alloc, server_field.col_name,
cs, thd_cs);
client_field->org_table= dup_str_aux(field_alloc, server_field.org_table_name,
cs, thd_cs);
client_field->org_name= dup_str_aux(field_alloc, server_field.org_col_name,
cs, thd_cs);
if (charset_for_protocol == &my_charset_bin || thd_cs == NULL)
{
/* No conversion */
client_field->charsetnr= charset_for_protocol->number;
client_field->length= server_field.length;
}
else
{
/* With conversion */
client_field->charsetnr= thd_cs->number;
client_field->length= server_field.max_octet_length(charset_for_protocol,
thd_cs);
}
client_field->type= server_field.type_handler()->type_code_for_protocol();
client_field->flags= (uint16) server_field.flags;
client_field->decimals= server_field.decimals;
client_field->db_length= (unsigned int) strlen(client_field->db);
client_field->table_length= (unsigned int) strlen(client_field->table);
client_field->name_length= (unsigned int) strlen(client_field->name);
client_field->org_name_length= (unsigned int) strlen(client_field->org_name);
client_field->org_table_length= (unsigned int) strlen(client_field->org_table);
/** Store a string. */
client_field->catalog= dup_str_aux(field_alloc, "def", 3, cs, thd_cs);
client_field->catalog_length= 3;
bool Protocol_local::store_str(const char *str, size_t length,
CHARSET_INFO *src_cs, CHARSET_INFO *dst_cs)
{
return store_string(str, length, src_cs, dst_cs);
}
if (IS_NUM(client_field->type))
client_field->flags|= NUM_FLAG;
client_field->max_length= 0;
client_field->def= 0;
return false;
}
/* Store MYSQL_TIME (in binary format) */
bool Protocol_local::store(MYSQL_TIME *time, int decimals)
void Protocol_local::remove_last_row()
{
if (decimals != AUTO_SEC_PART_DIGITS)
my_datetime_trunc(time, decimals);
return store_column(time, sizeof(MYSQL_TIME));
}
MYSQL_DATA *data= cur_data;
MYSQL_ROWS **last_row_hook= &data->data;
my_ulonglong count= data->rows;
DBUG_ENTER("Protocol_text::remove_last_row");
while (--count)
last_row_hook= &(*last_row_hook)->next;
*last_row_hook= 0;
data->embedded_info->prev_ptr= last_row_hook;
data->rows--;
/** Store MYSQL_TIME (in binary format) */
DBUG_VOID_RETURN;
}
bool Protocol_local::store_date(MYSQL_TIME *time)
bool Protocol_local::send_result_set_metadata(List<Item> *list, uint flags)
{
return store_column(time, sizeof(MYSQL_TIME));
}
List_iterator_fast<Item> it(*list);
Item *item;
// Protocol_local prot(thd);
DBUG_ENTER("send_result_set_metadata");
// if (!thd->mysql) // bootstrap file handling
// DBUG_RETURN(0);
/** Store MYSQL_TIME (in binary format) */
if (begin_dataset(thd, list->elements))
goto err;
bool Protocol_local::store_time(MYSQL_TIME *time, int decimals)
{
if (decimals != AUTO_SEC_PART_DIGITS)
my_time_trunc(time, decimals);
return store_column(time, sizeof(MYSQL_TIME));
for (uint pos= 0 ; (item= it++); pos++)
{
if (/*prot.*/store_item_metadata(thd, item, pos))
goto err;
}
if (flags & SEND_EOF)
write_eof_packet_local(thd, this, thd->server_status,
thd->get_stmt_da()->current_statement_warn_count());
DBUG_RETURN(prepare_for_send(list->elements));
err:
my_error(ER_OUT_OF_RESOURCES, MYF(0)); /* purecov: inspected */
DBUG_RETURN(1); /* purecov: inspected */
}
static void
list_fields_send_default(THD *thd, Protocol_local *p, Field *fld, uint pos)
{
char buff[80];
String tmp(buff, sizeof(buff), default_charset_info), *res;
MYSQL_FIELD *client_field= &p->cur_data->embedded_info->fields_list[pos];
if (fld->is_null() || !(res= fld->val_str(&tmp)))
{
client_field->def_length= 0;
client_field->def= strmake_root(&p->cur_data->alloc, "", 0);
}
else
{
client_field->def_length= res->length();
client_field->def= strmake_root(&p->cur_data->alloc, res->ptr(),
client_field->def_length);
}
}
/* Store a floating point number, as is. */
bool Protocol_local::store_float(float value, uint32 decimals)
bool Protocol_local::send_list_fields(List<Field> *list, const TABLE_LIST *table_list)
{
return store_column(&value, sizeof(float));
DBUG_ENTER("send_result_set_metadata");
Protocol_text prot(thd);
List_iterator_fast<Field> it(*list);
Field *fld;
// if (!thd->mysql) // bootstrap file handling
// DBUG_RETURN(0);
if (begin_dataset(thd, list->elements))
goto err;
for (uint pos= 0 ; (fld= it++); pos++)
{
if (prot.store_field_metadata_for_list_fields(thd, fld, table_list, pos))
goto err;
list_fields_send_default(thd, this, fld, pos);
}
DBUG_RETURN(prepare_for_send(list->elements));
err:
my_error(ER_OUT_OF_RESOURCES, MYF(0));
DBUG_RETURN(1);
}
/* Store a double precision number, as is. */
void Protocol_local::prepare_for_resend()
{
MYSQL_ROWS *cur;
MYSQL_DATA *data= cur_data;
DBUG_ENTER("send_data");
// if (!thd->mysql) // bootstrap file handling
// DBUG_VOID_RETURN;
data->rows++;
if (!(cur= (MYSQL_ROWS *)alloc_root(alloc, sizeof(MYSQL_ROWS)+(field_count + 1) * sizeof(char *))))
{
my_error(ER_OUT_OF_RESOURCES,MYF(0));
DBUG_VOID_RETURN;
}
cur->data= (MYSQL_ROW)(((char *)cur) + sizeof(MYSQL_ROWS));
*data->embedded_info->prev_ptr= cur;
data->embedded_info->prev_ptr= &cur->next;
next_field=cur->data;
next_mysql_field= data->embedded_info->fields_list;
#ifndef DBUG_OFF
field_pos= 0;
#endif
DBUG_VOID_RETURN;
}
bool Protocol_local::store_double(double value, uint32 decimals)
bool Protocol_local::store_null()
{
return store_column(&value, sizeof (double));
*(next_field++)= NULL;
++next_mysql_field;
return false;
}
/* Store a Field. */
#include <sql_common.h>
#include <errmsg.h>
bool Protocol_local::store(Field *field)
struct local_results
{
if (field->is_null())
return store_null();
return field->send(this);
}
struct st_mysql_data *cur_data;
struct st_mysql_data *first_data;
struct st_mysql_data **data_tail;
void clear_data_list();
struct st_mysql_data *alloc_new_dataset();
char **next_field;
MYSQL_FIELD *next_mysql_field;
MEM_ROOT *alloc;
};
/** Called to start a new result set. */
static void embedded_get_error(MYSQL *mysql, MYSQL_DATA *data)
{
NET *net= &mysql->net;
struct embedded_query_result *ei= data->embedded_info;
net->last_errno= ei->last_errno;
strmake_buf(net->last_error, ei->info);
memcpy(net->sqlstate, ei->sqlstate, sizeof(net->sqlstate));
mysql->server_status= ei->server_status;
my_free(data);
}
bool Protocol_local::send_result_set_metadata(List<Item> *columns, uint)
static my_bool loc_read_query_result(MYSQL *mysql)
{
DBUG_ASSERT(m_rset == 0 && !alloc_root_inited(&m_rset_root));
local_results *thd= (local_results *) mysql->thd;
init_sql_alloc(PSI_INSTRUMENT_ME, &m_rset_root, MEM_ROOT_BLOCK_SIZE, 0,
MYF(MY_THREAD_SPECIFIC));
MYSQL_DATA *res= thd->first_data;
DBUG_ASSERT(!thd->cur_data);
thd->first_data= res->embedded_info->next;
if (res->embedded_info->last_errno &&
!res->embedded_info->fields_list)
{
embedded_get_error(mysql, res);
return 1;
}
if (! (m_rset= new (&m_rset_root) List<Ed_row>))
return TRUE;
mysql->warning_count= res->embedded_info->warning_count;
mysql->server_status= res->embedded_info->server_status;
mysql->field_count= res->fields;
if (!(mysql->fields= res->embedded_info->fields_list))
{
mysql->affected_rows= res->embedded_info->affected_rows;
mysql->insert_id= res->embedded_info->insert_id;
}
net_clear_error(&mysql->net);
mysql->info= 0;
m_column_count= columns->elements;
if (res->embedded_info->info[0])
{
strmake(mysql->info_buffer, res->embedded_info->info, MYSQL_ERRMSG_SIZE-1);
mysql->info= mysql->info_buffer;
}
return FALSE;
if (res->embedded_info->fields_list)
{
mysql->status=MYSQL_STATUS_GET_RESULT;
thd->cur_data= res;
}
else
my_free(res);
return 0;
}
/**
Normally this is a separate result set with OUT parameters
of stored procedures. Currently unsupported for the local
version.
*/
static MYSQL_METHODS local_methods=
{
loc_read_query_result, /* read_query_result */
NULL/*loc_advanced_command*/, /* advanced_command */
NULL/*loc_read_rows*/, /* read_rows */
NULL/*loc_use_result*/, /* use_result */
NULL/*loc_fetch_lengths*/, /* fetch_lengths */
NULL/*loc_flush_use_result*/, /* flush_use_result */
NULL/*loc_read_change_user_result*/ /* read_change_user_result */
};
bool Protocol_local::send_out_parameters(List<Item_param> *sp_params)
extern "C" MYSQL *mysql_real_connect_local(MYSQL *mysql,
const char *host, const char *user, const char *passwd, const char *db)
{
return FALSE;
}
//char name_buff[USERNAME_LENGTH];
DBUG_ENTER("mysql_real_connect_local");
/** Called for statements that don't have a result set, at statement end. */
/* Test whether we're already connected */
if (mysql->server_version)
{
set_mysql_error(mysql, CR_ALREADY_CONNECTED, unknown_sqlstate);
DBUG_RETURN(0);
}
bool
Protocol_local::send_ok(uint server_status, uint statement_warn_count,
ulonglong affected_rows, ulonglong last_insert_id,
const char *message, bool skip_flush)
{
/*
Just make sure nothing is sent to the client, we have grabbed
the status information in the connection diagnostics area.
*/
return FALSE;
}
if (!host || !host[0])
host= mysql->options.host;
mysql->methods= &local_methods;
/**
Called at the end of a result set. Append a complete
result set to the list in Ed_connection.
if (!db || !db[0])
db=mysql->options.db;
Don't send anything to the client, but instead finish
building of the result set at hand.
*/
if (!user || !user[0])
user=mysql->options.user;
bool Protocol_local::send_eof(uint server_status, uint statement_warn_count)
{
Ed_result_set *ed_result_set;
mysql->user= my_strdup(PSI_INSTRUMENT_ME, user, MYF(0));
DBUG_ASSERT(m_rset);
opt_add_row_to_rset();
m_current_row= 0;
mysql->info_buffer= (char *) my_malloc(PSI_INSTRUMENT_ME,
MYSQL_ERRMSG_SIZE, MYF(0));
//mysql->thd= create_embedded_thd(client_flag);
ed_result_set= new (&m_rset_root) Ed_result_set(m_rset, m_column_count,
&m_rset_root);
//init_embedded_mysql(mysql, client_flag);
m_rset= NULL;
//if (mysql_init_character_set(mysql))
// goto error;
if (! ed_result_set)
return TRUE;
//if (check_embedded_connection(mysql, db))
// goto error;
/* In case of successful allocation memory ownership was transferred. */
DBUG_ASSERT(!alloc_root_inited(&m_rset_root));
mysql->server_status= SERVER_STATUS_AUTOCOMMIT;
/*
Link the created Ed_result_set instance into the list of connection
result sets. Never fails.
*/
m_connection->add_result_set(ed_result_set);
return FALSE;
}
//if (mysql->options.init_commands)
//{
// DYNAMIC_ARRAY *init_commands= mysql->options.init_commands;
// char **ptr= (char**)init_commands->buffer;
// char **end= ptr + init_commands->elements;
//
// for (; ptr<end; ptr++)
// {
// MYSQL_RES *res;
// if (mysql_query(mysql,*ptr))
// goto error;
// if (mysql->fields)
// {
// if (!(res= (*mysql->methods->use_result)(mysql)))
// goto error;
// mysql_free_result(res);
// }
// }
//}
DBUG_PRINT("exit",("Mysql handler: %p", mysql));
DBUG_RETURN(mysql);
/** Called to send an error to the client at the end of a statement. */
//error:
DBUG_PRINT("error",("message: %u (%s)",
mysql->net.last_errno,
mysql->net.last_error));
{
/* Free alloced memory */
my_bool free_me=mysql->free_me;
free_old_query(mysql);
mysql->free_me=0;
mysql_close(mysql);
mysql->free_me=free_me;
}
DBUG_RETURN(0);
}
bool
Protocol_local::send_error(uint sql_errno, const char *err_msg, const char*)
extern "C" int execute_sql_command(const char *command,
char *hosts, char *names, char *filters)
{
/*
Just make sure that nothing is sent to the client (default
implementation).
*/
return FALSE;
MYSQL_LEX_STRING sql_text;
THD *thd= current_thd;
THD *new_thd= 0;
int result;
my_bool qc_save= 0;
if (!thd)
{
new_thd= new THD(0);
new_thd->thread_stack= (char*) &sql_text;
new_thd->store_globals();
new_thd->security_ctx->skip_grants();
new_thd->query_cache_is_applicable= 0;
bzero((char*) &new_thd->net, sizeof(new_thd->net));
thd= new_thd;
}
else
{
if (thd->lock)
/* Doesn't work if the thread opened/locked tables already. */
return 2;
qc_save= thd->query_cache_is_applicable;
thd->query_cache_is_applicable= 0;
}
sql_text.str= (char *) command;
sql_text.length= strlen(command);
{
Protocol_local p(thd);
Ed_connection con(thd);
result= con.execute_direct(&p, sql_text);
if (!result && p.first_data)
{
int nr= (int) p.first_data->rows;
MYSQL_ROWS *rows= p.first_data->data;
while (nr--)
{
strcpy(hosts, rows->data[0]);
hosts+= strlen(hosts) + 1;
strcpy(names, rows->data[1]);
names+= strlen(names) + 1;
if (filters)
{
strcpy(filters, rows->data[2]);
filters+= strlen(filters) + 1;
}
rows= rows->next;
}
}
if (p.first_data)
{
if (p.alloc)
free_root(p.alloc, MYF(0));
my_free(p.first_data);
}
}
if (new_thd)
delete new_thd;
else
thd->query_cache_is_applicable= qc_save;
*hosts= 0;
return result;
}
#endif /*!EMBEDDED_LIBRARY*/
#ifdef EMBEDDED_LIBRARY
void Protocol_local::remove_last_row()
{ }
#endif
......@@ -200,7 +200,7 @@ class Ed_connection
@retval TRUE error, use get_last_error()
to see the error number.
*/
bool execute_direct(LEX_STRING sql_text);
bool execute_direct(Protocol *p, LEX_STRING sql_text);
/**
Same as the previous, but takes an instance of Server_runnable
......@@ -213,7 +213,7 @@ class Ed_connection
return a result set
@retval TRUE failure
*/
bool execute_direct(Server_runnable *server_runnable);
bool execute_direct(Protocol *p, Server_runnable *server_runnable);
/**
Get the number of affected (deleted, updated)
......@@ -309,7 +309,6 @@ class Ed_connection
THD *m_thd;
Ed_result_set *m_rsets;
Ed_result_set *m_current_rset;
friend class Protocol_local;
private:
void free_old_result();
void add_result_set(Ed_result_set *ed_result_set);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment