Commit 5a19908b authored by Alexander Barkov's avatar Alexander Barkov

MDEV-19653 Add class Sql_cmd_create_table

parent dd939d6f
...@@ -211,6 +211,40 @@ plugin_ref ha_resolve_by_name(THD *thd, const LEX_STRING *name, bool tmp_table) ...@@ -211,6 +211,40 @@ plugin_ref ha_resolve_by_name(THD *thd, const LEX_STRING *name, bool tmp_table)
} }
bool
Storage_engine_name::resolve_storage_engine_with_error(THD *thd,
handlerton **ha,
bool tmp_table)
{
#if MYSQL_VERSION_ID < 100300
/*
Please remove tmp_name when merging to 10.3 and pass m_storage_engine_name
directly to ha_resolve_by_name().
*/
LEX_STRING tmp_name;
tmp_name.str= const_cast<char*>(m_storage_engine_name.str);
tmp_name.length= m_storage_engine_name.length;
#endif
if (plugin_ref plugin= ha_resolve_by_name(thd, &tmp_name, tmp_table))
{
*ha= plugin_hton(plugin);
return false;
}
*ha= NULL;
if (thd->variables.sql_mode & MODE_NO_ENGINE_SUBSTITUTION)
{
my_error(ER_UNKNOWN_STORAGE_ENGINE, MYF(0), m_storage_engine_name.str);
return true;
}
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_UNKNOWN_STORAGE_ENGINE,
ER_THD(thd, ER_UNKNOWN_STORAGE_ENGINE),
m_storage_engine_name.str);
return false;
}
plugin_ref ha_lock_engine(THD *thd, const handlerton *hton) plugin_ref ha_lock_engine(THD *thd, const handlerton *hton)
{ {
if (hton) if (hton)
......
...@@ -193,6 +193,18 @@ bool Sql_cmd_alter_table::execute(THD *thd) ...@@ -193,6 +193,18 @@ bool Sql_cmd_alter_table::execute(THD *thd)
SELECT_LEX *select_lex= &lex->select_lex; SELECT_LEX *select_lex= &lex->select_lex;
/* first table of first SELECT_LEX */ /* first table of first SELECT_LEX */
TABLE_LIST *first_table= (TABLE_LIST*) select_lex->table_list.first; TABLE_LIST *first_table= (TABLE_LIST*) select_lex->table_list.first;
const bool used_engine= lex->create_info.used_fields & HA_CREATE_USED_ENGINE;
DBUG_ASSERT((m_storage_engine_name.str != NULL) == used_engine);
if (used_engine)
{
if (resolve_storage_engine_with_error(thd, &lex->create_info.db_type,
lex->create_info.tmp_table()))
return true; // Engine not found, substitution is not allowed
if (!lex->create_info.db_type) // Not found, but substitution is allowed
lex->create_info.used_fields&= ~HA_CREATE_USED_ENGINE;
}
/* /*
Code in mysql_alter_table() may modify its HA_CREATE_INFO argument, Code in mysql_alter_table() may modify its HA_CREATE_INFO argument,
so we have to use a copy of this structure to make execution so we have to use a copy of this structure to make execution
......
...@@ -385,7 +385,8 @@ class Sql_cmd_common_alter_table : public Sql_cmd ...@@ -385,7 +385,8 @@ class Sql_cmd_common_alter_table : public Sql_cmd
Sql_cmd_alter_table represents the generic ALTER TABLE statement. Sql_cmd_alter_table represents the generic ALTER TABLE statement.
@todo move Alter_info and other ALTER specific structures from Lex here. @todo move Alter_info and other ALTER specific structures from Lex here.
*/ */
class Sql_cmd_alter_table : public Sql_cmd_common_alter_table class Sql_cmd_alter_table : public Sql_cmd_common_alter_table,
public Storage_engine_name
{ {
public: public:
/** /**
...@@ -397,6 +398,8 @@ class Sql_cmd_alter_table : public Sql_cmd_common_alter_table ...@@ -397,6 +398,8 @@ class Sql_cmd_alter_table : public Sql_cmd_common_alter_table
~Sql_cmd_alter_table() ~Sql_cmd_alter_table()
{} {}
Storage_engine_name *option_storage_engine_name() { return this; }
bool execute(THD *thd); bool execute(THD *thd);
}; };
......
...@@ -102,6 +102,31 @@ enum enum_sql_command { ...@@ -102,6 +102,31 @@ enum enum_sql_command {
SQLCOM_END SQLCOM_END
}; };
class Storage_engine_name
{
protected:
LEX_CSTRING m_storage_engine_name;
public:
Storage_engine_name()
{
m_storage_engine_name.str= NULL;
m_storage_engine_name.length= 0;
}
Storage_engine_name(const LEX_CSTRING &name)
:m_storage_engine_name(name)
{ }
Storage_engine_name(const LEX_STRING &name)
{
m_storage_engine_name.str= name.str;
m_storage_engine_name.length= name.length;
}
bool resolve_storage_engine_with_error(THD *thd,
handlerton **ha,
bool tmp_table);
};
/** /**
@class Sql_cmd - Representation of an SQL command. @class Sql_cmd - Representation of an SQL command.
...@@ -145,6 +170,11 @@ class Sql_cmd : public Sql_alloc ...@@ -145,6 +170,11 @@ class Sql_cmd : public Sql_alloc
*/ */
virtual bool execute(THD *thd) = 0; virtual bool execute(THD *thd) = 0;
virtual Storage_engine_name *option_storage_engine_name()
{
return NULL;
}
protected: protected:
Sql_cmd() Sql_cmd()
{} {}
...@@ -161,4 +191,15 @@ class Sql_cmd : public Sql_alloc ...@@ -161,4 +191,15 @@ class Sql_cmd : public Sql_alloc
} }
}; };
class Sql_cmd_create_table: public Sql_cmd,
public Storage_engine_name
{
public:
enum_sql_command sql_command_code() const { return SQLCOM_CREATE_TABLE; }
Storage_engine_name *option_storage_engine_name() { return this; }
bool execute(THD *thd);
};
#endif // SQL_CMD_INCLUDED #endif // SQL_CMD_INCLUDED
...@@ -3280,274 +3280,6 @@ mysql_execute_command(THD *thd) ...@@ -3280,274 +3280,6 @@ mysql_execute_command(THD *thd)
res = ha_show_status(thd, lex->create_info.db_type, HA_ENGINE_MUTEX); res = ha_show_status(thd, lex->create_info.db_type, HA_ENGINE_MUTEX);
break; break;
} }
case SQLCOM_CREATE_TABLE:
{
DBUG_ASSERT(first_table == all_tables && first_table != 0);
bool link_to_local;
TABLE_LIST *create_table= first_table;
TABLE_LIST *select_tables= lex->create_last_non_select_table->next_global;
if (lex->tmp_table())
{
status_var_decrement(thd->status_var.com_stat[SQLCOM_CREATE_TABLE]);
status_var_increment(thd->status_var.com_create_tmp_table);
}
/*
Code below (especially in mysql_create_table() and select_create
methods) may modify HA_CREATE_INFO structure in LEX, so we have to
use a copy of this structure to make execution prepared statement-
safe. A shallow copy is enough as this code won't modify any memory
referenced from this structure.
*/
Table_specification_st create_info(lex->create_info);
/*
We need to copy alter_info for the same reasons of re-execution
safety, only in case of Alter_info we have to do (almost) a deep
copy.
*/
Alter_info alter_info(lex->alter_info, thd->mem_root);
if (thd->is_fatal_error)
{
/* If out of memory when creating a copy of alter_info. */
res= 1;
goto end_with_restore_list;
}
/* Check privileges */
if ((res= create_table_precheck(thd, select_tables, create_table)))
goto end_with_restore_list;
/* Might have been updated in create_table_precheck */
create_info.alias= create_table->alias;
/* Fix names if symlinked or relocated tables */
if (append_file_to_dir(thd, &create_info.data_file_name,
create_table->table_name) ||
append_file_to_dir(thd, &create_info.index_file_name,
create_table->table_name))
goto end_with_restore_list;
/*
If no engine type was given, work out the default now
rather than at parse-time.
*/
if (!(create_info.used_fields & HA_CREATE_USED_ENGINE))
create_info.use_default_db_type(thd);
/*
If we are using SET CHARSET without DEFAULT, add an implicit
DEFAULT to not confuse old users. (This may change).
*/
if ((create_info.used_fields &
(HA_CREATE_USED_DEFAULT_CHARSET | HA_CREATE_USED_CHARSET)) ==
HA_CREATE_USED_CHARSET)
{
create_info.used_fields&= ~HA_CREATE_USED_CHARSET;
create_info.used_fields|= HA_CREATE_USED_DEFAULT_CHARSET;
create_info.default_table_charset= create_info.table_charset;
create_info.table_charset= 0;
}
/*
If we are a slave, we should add OR REPLACE if we don't have
IF EXISTS. This will help a slave to recover from
CREATE TABLE OR EXISTS failures by dropping the table and
retrying the create.
*/
if (thd->slave_thread &&
slave_ddl_exec_mode_options == SLAVE_EXEC_MODE_IDEMPOTENT &&
!lex->create_info.if_not_exists())
{
create_info.add(DDL_options_st::OPT_OR_REPLACE);
create_info.add(DDL_options_st::OPT_OR_REPLACE_SLAVE_GENERATED);
}
#ifdef WITH_PARTITION_STORAGE_ENGINE
{
partition_info *part_info= thd->lex->part_info;
if (part_info && !(part_info= part_info->get_clone(thd)))
{
res= -1;
goto end_with_restore_list;
}
thd->work_part_info= part_info;
}
#endif
if (select_lex->item_list.elements) // With select
{
select_result *result;
/*
CREATE TABLE...IGNORE/REPLACE SELECT... can be unsafe, unless
ORDER BY PRIMARY KEY clause is used in SELECT statement. We therefore
use row based logging if mixed or row based logging is available.
TODO: Check if the order of the output of the select statement is
deterministic. Waiting for BUG#42415
*/
if(lex->ignore)
lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_CREATE_IGNORE_SELECT);
if(lex->duplicates == DUP_REPLACE)
lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_CREATE_REPLACE_SELECT);
/*
If:
a) we inside an SP and there was NAME_CONST substitution,
b) binlogging is on (STMT mode),
c) we log the SP as separate statements
raise a warning, as it may cause problems
(see 'NAME_CONST issues' in 'Binary Logging of Stored Programs')
*/
if (thd->query_name_consts && mysql_bin_log.is_open() &&
thd->wsrep_binlog_format() == BINLOG_FORMAT_STMT &&
!mysql_bin_log.is_query_in_union(thd, thd->query_id))
{
List_iterator_fast<Item> it(select_lex->item_list);
Item *item;
uint splocal_refs= 0;
/* Count SP local vars in the top-level SELECT list */
while ((item= it++))
{
if (item->get_item_splocal())
splocal_refs++;
}
/*
If it differs from number of NAME_CONST substitution applied,
we may have a SOME_FUNC(NAME_CONST()) in the SELECT list,
that may cause a problem with binary log (see BUG#35383),
raise a warning.
*/
if (splocal_refs != thd->query_name_consts)
push_warning(thd,
Sql_condition::WARN_LEVEL_WARN,
ER_UNKNOWN_ERROR,
"Invoked routine ran a statement that may cause problems with "
"binary log, see 'NAME_CONST issues' in 'Binary Logging of Stored Programs' "
"section of the manual.");
}
select_lex->options|= SELECT_NO_UNLOCK;
unit->set_limit(select_lex);
/*
Disable non-empty MERGE tables with CREATE...SELECT. Too
complicated. See Bug #26379. Empty MERGE tables are read-only
and don't allow CREATE...SELECT anyway.
*/
if (create_info.used_fields & HA_CREATE_USED_UNION)
{
my_error(ER_WRONG_OBJECT, MYF(0), create_table->db,
create_table->table_name, "BASE TABLE");
res= 1;
goto end_with_restore_list;
}
/* Copy temporarily the statement flags to thd for lock_table_names() */
uint save_thd_create_info_options= thd->lex->create_info.options;
thd->lex->create_info.options|= create_info.options;
res= open_and_lock_tables(thd, create_info, lex->query_tables, TRUE, 0);
thd->lex->create_info.options= save_thd_create_info_options;
if (res)
{
/* Got error or warning. Set res to 1 if error */
if (!(res= thd->is_error()))
my_ok(thd); // CREATE ... IF NOT EXISTS
goto end_with_restore_list;
}
/* Ensure we don't try to create something from which we select from */
if (create_info.or_replace() && !create_info.tmp_table())
{
TABLE_LIST *duplicate;
if ((duplicate= unique_table(thd, lex->query_tables,
lex->query_tables->next_global,
CHECK_DUP_FOR_CREATE)))
{
update_non_unique_table_error(lex->query_tables, "CREATE",
duplicate);
res= TRUE;
goto end_with_restore_list;
}
}
{
/*
Remove target table from main select and name resolution
context. This can't be done earlier as it will break view merging in
statements like "CREATE TABLE IF NOT EXISTS existing_view SELECT".
*/
lex->unlink_first_table(&link_to_local);
/* Store reference to table in case of LOCK TABLES */
create_info.table= create_table->table;
/*
select_create is currently not re-execution friendly and
needs to be created for every execution of a PS/SP.
Note: In wsrep-patch, CTAS is handled like a regular transaction.
*/
if ((result= new (thd->mem_root) select_create(thd, create_table,
&create_info,
&alter_info,
select_lex->item_list,
lex->duplicates,
lex->ignore,
select_tables)))
{
/*
CREATE from SELECT give its SELECT_LEX for SELECT,
and item_list belong to SELECT
*/
if (!(res= handle_select(thd, lex, result, 0)))
{
if (create_info.tmp_table())
thd->variables.option_bits|= OPTION_KEEP_LOG;
}
delete result;
}
lex->link_first_table_back(create_table, link_to_local);
}
}
else
{
/* regular create */
if (create_info.like())
{
/* CREATE TABLE ... LIKE ... */
res= mysql_create_like_table(thd, create_table, select_tables,
&create_info);
}
else
{
/*
In STATEMENT format, we probably have to replicate also temporary
tables, like mysql replication does. Also check if the requested
engine is allowed/supported.
*/
if (WSREP(thd) &&
!check_engine(thd, create_table->db, create_table->table_name,
&create_info) &&
(!thd->is_current_stmt_binlog_format_row() ||
!create_info.tmp_table()))
{
WSREP_TO_ISOLATION_BEGIN(create_table->db, create_table->table_name, NULL)
}
/* Regular CREATE TABLE */
res= mysql_create_table(thd, create_table, &create_info, &alter_info);
}
if (!res)
{
/* So that CREATE TEMPORARY TABLE gets to binlog at commit/rollback */
if (create_info.tmp_table())
thd->variables.option_bits|= OPTION_KEEP_LOG;
my_ok(thd);
}
}
end_with_restore_list:
break;
}
case SQLCOM_CREATE_INDEX: case SQLCOM_CREATE_INDEX:
case SQLCOM_DROP_INDEX: case SQLCOM_DROP_INDEX:
/* /*
...@@ -5699,6 +5431,7 @@ mysql_execute_command(THD *thd) ...@@ -5699,6 +5431,7 @@ mysql_execute_command(THD *thd)
case SQLCOM_OPTIMIZE: case SQLCOM_OPTIMIZE:
case SQLCOM_REPAIR: case SQLCOM_REPAIR:
case SQLCOM_TRUNCATE: case SQLCOM_TRUNCATE:
case SQLCOM_CREATE_TABLE:
case SQLCOM_ALTER_TABLE: case SQLCOM_ALTER_TABLE:
DBUG_ASSERT(first_table == all_tables && first_table != 0); DBUG_ASSERT(first_table == all_tables && first_table != 0);
/* fall through */ /* fall through */
......
...@@ -10116,3 +10116,303 @@ bool check_engine(THD *thd, const char *db_name, ...@@ -10116,3 +10116,303 @@ bool check_engine(THD *thd, const char *db_name,
DBUG_RETURN(false); DBUG_RETURN(false);
} }
bool Sql_cmd_create_table::execute(THD *thd)
{
DBUG_ENTER("Sql_cmd_create_table::execute");
LEX *lex= thd->lex;
TABLE_LIST *all_tables= lex->query_tables;
SELECT_LEX *select_lex= &lex->select_lex;
TABLE_LIST *first_table= select_lex->table_list.first;
DBUG_ASSERT(first_table == all_tables && first_table != 0);
bool link_to_local;
TABLE_LIST *create_table= first_table;
TABLE_LIST *select_tables= lex->create_last_non_select_table->next_global;
/* most outer SELECT_LEX_UNIT of query */
SELECT_LEX_UNIT *unit= &lex->unit;
int res= 0;
const bool used_engine= lex->create_info.used_fields & HA_CREATE_USED_ENGINE;
DBUG_ASSERT((m_storage_engine_name.str != NULL) == used_engine);
if (used_engine)
{
if (resolve_storage_engine_with_error(thd, &lex->create_info.db_type,
lex->create_info.tmp_table()))
DBUG_RETURN(true); // Engine not found, substitution is not allowed
if (!lex->create_info.db_type) // Not found, but substitution is allowed
{
lex->create_info.use_default_db_type(thd);
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_WARN_USING_OTHER_HANDLER,
ER_THD(thd, ER_WARN_USING_OTHER_HANDLER),
hton_name(lex->create_info.db_type)->str,
create_table->table_name);
}
}
if (lex->tmp_table())
{
status_var_decrement(thd->status_var.com_stat[SQLCOM_CREATE_TABLE]);
status_var_increment(thd->status_var.com_create_tmp_table);
}
/*
Code below (especially in mysql_create_table() and select_create
methods) may modify HA_CREATE_INFO structure in LEX, so we have to
use a copy of this structure to make execution prepared statement-
safe. A shallow copy is enough as this code won't modify any memory
referenced from this structure.
*/
Table_specification_st create_info(lex->create_info);
/*
We need to copy alter_info for the same reasons of re-execution
safety, only in case of Alter_info we have to do (almost) a deep
copy.
*/
Alter_info alter_info(lex->alter_info, thd->mem_root);
if (thd->is_fatal_error)
{
/* If out of memory when creating a copy of alter_info. */
res= 1;
goto end_with_restore_list;
}
/* Check privileges */
if ((res= create_table_precheck(thd, select_tables, create_table)))
goto end_with_restore_list;
/* Might have been updated in create_table_precheck */
create_info.alias= create_table->alias;
/* Fix names if symlinked or relocated tables */
if (append_file_to_dir(thd, &create_info.data_file_name,
create_table->table_name) ||
append_file_to_dir(thd, &create_info.index_file_name,
create_table->table_name))
goto end_with_restore_list;
/*
If no engine type was given, work out the default now
rather than at parse-time.
*/
if (!(create_info.used_fields & HA_CREATE_USED_ENGINE))
create_info.use_default_db_type(thd);
/*
If we are using SET CHARSET without DEFAULT, add an implicit
DEFAULT to not confuse old users. (This may change).
*/
if ((create_info.used_fields &
(HA_CREATE_USED_DEFAULT_CHARSET | HA_CREATE_USED_CHARSET)) ==
HA_CREATE_USED_CHARSET)
{
create_info.used_fields&= ~HA_CREATE_USED_CHARSET;
create_info.used_fields|= HA_CREATE_USED_DEFAULT_CHARSET;
create_info.default_table_charset= create_info.table_charset;
create_info.table_charset= 0;
}
/*
If we are a slave, we should add OR REPLACE if we don't have
IF EXISTS. This will help a slave to recover from
CREATE TABLE OR EXISTS failures by dropping the table and
retrying the create.
*/
if (thd->slave_thread &&
slave_ddl_exec_mode_options == SLAVE_EXEC_MODE_IDEMPOTENT &&
!lex->create_info.if_not_exists())
{
create_info.add(DDL_options_st::OPT_OR_REPLACE);
create_info.add(DDL_options_st::OPT_OR_REPLACE_SLAVE_GENERATED);
}
#ifdef WITH_PARTITION_STORAGE_ENGINE
{
partition_info *part_info= thd->lex->part_info;
if (part_info && !(part_info= part_info->get_clone(thd)))
{
res= -1;
goto end_with_restore_list;
}
thd->work_part_info= part_info;
}
#endif
if (select_lex->item_list.elements) // With select
{
select_result *result;
/*
CREATE TABLE...IGNORE/REPLACE SELECT... can be unsafe, unless
ORDER BY PRIMARY KEY clause is used in SELECT statement. We therefore
use row based logging if mixed or row based logging is available.
TODO: Check if the order of the output of the select statement is
deterministic. Waiting for BUG#42415
*/
if(lex->ignore)
lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_CREATE_IGNORE_SELECT);
if(lex->duplicates == DUP_REPLACE)
lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_CREATE_REPLACE_SELECT);
/*
If:
a) we inside an SP and there was NAME_CONST substitution,
b) binlogging is on (STMT mode),
c) we log the SP as separate statements
raise a warning, as it may cause problems
(see 'NAME_CONST issues' in 'Binary Logging of Stored Programs')
*/
if (thd->query_name_consts && mysql_bin_log.is_open() &&
thd->wsrep_binlog_format() == BINLOG_FORMAT_STMT &&
!mysql_bin_log.is_query_in_union(thd, thd->query_id))
{
List_iterator_fast<Item> it(select_lex->item_list);
Item *item;
uint splocal_refs= 0;
/* Count SP local vars in the top-level SELECT list */
while ((item= it++))
{
if (item->get_item_splocal())
splocal_refs++;
}
/*
If it differs from number of NAME_CONST substitution applied,
we may have a SOME_FUNC(NAME_CONST()) in the SELECT list,
that may cause a problem with binary log (see BUG#35383),
raise a warning.
*/
if (splocal_refs != thd->query_name_consts)
push_warning(thd,
Sql_condition::WARN_LEVEL_WARN,
ER_UNKNOWN_ERROR,
"Invoked routine ran a statement that may cause problems with "
"binary log, see 'NAME_CONST issues' in 'Binary Logging of Stored Programs' "
"section of the manual.");
}
select_lex->options|= SELECT_NO_UNLOCK;
unit->set_limit(select_lex);
/*
Disable non-empty MERGE tables with CREATE...SELECT. Too
complicated. See Bug #26379. Empty MERGE tables are read-only
and don't allow CREATE...SELECT anyway.
*/
if (create_info.used_fields & HA_CREATE_USED_UNION)
{
my_error(ER_WRONG_OBJECT, MYF(0), create_table->db,
create_table->table_name, "BASE TABLE");
res= 1;
goto end_with_restore_list;
}
/* Copy temporarily the statement flags to thd for lock_table_names() */
uint save_thd_create_info_options= thd->lex->create_info.options;
thd->lex->create_info.options|= create_info.options;
res= open_and_lock_tables(thd, create_info, lex->query_tables, TRUE, 0);
thd->lex->create_info.options= save_thd_create_info_options;
if (res)
{
/* Got error or warning. Set res to 1 if error */
if (!(res= thd->is_error()))
my_ok(thd); // CREATE ... IF NOT EXISTS
goto end_with_restore_list;
}
/* Ensure we don't try to create something from which we select from */
if (create_info.or_replace() && !create_info.tmp_table())
{
TABLE_LIST *duplicate;
if ((duplicate= unique_table(thd, lex->query_tables,
lex->query_tables->next_global,
CHECK_DUP_FOR_CREATE)))
{
update_non_unique_table_error(lex->query_tables, "CREATE",
duplicate);
res= TRUE;
goto end_with_restore_list;
}
}
{
/*
Remove target table from main select and name resolution
context. This can't be done earlier as it will break view merging in
statements like "CREATE TABLE IF NOT EXISTS existing_view SELECT".
*/
lex->unlink_first_table(&link_to_local);
/* Store reference to table in case of LOCK TABLES */
create_info.table= create_table->table;
/*
select_create is currently not re-execution friendly and
needs to be created for every execution of a PS/SP.
Note: In wsrep-patch, CTAS is handled like a regular transaction.
*/
if ((result= new (thd->mem_root) select_create(thd, create_table,
&create_info,
&alter_info,
select_lex->item_list,
lex->duplicates,
lex->ignore,
select_tables)))
{
/*
CREATE from SELECT give its SELECT_LEX for SELECT,
and item_list belong to SELECT
*/
if (!(res= handle_select(thd, lex, result, 0)))
{
if (create_info.tmp_table())
thd->variables.option_bits|= OPTION_KEEP_LOG;
}
delete result;
}
lex->link_first_table_back(create_table, link_to_local);
}
}
else
{
/* regular create */
if (create_info.like())
{
/* CREATE TABLE ... LIKE ... */
res= mysql_create_like_table(thd, create_table, select_tables,
&create_info);
}
else
{
/*
In STATEMENT format, we probably have to replicate also temporary
tables, like mysql replication does. Also check if the requested
engine is allowed/supported.
*/
if (WSREP(thd) &&
!check_engine(thd, create_table->db, create_table->table_name,
&create_info) &&
(!thd->is_current_stmt_binlog_format_row() ||
!create_info.tmp_table()))
{
WSREP_TO_ISOLATION_BEGIN(create_table->db, create_table->table_name, NULL)
}
/* Regular CREATE TABLE */
res= mysql_create_table(thd, create_table, &create_info, &alter_info);
}
if (!res)
{
/* So that CREATE TEMPORARY TABLE gets to binlog at commit/rollback */
if (create_info.tmp_table())
thd->variables.option_bits|= OPTION_KEEP_LOG;
my_ok(thd);
}
}
end_with_restore_list:
DBUG_RETURN(res);
WSREP_ERROR_LABEL:
DBUG_RETURN(true);
}
...@@ -2454,6 +2454,8 @@ create: ...@@ -2454,6 +2454,8 @@ create:
create_or_replace opt_temporary TABLE_SYM opt_if_not_exists table_ident create_or_replace opt_temporary TABLE_SYM opt_if_not_exists table_ident
{ {
LEX *lex= thd->lex; LEX *lex= thd->lex;
if (!(lex->m_sql_cmd= new (thd->mem_root) Sql_cmd_create_table()))
MYSQL_YYABORT;
lex->create_info.init(); lex->create_info.init();
if (lex->set_command_with_check(SQLCOM_CREATE_TABLE, $2, $1 | $4)) if (lex->set_command_with_check(SQLCOM_CREATE_TABLE, $2, $1 | $4))
MYSQL_YYABORT; MYSQL_YYABORT;
...@@ -2475,16 +2477,6 @@ create: ...@@ -2475,16 +2477,6 @@ create:
{ {
LEX *lex= thd->lex; LEX *lex= thd->lex;
lex->current_select= &lex->select_lex; lex->current_select= &lex->select_lex;
if ((lex->create_info.used_fields & HA_CREATE_USED_ENGINE) &&
!lex->create_info.db_type)
{
lex->create_info.use_default_db_type(thd);
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_WARN_USING_OTHER_HANDLER,
ER_THD(thd, ER_WARN_USING_OTHER_HANDLER),
hton_name(lex->create_info.db_type)->str,
$5->table.str);
}
create_table_set_open_action_and_adjust_tables(lex); create_table_set_open_action_and_adjust_tables(lex);
} }
| create_or_replace opt_unique INDEX_SYM opt_if_not_exists ident | create_or_replace opt_unique INDEX_SYM opt_if_not_exists ident
...@@ -5515,10 +5507,20 @@ create_table_options: ...@@ -5515,10 +5507,20 @@ create_table_options:
; ;
create_table_option: create_table_option:
ENGINE_SYM opt_equal storage_engines ENGINE_SYM opt_equal ident_or_text
{ {
Lex->create_info.db_type= $3; LEX *lex= Lex;
Lex->create_info.used_fields|= HA_CREATE_USED_ENGINE; if (!lex->m_sql_cmd)
{
DBUG_ASSERT(lex->sql_command == SQLCOM_ALTER_TABLE);
if (!(lex->m_sql_cmd= new (thd->mem_root) Sql_cmd_alter_table()))
MYSQL_YYABORT;
}
Storage_engine_name *opt=
lex->m_sql_cmd->option_storage_engine_name();
DBUG_ASSERT(opt); // Expect a proper Sql_cmd
*opt= Storage_engine_name($3);
lex->create_info.used_fields|= HA_CREATE_USED_ENGINE;
} }
| MAX_ROWS opt_equal ulonglong_num | MAX_ROWS opt_equal ulonglong_num
{ {
...@@ -5783,21 +5785,10 @@ default_collation: ...@@ -5783,21 +5785,10 @@ default_collation:
storage_engines: storage_engines:
ident_or_text ident_or_text
{ {
plugin_ref plugin= ha_resolve_by_name(thd, &$1, if (Storage_engine_name($1).
thd->lex->create_info.tmp_table()); resolve_storage_engine_with_error(thd, &$$,
thd->lex->create_info.tmp_table()))
if (plugin) MYSQL_YYABORT;
$$= plugin_hton(plugin);
else
{
if (thd->variables.sql_mode & MODE_NO_ENGINE_SUBSTITUTION)
my_yyabort_error((ER_UNKNOWN_STORAGE_ENGINE, MYF(0), $1.str));
$$= 0;
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_UNKNOWN_STORAGE_ENGINE,
ER_THD(thd, ER_UNKNOWN_STORAGE_ENGINE),
$1.str);
}
} }
; ;
...@@ -7533,11 +7524,6 @@ alter_list_item: ...@@ -7533,11 +7524,6 @@ alter_list_item:
{ {
LEX *lex=Lex; LEX *lex=Lex;
lex->alter_info.flags|= Alter_info::ALTER_OPTIONS; lex->alter_info.flags|= Alter_info::ALTER_OPTIONS;
if ((lex->create_info.used_fields & HA_CREATE_USED_ENGINE) &&
!lex->create_info.db_type)
{
lex->create_info.used_fields&= ~HA_CREATE_USED_ENGINE;
}
} }
| FORCE_SYM | FORCE_SYM
{ {
......
...@@ -2177,8 +2177,20 @@ static bool sql_unusable_for_discovery(THD *thd, handlerton *engine, ...@@ -2177,8 +2177,20 @@ static bool sql_unusable_for_discovery(THD *thd, handlerton *engine,
if (create_info->data_file_name || create_info->index_file_name) if (create_info->data_file_name || create_info->index_file_name)
return 1; return 1;
// ... engine // ... engine
if (create_info->db_type && create_info->db_type != engine) DBUG_ASSERT(lex->m_sql_cmd);
return 1; if (lex->create_info.used_fields & HA_CREATE_USED_ENGINE)
{
/*
TODO: we could just compare engine names here, without resolving.
But this optimization is too late for 10.1.
*/
Storage_engine_name *opt= lex->m_sql_cmd->option_storage_engine_name();
DBUG_ASSERT(opt); // lex->m_sql_cmd must be an Sql_cmd_create_table instance
if (opt->resolve_storage_engine_with_error(thd, &create_info->db_type,
false) ||
(create_info->db_type && create_info->db_type != engine))
return 1;
}
return 0; return 0;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment