Commit 91ae1258 authored by Oleksandr Byelkin's avatar Oleksandr Byelkin

MDEV-12471: BULK Command

BULK execution moved to a new command.
parent e813fe86
......@@ -12,7 +12,8 @@ enum enum_server_command
COM_UNIMPLEMENTED,
COM_RESET_CONNECTION,
COM_MDB_GAP_BEG,
COM_MDB_GAP_END=250,
COM_MDB_GAP_END=249,
COM_STMT_BULK_EXECUTE=250,
COM_SLAVE_WORKER=251,
COM_SLAVE_IO=252,
COM_SLAVE_SQL=253,
......
......@@ -115,7 +115,8 @@ enum enum_server_command
COM_RESET_CONNECTION,
/* don't forget to update const char *command_name[] in sql_parse.cc */
COM_MDB_GAP_BEG,
COM_MDB_GAP_END=250,
COM_MDB_GAP_END=249,
COM_STMT_BULK_EXECUTE=250,
COM_SLAVE_WORKER=251,
COM_SLAVE_IO=252,
COM_SLAVE_SQL=253,
......@@ -136,6 +137,13 @@ enum enum_indicator_type
STMT_INDICATOR_IGNORE
};
/*
bulk PS flags
*/
#define STMT_BULK_FLAG_CLIENT_SEND_TYPES 128
#define STMT_BULK_FLAG_INSERT_ID_REQUEST 64
/* sql type stored in .frm files for virtual fields */
#define MYSQL_TYPE_VIRTUAL 245
/*
......@@ -311,7 +319,8 @@ enum enum_indicator_type
CLIENT_SESSION_TRACK |\
CLIENT_DEPRECATE_EOF |\
CLIENT_CONNECT_ATTRS |\
MARIADB_CLIENT_COM_MULTI)
MARIADB_CLIENT_COM_MULTI |\
MARIADB_CLIENT_STMT_BULK_OPERATIONS)
/*
To be added later:
......
......@@ -1396,7 +1396,7 @@ performance-schema-max-rwlock-instances -1
performance-schema-max-socket-classes 10
performance-schema-max-socket-instances -1
performance-schema-max-stage-classes 150
performance-schema-max-statement-classes 187
performance-schema-max-statement-classes 188
performance-schema-max-table-handles -1
performance-schema-max-table-instances -1
performance-schema-max-thread-classes 50
......
......@@ -2867,9 +2867,9 @@ READ_ONLY YES
COMMAND_LINE_ARGUMENT REQUIRED
VARIABLE_NAME PERFORMANCE_SCHEMA_MAX_STATEMENT_CLASSES
SESSION_VALUE NULL
GLOBAL_VALUE 187
GLOBAL_VALUE 188
GLOBAL_VALUE_ORIGIN COMPILE-TIME
DEFAULT_VALUE 187
DEFAULT_VALUE 188
VARIABLE_SCOPE GLOBAL
VARIABLE_TYPE BIGINT UNSIGNED
VARIABLE_COMMENT Maximum number of statement instruments.
......
......@@ -3063,9 +3063,9 @@ READ_ONLY YES
COMMAND_LINE_ARGUMENT REQUIRED
VARIABLE_NAME PERFORMANCE_SCHEMA_MAX_STATEMENT_CLASSES
SESSION_VALUE NULL
GLOBAL_VALUE 187
GLOBAL_VALUE 188
GLOBAL_VALUE_ORIGIN COMPILE-TIME
DEFAULT_VALUE 187
DEFAULT_VALUE 188
VARIABLE_SCOPE GLOBAL
VARIABLE_TYPE BIGINT UNSIGNED
VARIABLE_COMMENT Maximum number of statement instruments.
......
......@@ -3347,7 +3347,7 @@ Item_param::Item_param(THD *thd, uint pos_in_query_arg):
state(NO_VALUE),
/* Don't pretend to be a literal unless value for this item is set. */
item_type(PARAM_ITEM),
indicators(0), indicator(STMT_INDICATOR_NONE),
indicator(STMT_INDICATOR_NONE),
set_param_func(default_set_param_func),
m_out_param_info(NULL),
/*
......
......@@ -2873,10 +2873,8 @@ class Item_param :public Item_basic_value,
};
/*
Used for bulk protocol. Indicates if we should expect
indicators byte before value of the parameter
Used for bulk protocol only.
*/
my_bool indicators;
enum enum_indicator_type indicator;
/*
......
......@@ -697,9 +697,9 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
bool using_bulk_insert= 0;
uint value_count;
ulong counter = 1;
ulong iteration= 0;
/* counter of iteration in bulk PS operation*/
ulonglong iteration= 0;
ulonglong id;
ulong bulk_iterations= bulk_parameters_iterations(thd);
COPY_INFO info;
TABLE *table= 0;
List_iterator_fast<List_item> its(values_list);
......@@ -767,7 +767,6 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
DBUG_RETURN(TRUE);
value_count= values->elements;
DBUG_ASSERT(bulk_iterations > 0);
if (mysql_prepare_insert(thd, table_list, table, fields, values,
update_fields, update_values, duplic, &unused_conds,
FALSE))
......@@ -939,6 +938,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
}
do
{
DBUG_PRINT("info", ("iteration %llu", iteration));
if (iteration && bulk_parameters_set(thd))
goto abort;
......@@ -1059,7 +1059,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
}
its.rewind();
iteration++;
} while (iteration < bulk_iterations);
} while (bulk_parameters_iterations(thd));
values_loop_end:
free_underlaid_joins(thd, &thd->lex->select_lex);
......@@ -1206,7 +1206,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
retval= thd->lex->explain->send_explain(thd);
goto abort;
}
if ((bulk_iterations * values_list.elements) == 1 && (!(thd->variables.option_bits & OPTION_WARNINGS) ||
if ((iteration * values_list.elements) == 1 && (!(thd->variables.option_bits & OPTION_WARNINGS) ||
!thd->cuted_fields))
{
my_ok(thd, info.copied + info.deleted +
......
......@@ -391,7 +391,7 @@ const LEX_STRING command_name[257]={
{ 0, 0 }, //247
{ 0, 0 }, //248
{ 0, 0 }, //249
{ 0, 0 }, //250
{ C_STRING_WITH_LEN("Bulk_execute") }, //250
{ C_STRING_WITH_LEN("Slave_worker") }, //251
{ C_STRING_WITH_LEN("Slave_IO") }, //252
{ C_STRING_WITH_LEN("Slave_SQL") }, //253
......@@ -1749,6 +1749,11 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
}
break;
}
case COM_STMT_BULK_EXECUTE:
{
mysqld_stmt_bulk_execute(thd, packet, packet_length);
break;
}
case COM_STMT_EXECUTE:
{
mysqld_stmt_execute(thd, packet, packet_length);
......
......@@ -164,7 +164,6 @@ class Prepared_statement: public Statement
Server_side_cursor *cursor;
uchar *packet;
uchar *packet_end;
ulong iterations;
uint param_count;
uint last_errno;
uint flags;
......@@ -183,7 +182,9 @@ class Prepared_statement: public Statement
*/
uint select_number_after_prepare;
char last_error[MYSQL_ERRMSG_SIZE];
my_bool iterations;
my_bool start_param;
my_bool read_types;
#ifndef EMBEDDED_LIBRARY
bool (*set_params)(Prepared_statement *st, uchar *data, uchar *data_end,
uchar *read_pos, String *expanded_query);
......@@ -213,11 +214,10 @@ class Prepared_statement: public Statement
uchar *packet_arg, uchar *packet_end_arg);
bool execute_bulk_loop(String *expanded_query,
bool open_cursor,
uchar *packet_arg, uchar *packet_end_arg,
ulong iterations);
uchar *packet_arg, uchar *packet_end_arg);
bool execute_server_runnable(Server_runnable *server_runnable);
my_bool set_bulk_parameters(bool reset);
ulong bulk_iterations();
bool bulk_iterations() { return iterations; };
/* Destroy this statement */
void deallocate();
bool execute_immediate(const char *query, uint query_length);
......@@ -935,6 +935,7 @@ static bool insert_params(Prepared_statement *stmt, uchar *null_array,
for (Item_param **it= begin; it < end; ++it)
{
Item_param *param= *it;
param->indicator= STMT_INDICATOR_NONE; // only for bulk parameters
if (!param->has_long_data_value())
{
if (is_param_null(null_array, (uint) (it - begin)))
......@@ -979,10 +980,7 @@ static bool insert_bulk_params(Prepared_statement *stmt,
param->reset();
if (!param->has_long_data_value())
{
if (param->indicators)
param->indicator= (enum_indicator_type) *((*read_pos)++);
else
param->indicator= STMT_INDICATOR_NONE;
param->indicator= (enum_indicator_type) *((*read_pos)++);
if ((*read_pos) > data_end)
DBUG_RETURN(1);
switch (param->indicator)
......@@ -993,6 +991,8 @@ static bool insert_bulk_params(Prepared_statement *stmt,
param->set_param_func(param, read_pos, (uint) (data_end - (*read_pos)));
if (param->has_no_value())
DBUG_RETURN(1);
if (param->convert_str_value(stmt->thd))
DBUG_RETURN(1); /* out of memory */
break;
case STMT_INDICATOR_NULL:
param->set_null();
......@@ -1011,6 +1011,36 @@ static bool insert_bulk_params(Prepared_statement *stmt,
DBUG_RETURN(0);
}
static bool set_conversion_functions(Prepared_statement *stmt,
uchar **data, uchar *data_end)
{
uchar *read_pos= *data;
const uint signed_bit= 1 << 15;
DBUG_ENTER("set_conversion_functions");
/*
First execute or types altered by the client, setup the
conversion routines for all parameters (one time)
*/
Item_param **it= stmt->param_array;
Item_param **end= it + stmt->param_count;
THD *thd= stmt->thd;
for (; it < end; ++it)
{
ushort typecode;
if (read_pos >= data_end)
DBUG_RETURN(1);
typecode= sint2korr(read_pos);
read_pos+= 2;
(**it).unsigned_flag= MY_TEST(typecode & signed_bit);
setup_one_conversion_function(thd, *it, (uchar) (typecode & 0xff));
}
*data= read_pos;
DBUG_RETURN(0);
}
static bool setup_conversion_functions(Prepared_statement *stmt,
uchar **data, uchar *data_end,
bool bulk_protocol= 0)
......@@ -1024,30 +1054,9 @@ static bool setup_conversion_functions(Prepared_statement *stmt,
if (*read_pos++) //types supplied / first execute
{
/*
First execute or types altered by the client, setup the
conversion routines for all parameters (one time)
*/
Item_param **it= stmt->param_array;
Item_param **end= it + stmt->param_count;
THD *thd= stmt->thd;
for (; it < end; ++it)
{
ushort typecode;
const uint signed_bit= 1 << 15;
const uint indicators_bit= 1 << 14;
if (read_pos >= data_end)
DBUG_RETURN(1);
typecode= sint2korr(read_pos);
read_pos+= 2;
(**it).unsigned_flag= MY_TEST(typecode & signed_bit);
if (bulk_protocol)
(**it).indicators= MY_TEST(typecode & indicators_bit);
setup_one_conversion_function(thd, *it,
(uchar) (typecode & 0xff));
}
*data= read_pos;
bool res= set_conversion_functions(stmt, data, data_end);
DBUG_RETURN(res);
}
*data= read_pos;
DBUG_RETURN(0);
......@@ -3032,6 +3041,14 @@ static void reset_stmt_params(Prepared_statement *stmt)
}
static void mysql_stmt_execute_common(THD *thd,
ulong stmt_id,
uchar *packet,
uchar *packet_end,
ulong cursor_flags,
bool iteration,
bool types);
/**
COM_STMT_EXECUTE handler: execute a previously prepared statement.
......@@ -3054,20 +3071,91 @@ void mysqld_stmt_execute(THD *thd, char *packet_arg, uint packet_length)
uchar *packet= (uchar*)packet_arg; // GCC 4.0.1 workaround
ulong stmt_id= uint4korr(packet);
ulong flags= (ulong) packet[4];
#ifndef EMBEDDED_LIBRARY
ulong iterations= uint4korr(packet + 5);
#else
ulong iterations= 0; // no support
#endif
uchar *packet_end= packet + packet_length;
DBUG_ENTER("mysqld_stmt_execute");
packet+= 9; /* stmt_id + 5 bytes of flags */
mysql_stmt_execute_common(thd, stmt_id, packet, packet_end, flags, FALSE,
FALSE);
DBUG_VOID_RETURN;
}
/**
COM_STMT_BULK_EXECUTE handler: execute a previously prepared statement.
If there are any parameters, then replace parameter markers with the
data supplied from the client, and then execute the statement.
This function uses binary protocol to send a possible result set
to the client.
@param thd current thread
@param packet_arg parameter types and data, if any
@param packet_length packet length, including the terminator character.
@return
none: in case of success OK packet or a result set is sent to the
client, otherwise an error message is set in THD.
*/
void mysqld_stmt_bulk_execute(THD *thd, char *packet_arg, uint packet_length)
{
uchar *packet= (uchar*)packet_arg; // GCC 4.0.1 workaround
ulong stmt_id= uint4korr(packet);
uint flags= (uint) uint2korr(packet + 4);
uchar *packet_end= packet + packet_length;
DBUG_ENTER("mysqld_stmt_execute_bulk");
if (!(thd->client_capabilities &
MARIADB_CLIENT_STMT_BULK_OPERATIONS))
{
DBUG_PRINT("error",
("An attempt to execute bulk operation without support"));
my_error(ER_UNSUPPORTED_PS, MYF(0));
}
/* Check for implemented parameters */
if (flags & (~STMT_BULK_FLAG_CLIENT_SEND_TYPES))
{
DBUG_PRINT("error", ("unsupported bulk execute flags %x", flags));
my_error(ER_UNSUPPORTED_PS, MYF(0));
}
/* stmt id and two bytes of flags */
packet+= 4 + 2;
mysql_stmt_execute_common(thd, stmt_id, packet, packet_end, 0, TRUE,
(flags & STMT_BULK_FLAG_CLIENT_SEND_TYPES));
DBUG_VOID_RETURN;
}
/**
Common part of prepared statement execution
@param thd THD handle
@param stmt_id id of the prepared statement
@param paket packet with parameters to bind
@param packet_end pointer to the byte after parameters end
@param cursor_flags cursor flags
@param bulk_op id it bulk operation
@param read_types flag say that types muast been read
*/
static void mysql_stmt_execute_common(THD *thd,
ulong stmt_id,
uchar *packet,
uchar *packet_end,
ulong cursor_flags,
bool bulk_op,
bool read_types)
{
/* Query text for binary, general or slow log, if any of them is open */
String expanded_query;
uchar *packet_end= packet + packet_length;
Prepared_statement *stmt;
Protocol *save_protocol= thd->protocol;
bool open_cursor;
DBUG_ENTER("mysqld_stmt_execute");
packet+= 9; /* stmt_id + 5 bytes of flags */
DBUG_ENTER("mysqld_stmt_execute_common");
DBUG_ASSERT((!read_types) || (read_types && bulk_op));
/* First of all clear possible warnings from the previous command */
thd->reset_for_next_command();
......@@ -3079,21 +3167,21 @@ void mysqld_stmt_execute(THD *thd, char *packet_arg, uint packet_length)
llstr(stmt_id, llbuf), "mysqld_stmt_execute");
DBUG_VOID_RETURN;
}
stmt->read_types= read_types;
#if defined(ENABLED_PROFILING)
thd->profiling.set_query_source(stmt->query(), stmt->query_length());
#endif
DBUG_PRINT("exec_query", ("%s", stmt->query()));
DBUG_PRINT("info",("stmt: %p iterations: %lu", stmt, iterations));
DBUG_PRINT("info",("stmt: %p bulk_op %d", stmt, bulk_op));
open_cursor= MY_TEST(flags & (ulong) CURSOR_TYPE_READ_ONLY);
open_cursor= MY_TEST(cursor_flags & (ulong) CURSOR_TYPE_READ_ONLY);
thd->protocol= &thd->protocol_binary;
if (iterations <= 1)
if (!bulk_op)
stmt->execute_loop(&expanded_query, open_cursor, packet, packet_end);
else
stmt->execute_bulk_loop(&expanded_query, open_cursor, packet, packet_end,
iterations);
stmt->execute_bulk_loop(&expanded_query, open_cursor, packet, packet_end);
thd->protocol= save_protocol;
sp_cache_enforce_limit(thd->sp_proc_cache, stored_program_cache_size);
......@@ -3600,11 +3688,12 @@ Prepared_statement::Prepared_statement(THD *thd_arg)
cursor(0),
packet(0),
packet_end(0),
iterations(0),
param_count(0),
last_errno(0),
flags((uint) IS_IN_USE),
iterations(0),
start_param(0),
read_types(0),
m_sql_mode(thd->variables.sql_mode)
{
init_sql_alloc(&main_mem_root, thd_arg->variables.query_alloc_block_size,
......@@ -3641,7 +3730,7 @@ void Prepared_statement::setup_set_params()
set_params_from_actual_params= insert_params_from_actual_params_with_log;
#ifndef EMBEDDED_LIBRARY
set_params= insert_params_with_log;
set_bulk_params= insert_bulk_params; // TODO: add binlog support
set_bulk_params= insert_bulk_params; // RBR is on for bulk operation
#else
//TODO: add bulk support for bulk parameters
set_params_data= emb_insert_params_with_log;
......@@ -4023,7 +4112,7 @@ Prepared_statement::execute_loop(String *expanded_query,
Reprepare_observer reprepare_observer;
bool error;
int reprepare_attempt= 0;
iterations= 0;
iterations= FALSE;
/*
- In mysql_sql_stmt_execute() we hide all "external" Items
......@@ -4126,11 +4215,11 @@ my_bool bulk_parameters_set(THD *thd)
DBUG_RETURN(FALSE);
}
ulong bulk_parameters_iterations(THD *thd)
my_bool bulk_parameters_iterations(THD *thd)
{
Prepared_statement *stmt= (Prepared_statement *) thd->bulk_param;
if (!stmt)
return 1;
return FALSE;
return stmt->bulk_iterations();
}
......@@ -4138,7 +4227,8 @@ ulong bulk_parameters_iterations(THD *thd)
my_bool Prepared_statement::set_bulk_parameters(bool reset)
{
DBUG_ENTER("Prepared_statement::set_bulk_parameters");
DBUG_PRINT("info", ("iteration: %lu", iterations));
DBUG_PRINT("info", ("iteration: %d", iterations));
if (iterations)
{
#ifndef EMBEDDED_LIBRARY
......@@ -4152,31 +4242,24 @@ my_bool Prepared_statement::set_bulk_parameters(bool reset)
reset_stmt_params(this);
DBUG_RETURN(true);
}
iterations--;
if (packet >= packet_end)
iterations= FALSE;
}
start_param= 0;
DBUG_RETURN(false);
}
ulong Prepared_statement::bulk_iterations()
{
if (iterations)
return iterations;
return start_param ? 1 : 0;
}
bool
Prepared_statement::execute_bulk_loop(String *expanded_query,
bool open_cursor,
uchar *packet_arg,
uchar *packet_end_arg,
ulong iterations_arg)
uchar *packet_end_arg)
{
Reprepare_observer reprepare_observer;
bool error= 0;
packet= packet_arg;
packet_end= packet_end_arg;
iterations= iterations_arg;
iterations= TRUE;
start_param= true;
#ifndef DBUG_OFF
Item *free_list_state= thd->free_list;
......@@ -4190,16 +4273,26 @@ Prepared_statement::execute_bulk_loop(String *expanded_query,
thd->set_bulk_execution(0);
return TRUE;
}
/* Check for non zero parameter count*/
if (param_count == 0)
{
DBUG_PRINT("error", ("Statement with no parameters for bulk execution."));
my_error(ER_UNSUPPORTED_PS, MYF(0));
thd->set_bulk_execution(0);
return TRUE;
}
if (!(sql_command_flags[lex->sql_command] & CF_SP_BULK_SAFE))
{
DBUG_PRINT("error", ("Command is not supported in bulk execution."));
my_error(ER_UNSUPPORTED_PS, MYF(0));
thd->set_bulk_execution(0);
return TRUE;
}
#ifndef EMBEDDED_LIBRARY
if (setup_conversion_functions(this, &packet, packet_end, TRUE))
if (read_types &&
set_conversion_functions(this, &packet, packet_end))
#else
// bulk parameters are not supported for embedded, so it will an error
#endif
......@@ -4210,6 +4303,7 @@ Prepared_statement::execute_bulk_loop(String *expanded_query,
thd->set_bulk_execution(0);
return true;
}
read_types= FALSE;
#ifdef NOT_YET_FROM_MYSQL_5_6
if (unlikely(thd->security_ctx->password_expired &&
......
......@@ -72,6 +72,7 @@ class Reprepare_observer
void mysqld_stmt_prepare(THD *thd, const char *packet, uint packet_length);
void mysqld_stmt_execute(THD *thd, char *packet, uint packet_length);
void mysqld_stmt_execute_bulk(THD *thd, char *packet, uint packet_length);
void mysqld_stmt_bulk_execute(THD *thd, char *packet, uint packet_length);
void mysqld_stmt_close(THD *thd, char *packet);
void mysql_sql_stmt_prepare(THD *thd);
......@@ -83,7 +84,7 @@ void mysqld_stmt_reset(THD *thd, char *packet);
void mysql_stmt_get_longdata(THD *thd, char *pos, ulong packet_length);
void reinit_stmt_before_use(THD *thd, LEX *lex);
ulong bulk_parameters_iterations(THD *thd);
my_bool bulk_parameters_iterations(THD *thd);
my_bool bulk_parameters_set(THD *thd);
/**
Execute a fragment of server code in an isolated context, so that
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment