Commit bf31ceb7 authored by unknown's avatar unknown

Fix for bug#6935 table rename does not work with ndb tables

parent 948a2655
DROP TABLE IF EXISTS t1; DROP TABLE IF EXISTS t1;
drop database if exists mysqltest;
CREATE TABLE t1 ( CREATE TABLE t1 (
a INT NOT NULL, a INT NOT NULL,
b INT NOT NULL b INT NOT NULL
...@@ -9,6 +10,21 @@ SELECT * FROM t1; ...@@ -9,6 +10,21 @@ SELECT * FROM t1;
a b c a b c
9410 9412 0 9410 9412 0
DROP TABLE t1; DROP TABLE t1;
CREATE DATABASE mysqltest;
USE mysqltest;
CREATE TABLE t1 (
a INT NOT NULL,
b INT NOT NULL
) ENGINE=ndbcluster;
RENAME TABLE t1 TO test.t1;
SHOW TABLES;
Tables_in_mysqltest
DROP DATABASE mysqltest;
USE test;
SHOW TABLES;
Tables_in_test
t1
DROP TABLE t1;
create table t1 ( create table t1 (
col1 int not null auto_increment primary key, col1 int not null auto_increment primary key,
col2 varchar(30) not null, col2 varchar(30) not null,
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
--disable_warnings --disable_warnings
DROP TABLE IF EXISTS t1; DROP TABLE IF EXISTS t1;
drop database if exists mysqltest;
--enable_warnings --enable_warnings
# #
...@@ -20,6 +21,22 @@ SELECT * FROM t1; ...@@ -20,6 +21,22 @@ SELECT * FROM t1;
DROP TABLE t1; DROP TABLE t1;
#
# Verfify changing table names between databases
#
CREATE DATABASE mysqltest;
USE mysqltest;
CREATE TABLE t1 (
a INT NOT NULL,
b INT NOT NULL
) ENGINE=ndbcluster;
RENAME TABLE t1 TO test.t1;
SHOW TABLES;
DROP DATABASE mysqltest;
USE test;
SHOW TABLES;
DROP TABLE t1;
# #
# More advanced test # More advanced test
# #
......
...@@ -989,6 +989,13 @@ public: ...@@ -989,6 +989,13 @@ public:
*/ */
Table getTableForAlteration(const char * name); Table getTableForAlteration(const char * name);
/**
* Get copy a copy of a table for alteration.
* @param table Table object to alter
* @return table if successful. NULL if undefined
*/
Table getTableForAlteration(const Table &);
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
/** /**
* Invalidate cached table object * Invalidate cached table object
......
...@@ -775,12 +775,17 @@ NdbDictionary::Dictionary::removeCachedTable(const char * name){ ...@@ -775,12 +775,17 @@ NdbDictionary::Dictionary::removeCachedTable(const char * name){
NdbDictionary::Table NdbDictionary::Table
NdbDictionary::Dictionary::getTableForAlteration(const char * name){ NdbDictionary::Dictionary::getTableForAlteration(const char * name){
const NdbDictionary::Table * oldTable = getTable(name); const Table * oldTable = getTable(name);
return (oldTable) ? return (oldTable) ?
NdbDictionary::Table(*oldTable) NdbDictionary::Table(*oldTable)
: NdbDictionary::Table(); : NdbDictionary::Table();
} }
NdbDictionary::Table
NdbDictionary::Dictionary::getTableForAlteration(const Table & tab){
return NdbDictionary::Table(tab);
}
int int
NdbDictionary::Dictionary::createIndex(const Index & ind) NdbDictionary::Dictionary::createIndex(const Index & ind)
{ {
......
...@@ -1411,15 +1411,14 @@ int NdbDictionaryImpl::alterTable(NdbTableImpl &impl) ...@@ -1411,15 +1411,14 @@ int NdbDictionaryImpl::alterTable(NdbTableImpl &impl)
const char * originalInternalName = internalName.c_str(); const char * originalInternalName = internalName.c_str();
BaseString externalName = impl.m_externalName; BaseString externalName = impl.m_externalName;
const char * originalExternalName = externalName.c_str(); const char * originalExternalName = externalName.c_str();
NdbTableImpl * oldTab = getTable(originalExternalName);
DBUG_ENTER("NdbDictionaryImpl::alterTable");
if(!oldTab){ if(!get_local_table_info(originalInternalName, false)){
m_error.code = 709; m_error.code = 709;
return -1; DBUG_RETURN(-1);
} }
// Alter the table // Alter the table
int ret = m_receiver.alterTable(m_ndb, impl); int ret = m_receiver.alterTable(m_ndb, impl);
if(ret == 0){ if(ret == 0){
// Remove cached information and let it be refreshed at next access // Remove cached information and let it be refreshed at next access
if (m_localHash.get(originalInternalName) != NULL) { if (m_localHash.get(originalInternalName) != NULL) {
...@@ -1433,7 +1432,7 @@ int NdbDictionaryImpl::alterTable(NdbTableImpl &impl) ...@@ -1433,7 +1432,7 @@ int NdbDictionaryImpl::alterTable(NdbTableImpl &impl)
m_globalHash->unlock(); m_globalHash->unlock();
} }
} }
return ret; DBUG_RETURN(ret)
} }
int int
...@@ -1448,15 +1447,16 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb, ...@@ -1448,15 +1447,16 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb,
NdbTableImpl & impl, NdbTableImpl & impl,
bool alter) bool alter)
{ {
DBUG_ENTER("NdbDictInterface::createOrAlterTable");
unsigned i; unsigned i;
if((unsigned)impl.getNoOfPrimaryKeys() > NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY){ if((unsigned)impl.getNoOfPrimaryKeys() > NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY){
m_error.code = 4317; m_error.code = 4317;
return -1; DBUG_RETURN(-1);
} }
unsigned sz = impl.m_columns.size(); unsigned sz = impl.m_columns.size();
if (sz > NDB_MAX_ATTRIBUTES_IN_TABLE){ if (sz > NDB_MAX_ATTRIBUTES_IN_TABLE){
m_error.code = 4318; m_error.code = 4318;
return -1; DBUG_RETURN(-1);
} }
impl.copyNewProperties(); impl.copyNewProperties();
...@@ -1491,7 +1491,7 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb, ...@@ -1491,7 +1491,7 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb,
// Check max length of frm data // Check max length of frm data
if (impl.m_frm.length() > MAX_FRM_DATA_SIZE){ if (impl.m_frm.length() > MAX_FRM_DATA_SIZE){
m_error.code = 1229; m_error.code = 1229;
return -1; DBUG_RETURN(-1);
} }
tmpTab.FrmLen = impl.m_frm.length(); tmpTab.FrmLen = impl.m_frm.length();
memcpy(tmpTab.FrmData, impl.m_frm.get_data(), impl.m_frm.length()); memcpy(tmpTab.FrmData, impl.m_frm.get_data(), impl.m_frm.length());
...@@ -1543,12 +1543,12 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb, ...@@ -1543,12 +1543,12 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb,
// charset is defined exactly for char types // charset is defined exactly for char types
if (col->getCharType() != (col->m_cs != NULL)) { if (col->getCharType() != (col->m_cs != NULL)) {
m_error.code = 703; m_error.code = 703;
return -1; DBUG_RETURN(-1);
} }
// primary key type check // primary key type check
if (col->m_pk && ! NdbSqlUtil::usable_in_pk(col->m_type, col->m_cs)) { if (col->m_pk && ! NdbSqlUtil::usable_in_pk(col->m_type, col->m_cs)) {
m_error.code = 743; m_error.code = 743;
return -1; DBUG_RETURN(-1);
} }
// charset in upper half of precision // charset in upper half of precision
if (col->getCharType()) { if (col->getCharType()) {
...@@ -1616,7 +1616,7 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb, ...@@ -1616,7 +1616,7 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb,
} }
} }
} }
return ret; DBUG_RETURN(ret);
} }
int int
...@@ -1675,17 +1675,17 @@ NdbDictInterface::alterTable(NdbApiSignal* signal, LinearSectionPtr ptr[3]) ...@@ -1675,17 +1675,17 @@ NdbDictInterface::alterTable(NdbApiSignal* signal, LinearSectionPtr ptr[3])
int errCodes[noErrCodes] = int errCodes[noErrCodes] =
{AlterTableRef::NotMaster, {AlterTableRef::NotMaster,
AlterTableRef::Busy}; AlterTableRef::Busy};
int r = dictSignal(signal,ptr,1, int r = dictSignal(signal,ptr,1,
1/*use masternode id*/, 1/*use masternode id*/,
100,WAIT_ALTER_TAB_REQ, 100,WAIT_ALTER_TAB_REQ,
WAITFOR_RESPONSE_TIMEOUT, WAITFOR_RESPONSE_TIMEOUT,
errCodes, noErrCodes); errCodes, noErrCodes);
if(m_error.code == AlterTableRef::InvalidTableVersion) { if(m_error.code == AlterTableRef::InvalidTableVersion) {
// Clear caches and try again // Clear caches and try again
return INCOMPATIBLE_VERSION; return INCOMPATIBLE_VERSION;
} }
return r; return r;
} }
void void
......
...@@ -3627,9 +3627,13 @@ int ha_ndbcluster::create_index(const char *name, ...@@ -3627,9 +3627,13 @@ int ha_ndbcluster::create_index(const char *name,
int ha_ndbcluster::rename_table(const char *from, const char *to) int ha_ndbcluster::rename_table(const char *from, const char *to)
{ {
NDBDICT *dict;
char new_tabname[FN_HEADLEN]; char new_tabname[FN_HEADLEN];
const NDBTAB *orig_tab;
int result;
DBUG_ENTER("ha_ndbcluster::rename_table"); DBUG_ENTER("ha_ndbcluster::rename_table");
DBUG_PRINT("info", ("Renaming %s to %s", from, to));
set_dbname(from); set_dbname(from);
set_tabname(from); set_tabname(from);
set_tabname(to, new_tabname); set_tabname(to, new_tabname);
...@@ -3637,14 +3641,20 @@ int ha_ndbcluster::rename_table(const char *from, const char *to) ...@@ -3637,14 +3641,20 @@ int ha_ndbcluster::rename_table(const char *from, const char *to)
if (check_ndb_connection()) if (check_ndb_connection())
DBUG_RETURN(my_errno= HA_ERR_NO_CONNECTION); DBUG_RETURN(my_errno= HA_ERR_NO_CONNECTION);
dict= m_ndb->getDictionary();
if (!(orig_tab= dict->getTable(m_tabname)))
ERR_RETURN(dict->getNdbError());
int result= alter_table_name(m_tabname, new_tabname); m_table= (void *)orig_tab;
if (result == 0) // Change current database to that of target table
set_dbname(to);
m_ndb->setDatabaseName(m_dbname);
if (!(result= alter_table_name(new_tabname)))
{ {
set_tabname(to); // Rename .ndb file
handler::rename_table(from, to); result= handler::rename_table(from, to);
} }
DBUG_RETURN(result); DBUG_RETURN(result);
} }
...@@ -3653,19 +3663,16 @@ int ha_ndbcluster::rename_table(const char *from, const char *to) ...@@ -3653,19 +3663,16 @@ int ha_ndbcluster::rename_table(const char *from, const char *to)
Rename a table in NDB Cluster using alter table Rename a table in NDB Cluster using alter table
*/ */
int ha_ndbcluster::alter_table_name(const char *from, const char *to) int ha_ndbcluster::alter_table_name(const char *to)
{ {
NDBDICT *dict= m_ndb->getDictionary(); NDBDICT * dict= m_ndb->getDictionary();
const NDBTAB *orig_tab; const NDBTAB *orig_tab= (const NDBTAB *) m_table;
int ret;
DBUG_ENTER("alter_table_name_table"); DBUG_ENTER("alter_table_name_table");
DBUG_PRINT("enter", ("Renaming %s to %s", from, to));
if (!(orig_tab= dict->getTable(from))) NdbDictionary::Table new_tab= dict->getTableForAlteration(*orig_tab);
ERR_RETURN(dict->getNdbError()); new_tab.setName(to);
if (dict->alterTable(new_tab) != 0)
NdbDictionary::Table copy_tab= dict->getTableForAlteration(from);
copy_tab.setName(to);
if (dict->alterTable(copy_tab) != 0)
ERR_RETURN(dict->getNdbError()); ERR_RETURN(dict->getNdbError());
m_table= NULL; m_table= NULL;
...@@ -3688,7 +3695,7 @@ int ha_ndbcluster::delete_table(const char *name) ...@@ -3688,7 +3695,7 @@ int ha_ndbcluster::delete_table(const char *name)
if (check_ndb_connection()) if (check_ndb_connection())
DBUG_RETURN(HA_ERR_NO_CONNECTION); DBUG_RETURN(HA_ERR_NO_CONNECTION);
// Remove .ndb file
handler::delete_table(name); handler::delete_table(name);
DBUG_RETURN(drop_table()); DBUG_RETURN(drop_table());
} }
...@@ -3944,6 +3951,7 @@ Ndb* check_ndb_in_thd(THD* thd) ...@@ -3944,6 +3951,7 @@ Ndb* check_ndb_in_thd(THD* thd)
} }
int ha_ndbcluster::check_ndb_connection() int ha_ndbcluster::check_ndb_connection()
{ {
THD* thd= current_thd; THD* thd= current_thd;
......
...@@ -148,7 +148,7 @@ class ha_ndbcluster: public handler ...@@ -148,7 +148,7 @@ class ha_ndbcluster: public handler
uint8 table_cache_type(); uint8 table_cache_type();
private: private:
int alter_table_name(const char *from, const char *to); int alter_table_name(const char *to);
int drop_table(); int drop_table();
int create_index(const char *name, KEY *key_info, bool unique); int create_index(const char *name, KEY *key_info, bool unique);
int create_ordered_index(const char *name, KEY *key_info); int create_ordered_index(const char *name, KEY *key_info);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment