Commit 09944efd authored by unknown's avatar unknown

BUG#10365 Cluster handler uses non-standard error codes

 - Added better error messages when trying to open a table that can't be discovered or unpacked. The most likely cause of this is that it does not have any frm data, probably since it has been created from NdbApi or is a NDB system table.
 - Separated functionality that was in ha_create_table_from_engine into two functions. One that checks if the table exists and another one that tries to create the table from the engine.


mysql-test/r/ndb_autodiscover.result:
  Add tests for reading from a table that can't be discovered(SYSTAB_0)
  Discovery is not performed during create table anymore.
mysql-test/t/ndb_autodiscover.test:
  Add tests for reading from a table that can't be discovered(SYSTAB_0)
  Discovery is not performed during create table anymore.
ndb/test/ndbapi/create_tab.cpp:
  Set connectstring before creating Ndb object.
sql/ha_ndbcluster.cc:
  Rename and use the function ndbcluster_table_exists_in_engine.
  Correct return valu from ndbcluster_discover
  Remove old code "ndb_discover_tables"
sql/ha_ndbcluster.h:
  Rename function ndbcluster_table_exists to ndb ndbcluster_table_exists_in_engine
sql/handler.cc:
  Update comment of ha_create_table_from_engine
  Remove parameter create_if_found from ha_create_table_from_engine, the function ha_table_exists_in_engine is now used toi check if table is found in engine.
  Cleanup return codes from ha_create_table_from_engine.
  Change name of ha_table_exists to ha_table_exists_in_engine, update comment and returne codes.
sql/handler.h:
  Remove paramter create_if_cound from  ha_create_table_from_engine
  Rename ha_table_exists to ha_table_exists_in_engine
sql/sql_base.cc:
  Use the function ha_table_exists_in_engine to detect if table exists in enegine. 
  If it exists, call function ha_create_table_from_engine to try and create it.
  If create of table fails, set correct error message.
sql/sql_table.cc:
  Add comments, remove parameter create_if_found to ha_create_table_from_engine.
  When dropping a table, try to discover it from engine. If discover fails, use same error message as if the table didn't exists. 
  Maybe another message should be displayed here, ex: "Table could not be dropped, unpack failed"
  
  When creating a new table, use ha_table_exists_in_engine to check if a table with the given name already exists.
parent 5f767b11
...@@ -93,7 +93,7 @@ name char(20), a int, b float, c char(24) ...@@ -93,7 +93,7 @@ name char(20), a int, b float, c char(24)
ERROR 42S01: Table 't3' already exists ERROR 42S01: Table 't3' already exists
show status like 'handler_discover%'; show status like 'handler_discover%';
Variable_name Value Variable_name Value
Handler_discover 1 Handler_discover 0
create table IF NOT EXISTS t3( create table IF NOT EXISTS t3(
id int not null primary key, id int not null primary key,
id2 int not null, id2 int not null,
...@@ -101,7 +101,7 @@ name char(20) ...@@ -101,7 +101,7 @@ name char(20)
) engine=ndb; ) engine=ndb;
show status like 'handler_discover%'; show status like 'handler_discover%';
Variable_name Value Variable_name Value
Handler_discover 2 Handler_discover 0
SHOW CREATE TABLE t3; SHOW CREATE TABLE t3;
Table Create Table Table Create Table
t3 CREATE TABLE `t3` ( t3 CREATE TABLE `t3` (
...@@ -114,7 +114,7 @@ id name ...@@ -114,7 +114,7 @@ id name
1 Explorer 1 Explorer
show status like 'handler_discover%'; show status like 'handler_discover%';
Variable_name Value Variable_name Value
Handler_discover 2 Handler_discover 1
drop table t3; drop table t3;
flush status; flush status;
create table t7( create table t7(
...@@ -358,6 +358,20 @@ Database ...@@ -358,6 +358,20 @@ Database
mysql mysql
test test
use test; use test;
CREATE TABLE sys.SYSTAB_0 (a int);
ERROR 42S01: Table 'SYSTAB_0' already exists
select * from sys.SYSTAB_0;
ERROR HY000: Failed to open 'SYSTAB_0', error while unpacking from engine
CREATE TABLE IF NOT EXISTS sys.SYSTAB_0 (a int);
show warnings;
Level Code Message
select * from sys.SYSTAB_0;
ERROR HY000: Failed to open 'SYSTAB_0', error while unpacking from engine
drop table sys.SYSTAB_0;
ERROR 42S02: Unknown table 'SYSTAB_0'
drop table IF EXISTS sys.SYSTAB_0;
Warnings:
Note 1051 Unknown table 'SYSTAB_0'
CREATE TABLE t9 ( CREATE TABLE t9 (
a int NOT NULL PRIMARY KEY, a int NOT NULL PRIMARY KEY,
b int b int
......
...@@ -453,6 +453,27 @@ drop database test2; ...@@ -453,6 +453,27 @@ drop database test2;
show databases; show databases;
use test; use test;
#####################################################
# Test that it's not possible to create tables
# with same name as NDB internal tables
# This will also test that it's not possible to create
# a table with tha same name as a table that can't be
# discovered( for example a table created via NDBAPI)
--error 1050
CREATE TABLE sys.SYSTAB_0 (a int);
--error 1105
select * from sys.SYSTAB_0;
CREATE TABLE IF NOT EXISTS sys.SYSTAB_0 (a int);
show warnings;
--error 1105
select * from sys.SYSTAB_0;
--error 1051
drop table sys.SYSTAB_0;
drop table IF EXISTS sys.SYSTAB_0;
###################################################### ######################################################
# Note! This should always be the last step in this # Note! This should always be the last step in this
# file, the table t9 will be used and dropped # file, the table t9 will be used and dropped
......
...@@ -77,8 +77,8 @@ int main(int argc, const char** argv){ ...@@ -77,8 +77,8 @@ int main(int argc, const char** argv){
*/ */
// Connect to Ndb // Connect to Ndb
Ndb::setConnectString(_connectstr);
Ndb MyNdb( "TEST_DB" ); Ndb MyNdb( "TEST_DB" );
MyNdb.setConnectString(_connectstr);
if(MyNdb.init() != 0){ if(MyNdb.init() != 0){
ERR(MyNdb.getNdbError()); ERR(MyNdb.getNdbError());
......
...@@ -4308,13 +4308,15 @@ int ndbcluster_discover(THD* thd, const char *db, const char *name, ...@@ -4308,13 +4308,15 @@ int ndbcluster_discover(THD* thd, const char *db, const char *name,
len= tab->getFrmLength(); len= tab->getFrmLength();
if (len == 0 || tab->getFrmData() == NULL) if (len == 0 || tab->getFrmData() == NULL)
{ {
DBUG_PRINT("No frm data found", DBUG_PRINT("error", ("No frm data found."));
("Table is probably created via NdbApi")); DBUG_RETURN(1);
DBUG_RETURN(2);
} }
if (unpackfrm(&data, &len, tab->getFrmData())) if (unpackfrm(&data, &len, tab->getFrmData()))
DBUG_RETURN(3); {
DBUG_PRINT("error", ("Could not unpack table"));
DBUG_RETURN(1);
}
*frmlen= len; *frmlen= len;
*frmblob= data; *frmblob= data;
...@@ -4327,13 +4329,13 @@ int ndbcluster_discover(THD* thd, const char *db, const char *name, ...@@ -4327,13 +4329,13 @@ int ndbcluster_discover(THD* thd, const char *db, const char *name,
*/ */
int ndbcluster_table_exists(THD* thd, const char *db, const char *name) int ndbcluster_table_exists_in_engine(THD* thd, const char *db, const char *name)
{ {
uint len; uint len;
const void* data; const void* data;
const NDBTAB* tab; const NDBTAB* tab;
Ndb* ndb; Ndb* ndb;
DBUG_ENTER("ndbcluster_table_exists"); DBUG_ENTER("ndbcluster_table_exists_in_engine");
DBUG_PRINT("enter", ("db: %s, name: %s", db, name)); DBUG_PRINT("enter", ("db: %s, name: %s", db, name));
if (!(ndb= check_ndb_in_thd(thd))) if (!(ndb= check_ndb_in_thd(thd)))
...@@ -4512,7 +4514,7 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path, ...@@ -4512,7 +4514,7 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path,
DBUG_PRINT("info", ("%s existed on disk", name)); DBUG_PRINT("info", ("%s existed on disk", name));
// The .ndb file exists on disk, but it's not in list of tables in ndb // The .ndb file exists on disk, but it's not in list of tables in ndb
// Verify that handler agrees table is gone. // Verify that handler agrees table is gone.
if (ndbcluster_table_exists(thd, db, file_name) == 0) if (ndbcluster_table_exists_in_engine(thd, db, file_name) == 0)
{ {
DBUG_PRINT("info", ("NDB says %s does not exists", file_name)); DBUG_PRINT("info", ("NDB says %s does not exists", file_name));
it.remove(); it.remove();
...@@ -4563,7 +4565,7 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path, ...@@ -4563,7 +4565,7 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path,
while ((file_name=it2++)) while ((file_name=it2++))
{ {
DBUG_PRINT("info", ("Table %s need discovery", name)); DBUG_PRINT("info", ("Table %s need discovery", name));
if (ha_create_table_from_engine(thd, db, file_name, TRUE) == 0) if (ha_create_table_from_engine(thd, db, file_name) == 0)
files->push_back(thd->strdup(file_name)); files->push_back(thd->strdup(file_name));
} }
...@@ -4639,11 +4641,8 @@ bool ndbcluster_init() ...@@ -4639,11 +4641,8 @@ bool ndbcluster_init()
pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST); pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
ndbcluster_inited= 1; ndbcluster_inited= 1;
#ifdef USE_DISCOVER_ON_STARTUP
if (ndb_discover_tables() != 0)
goto ndbcluster_init_error;
#endif
DBUG_RETURN(FALSE); DBUG_RETURN(FALSE);
ndbcluster_init_error: ndbcluster_init_error:
ndbcluster_end(); ndbcluster_end();
DBUG_RETURN(TRUE); DBUG_RETURN(TRUE);
......
...@@ -275,7 +275,8 @@ int ndbcluster_discover(THD* thd, const char* dbname, const char* name, ...@@ -275,7 +275,8 @@ int ndbcluster_discover(THD* thd, const char* dbname, const char* name,
const void** frmblob, uint* frmlen); const void** frmblob, uint* frmlen);
int ndbcluster_find_files(THD *thd,const char *db,const char *path, int ndbcluster_find_files(THD *thd,const char *db,const char *path,
const char *wild, bool dir, List<char> *files); const char *wild, bool dir, List<char> *files);
int ndbcluster_table_exists(THD* thd, const char *db, const char *name); int ndbcluster_table_exists_in_engine(THD* thd,
const char *db, const char *name);
int ndbcluster_drop_database(const char* path); int ndbcluster_drop_database(const char* path);
void ndbcluster_print_error(int error, const NdbOperation *error_op); void ndbcluster_print_error(int error, const NdbOperation *error_op);
...@@ -1340,17 +1340,14 @@ int ha_create_table(const char *name, HA_CREATE_INFO *create_info, ...@@ -1340,17 +1340,14 @@ int ha_create_table(const char *name, HA_CREATE_INFO *create_info,
if found, write the frm file to disk. if found, write the frm file to disk.
RETURN VALUES: RETURN VALUES:
0 : Table existed in engine and created 0 : Table created ok
on disk if so requested 1 : Table could not be created
1 : Table does not exist
>1 : error
*/ */
int ha_create_table_from_engine(THD* thd, int ha_create_table_from_engine(THD* thd,
const char *db, const char *db,
const char *name, const char *name)
bool create_if_found)
{ {
int error; int error;
const void *frmblob; const void *frmblob;
...@@ -1359,24 +1356,29 @@ int ha_create_table_from_engine(THD* thd, ...@@ -1359,24 +1356,29 @@ int ha_create_table_from_engine(THD* thd,
HA_CREATE_INFO create_info; HA_CREATE_INFO create_info;
TABLE table; TABLE table;
DBUG_ENTER("ha_create_table_from_engine"); DBUG_ENTER("ha_create_table_from_engine");
DBUG_PRINT("enter", ("name '%s'.'%s' create_if_found: %d", DBUG_PRINT("enter", ("name '%s'.'%s'",
db, name, create_if_found)); db, name));
bzero((char*) &create_info,sizeof(create_info)); bzero((char*) &create_info,sizeof(create_info));
if ((error= ha_discover(thd, db, name, &frmblob, &frmlen))) if(ha_discover(thd, db, name, &frmblob, &frmlen))
DBUG_RETURN(error); {
// Table could not be discovered and thus not created
DBUG_RETURN(1);
}
/* /*
Table exists in handler Table exists in handler and could be discovered
frmblob and frmlen are set frmblob and frmlen are set, write the frm to disk
*/ */
if (create_if_found)
{
(void)strxnmov(path,FN_REFLEN,mysql_data_home,"/",db,"/",name,NullS); (void)strxnmov(path,FN_REFLEN,mysql_data_home,"/",db,"/",name,NullS);
// Save the frm file // Save the frm file
if ((error = writefrm(path, frmblob, frmlen))) if (writefrm(path, frmblob, frmlen))
goto err_end; {
my_free((char*) frmblob, MYF(MY_ALLOW_ZERO_PTR));
DBUG_RETURN(1);
}
if (openfrm(path,"",0,(uint) READ_ALL, 0, &table)) if (openfrm(path,"",0,(uint) READ_ALL, 0, &table))
DBUG_RETURN(1); DBUG_RETURN(1);
...@@ -1390,14 +1392,11 @@ int ha_create_table_from_engine(THD* thd, ...@@ -1390,14 +1392,11 @@ int ha_create_table_from_engine(THD* thd,
/* Ensure that handler gets name in lower case */ /* Ensure that handler gets name in lower case */
my_casedn_str(files_charset_info, path); my_casedn_str(files_charset_info, path);
} }
error=table.file->create(path,&table,&create_info); error=table.file->create(path,&table,&create_info);
VOID(closefrm(&table)); VOID(closefrm(&table));
}
err_end:
my_free((char*) frmblob, MYF(MY_ALLOW_ZERO_PTR)); my_free((char*) frmblob, MYF(MY_ALLOW_ZERO_PTR));
DBUG_RETURN(error);
DBUG_RETURN(error != 0);
} }
static int NEAR_F delete_file(const char *name,const char *ext,int extflag) static int NEAR_F delete_file(const char *name,const char *ext,int extflag)
...@@ -1508,7 +1507,7 @@ int ha_change_key_cache(KEY_CACHE *old_key_cache, ...@@ -1508,7 +1507,7 @@ int ha_change_key_cache(KEY_CACHE *old_key_cache,
RETURN RETURN
0 ok. In this case *frmblob and *frmlen are set 0 ok. In this case *frmblob and *frmlen are set
1 error. frmblob and frmlen may not be set >=1 error. frmblob and frmlen may not be set
*/ */
int ha_discover(THD *thd, const char *db, const char *name, int ha_discover(THD *thd, const char *db, const char *name,
...@@ -1546,11 +1545,8 @@ ha_find_files(THD *thd,const char *db,const char *path, ...@@ -1546,11 +1545,8 @@ ha_find_files(THD *thd,const char *db,const char *path,
error= ndbcluster_find_files(thd, db, path, wild, dir, files); error= ndbcluster_find_files(thd, db, path, wild, dir, files);
#endif #endif
DBUG_RETURN(error); DBUG_RETURN(error);
} }
#ifdef NOT_YET_USED
/* /*
Ask handler if the table exists in engine Ask handler if the table exists in engine
...@@ -1561,20 +1557,19 @@ ha_find_files(THD *thd,const char *db,const char *path, ...@@ -1561,20 +1557,19 @@ ha_find_files(THD *thd,const char *db,const char *path,
# Error code # Error code
*/ */
int ha_table_exists(THD* thd, const char* db, const char* name) int ha_table_exists_in_engine(THD* thd, const char* db, const char* name)
{ {
int error= 2; int error= 0;
DBUG_ENTER("ha_table_exists"); DBUG_ENTER("ha_table_exists_in_engine");
DBUG_PRINT("enter", ("db: %s, name: %s", db, name)); DBUG_PRINT("enter", ("db: %s, name: %s", db, name));
#ifdef HAVE_NDBCLUSTER_DB #ifdef HAVE_NDBCLUSTER_DB
if (have_ndbcluster == SHOW_OPTION_YES) if (have_ndbcluster == SHOW_OPTION_YES)
error= ndbcluster_table_exists(thd, db, name); error= ndbcluster_table_exists_in_engine(thd, db, name);
#endif #endif
DBUG_PRINT("exit", ("error: %d", error));
DBUG_RETURN(error); DBUG_RETURN(error);
} }
#endif
/* /*
Read first row between two ranges. Read first row between two ranges.
......
...@@ -547,8 +547,7 @@ enum db_type ha_checktype(enum db_type database_type); ...@@ -547,8 +547,7 @@ enum db_type ha_checktype(enum db_type database_type);
my_bool ha_storage_engine_is_enabled(enum db_type database_type); my_bool ha_storage_engine_is_enabled(enum db_type database_type);
int ha_create_table(const char *name, HA_CREATE_INFO *create_info, int ha_create_table(const char *name, HA_CREATE_INFO *create_info,
bool update_create_info); bool update_create_info);
int ha_create_table_from_engine(THD* thd, const char *db, const char *name, int ha_create_table_from_engine(THD* thd, const char *db, const char *name);
bool create_if_found);
int ha_delete_table(enum db_type db_type, const char *path); int ha_delete_table(enum db_type db_type, const char *path);
void ha_drop_database(char* path); void ha_drop_database(char* path);
int ha_init_key_cache(const char *name, KEY_CACHE *key_cache); int ha_init_key_cache(const char *name, KEY_CACHE *key_cache);
...@@ -574,6 +573,6 @@ int ha_discover(THD* thd, const char* dbname, const char* name, ...@@ -574,6 +573,6 @@ int ha_discover(THD* thd, const char* dbname, const char* name,
const void** frmblob, uint* frmlen); const void** frmblob, uint* frmlen);
int ha_find_files(THD *thd,const char *db,const char *path, int ha_find_files(THD *thd,const char *db,const char *path,
const char *wild, bool dir,List<char>* files); const char *wild, bool dir,List<char>* files);
int ha_table_exists(THD* thd, const char* db, const char* name); int ha_table_exists_in_engine(THD* thd, const char* db, const char* name);
TYPELIB *ha_known_exts(void); TYPELIB *ha_known_exts(void);
int ha_start_consistent_snapshot(THD *thd); int ha_start_consistent_snapshot(THD *thd);
...@@ -1376,8 +1376,19 @@ static int open_unireg_entry(THD *thd, TABLE *entry, const char *db, ...@@ -1376,8 +1376,19 @@ static int open_unireg_entry(THD *thd, TABLE *entry, const char *db,
*/ */
if (discover_retry_count++ != 0) if (discover_retry_count++ != 0)
goto err; goto err;
if (ha_create_table_from_engine(thd, db, name, TRUE) != 0) if (ha_table_exists_in_engine(thd, db, name) &&
ha_create_table_from_engine(thd, db, name))
{
/* Give right error message */
thd->clear_error();
DBUG_PRINT("error", ("Dicovery of %s/%s failed", db, name));
my_printf_error(ER_UNKNOWN_ERROR,
"Failed to open '%-.64s', error while "
"unpacking from engine",
MYF(0), name);
goto err; goto err;
}
thd->clear_error(); // Clear error message thd->clear_error(); // Clear error message
continue; continue;
......
...@@ -218,14 +218,17 @@ int mysql_rm_table_part2(THD *thd, TABLE_LIST *tables, bool if_exists, ...@@ -218,14 +218,17 @@ int mysql_rm_table_part2(THD *thd, TABLE_LIST *tables, bool if_exists,
(void) unpack_filename(path,path); (void) unpack_filename(path,path);
} }
if (drop_temporary || if (drop_temporary ||
(access(path,F_OK) && ha_create_table_from_engine(thd,db,alias,TRUE))) (access(path,F_OK) &&
ha_create_table_from_engine(thd, db, alias)))
{ {
// Table was not found on disk and table can't be created from engine
if (if_exists) if (if_exists)
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
ER_BAD_TABLE_ERROR, ER(ER_BAD_TABLE_ERROR), ER_BAD_TABLE_ERROR, ER(ER_BAD_TABLE_ERROR),
table->real_name); table->real_name);
else else
error= 1; error= 1;
} }
else else
{ {
...@@ -1371,10 +1374,9 @@ int mysql_create_table(THD *thd,const char *db, const char *table_name, ...@@ -1371,10 +1374,9 @@ int mysql_create_table(THD *thd,const char *db, const char *table_name,
{ {
bool create_if_not_exists = bool create_if_not_exists =
create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS; create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS;
if (!ha_create_table_from_engine(thd, db, table_name, if (ha_table_exists_in_engine(thd, db, table_name))
create_if_not_exists))
{ {
DBUG_PRINT("info", ("Table already existed in handler")); DBUG_PRINT("info", ("Table with same name already existed in handler"));
if (create_if_not_exists) if (create_if_not_exists)
{ {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment