Bug #18595 repeated create, insert, drop can cause MySQL table definition cache to corrupt

- add infinite retry on drop table temporary error
parent c30e7916
This diff is collapsed.
This diff is collapsed.
...@@ -4468,12 +4468,13 @@ int ha_ndbcluster::create(const char *name, ...@@ -4468,12 +4468,13 @@ int ha_ndbcluster::create(const char *name,
TABLE *form, TABLE *form,
HA_CREATE_INFO *info) HA_CREATE_INFO *info)
{ {
THD *thd= current_thd;
NDBTAB tab; NDBTAB tab;
NDBCOL col; NDBCOL col;
uint pack_length, length, i, pk_length= 0; uint pack_length, length, i, pk_length= 0;
const void *data, *pack_data; const void *data, *pack_data;
bool create_from_engine= (info->table_options & HA_OPTION_CREATE_FROM_ENGINE); bool create_from_engine= (info->table_options & HA_OPTION_CREATE_FROM_ENGINE);
bool is_truncate= (current_thd->lex->sql_command == SQLCOM_TRUNCATE); bool is_truncate= (thd->lex->sql_command == SQLCOM_TRUNCATE);
DBUG_ENTER("ha_ndbcluster::create"); DBUG_ENTER("ha_ndbcluster::create");
DBUG_PRINT("enter", ("name: %s", name)); DBUG_PRINT("enter", ("name: %s", name));
...@@ -4661,10 +4662,20 @@ int ha_ndbcluster::create(const char *name, ...@@ -4661,10 +4662,20 @@ int ha_ndbcluster::create(const char *name,
Failed to create an index, Failed to create an index,
drop the table (and all it's indexes) drop the table (and all it's indexes)
*/ */
if (dict->dropTableGlobal(*m_table) == 0) while (dict->dropTableGlobal(*m_table))
{ {
m_table = 0; switch (dict->getNdbError().status)
{
case NdbError::TemporaryError:
if (!thd->killed)
continue; // retry indefinitly
break;
default:
break;
}
} }
m_table = 0;
DBUG_RETURN(my_errno);
} }
#ifdef HAVE_NDB_BINLOG #ifdef HAVE_NDB_BINLOG
...@@ -4727,8 +4738,8 @@ int ha_ndbcluster::create(const char *name, ...@@ -4727,8 +4738,8 @@ int ha_ndbcluster::create(const char *name,
*/ */
if (share && !do_event_op) if (share && !do_event_op)
share->flags|= NSF_NO_BINLOG; share->flags|= NSF_NO_BINLOG;
ndbcluster_log_schema_op(current_thd, share, ndbcluster_log_schema_op(thd, share,
current_thd->query, current_thd->query_length, thd->query, thd->query_length,
share->db, share->table_name, share->db, share->table_name,
m_table->getObjectId(), m_table->getObjectId(),
m_table->getObjectVersion(), m_table->getObjectVersion(),
...@@ -5177,9 +5188,9 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb, ...@@ -5177,9 +5188,9 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,
THD *thd= current_thd; THD *thd= current_thd;
DBUG_ENTER("ha_ndbcluster::ndbcluster_delete_table"); DBUG_ENTER("ha_ndbcluster::ndbcluster_delete_table");
NDBDICT *dict= ndb->getDictionary(); NDBDICT *dict= ndb->getDictionary();
#ifdef HAVE_NDB_BINLOG
int ndb_table_id= 0; int ndb_table_id= 0;
int ndb_table_version= 0; int ndb_table_version= 0;
#ifdef HAVE_NDB_BINLOG
/* /*
Don't allow drop table unless Don't allow drop table unless
schema distribution table is setup schema distribution table is setup
...@@ -5197,15 +5208,25 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb, ...@@ -5197,15 +5208,25 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,
int res= 0; int res= 0;
if (h && h->m_table) if (h && h->m_table)
{ {
if (dict->dropTableGlobal(*h->m_table)) retry_temporary_error1:
res= ndb_to_mysql_error(&dict->getNdbError()); if (dict->dropTableGlobal(*h->m_table) == 0)
#ifdef HAVE_NDB_BINLOG
if (res == 0)
{ {
ndb_table_id= h->m_table->getObjectId(); ndb_table_id= h->m_table->getObjectId();
ndb_table_version= h->m_table->getObjectVersion(); ndb_table_version= h->m_table->getObjectVersion();
} }
#endif else
{
switch (dict->getNdbError().status)
{
case NdbError::TemporaryError:
if (!thd->killed)
goto retry_temporary_error1; // retry indefinitly
break;
default:
break;
}
res= ndb_to_mysql_error(&dict->getNdbError());
}
h->release_metadata(thd, ndb); h->release_metadata(thd, ndb);
} }
else else
...@@ -5216,17 +5237,28 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb, ...@@ -5216,17 +5237,28 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,
Ndb_table_guard ndbtab_g(dict, table_name); Ndb_table_guard ndbtab_g(dict, table_name);
if (ndbtab_g.get_table()) if (ndbtab_g.get_table())
{ {
retry_temporary_error2:
if (dict->dropTableGlobal(*ndbtab_g.get_table()) == 0) if (dict->dropTableGlobal(*ndbtab_g.get_table()) == 0)
{ {
#ifdef HAVE_NDB_BINLOG
ndb_table_id= ndbtab_g.get_table()->getObjectId(); ndb_table_id= ndbtab_g.get_table()->getObjectId();
ndb_table_version= ndbtab_g.get_table()->getObjectVersion(); ndb_table_version= ndbtab_g.get_table()->getObjectVersion();
#endif
} }
else if (dict->getNdbError().code == NDB_INVALID_SCHEMA_OBJECT) else
{ {
ndbtab_g.invalidate(); switch (dict->getNdbError().status)
continue; {
case NdbError::TemporaryError:
if (!thd->killed)
goto retry_temporary_error2; // retry indefinitly
break;
default:
if (dict->getNdbError().code == NDB_INVALID_SCHEMA_OBJECT)
{
ndbtab_g.invalidate();
continue;
}
break;
}
} }
} }
else else
...@@ -9747,7 +9779,19 @@ uint ha_ndbcluster::set_up_partition_info(partition_info *part_info, ...@@ -9747,7 +9779,19 @@ uint ha_ndbcluster::set_up_partition_info(partition_info *part_info,
} }
else else
{ {
/* #ifdef NOT_YET
if (!current_thd->variables.new_mode)
{
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_ILLEGAL_HA_CREATE_OPTION,
ER(ER_ILLEGAL_HA_CREATE_OPTION),
ndbcluster_hton_name,
"LIST, RANGE and HASH partition disabled by default,"
" use --new option to enable");
return HA_ERR_UNSUPPORTED;
}
#endif
/*
Create a shadow field for those tables that have user defined Create a shadow field for those tables that have user defined
partitioning. This field stores the value of the partition partitioning. This field stores the value of the partition
function such that NDB can handle reorganisations of the data function such that NDB can handle reorganisations of the data
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment