Commit b2571980 authored by unknown's avatar unknown

Added on-line handling of altered frm in binlog thread

parent 6ba820f2
...@@ -3,10 +3,19 @@ create table t1 ( a int primary key, b varchar(10), c varchar(10), index (b) ) ...@@ -3,10 +3,19 @@ create table t1 ( a int primary key, b varchar(10), c varchar(10), index (b) )
engine=ndb; engine=ndb;
insert into t1 values (1,'one','one'), (2,'two','two'), (3,'three','three'); insert into t1 values (1,'one','one'), (2,'two','two'), (3,'three','three');
create index c on t1(c); create index c on t1(c);
show indexes from t1;
Table Non_unique Key_name Seq_in_index Column_name Collation Cardinality Sub_part Packed Null Index_type Comment
t1 0 PRIMARY 1 a A 3 NULL NULL BTREE
t1 1 b 1 b A 3 NULL NULL YES BTREE
t1 1 c 1 c A 3 NULL NULL YES BTREE
select * from t1 where c = 'two'; select * from t1 where c = 'two';
a b c a b c
2 two two 2 two two
alter table t1 drop index c; alter table t1 drop index c;
show indexes from t1;
Table Non_unique Key_name Seq_in_index Column_name Collation Cardinality Sub_part Packed Null Index_type Comment
t1 0 PRIMARY 1 a A 3 NULL NULL BTREE
t1 1 b 1 b A 3 NULL NULL YES BTREE
select * from t1 where c = 'two'; select * from t1 where c = 'two';
a b c a b c
2 two two 2 two two
......
...@@ -13,10 +13,12 @@ engine=ndb; ...@@ -13,10 +13,12 @@ engine=ndb;
insert into t1 values (1,'one','one'), (2,'two','two'), (3,'three','three'); insert into t1 values (1,'one','one'), (2,'two','two'), (3,'three','three');
create index c on t1(c); create index c on t1(c);
connection server2; connection server2;
show indexes from t1;
select * from t1 where c = 'two'; select * from t1 where c = 'two';
connection server1; connection server1;
alter table t1 drop index c; alter table t1 drop index c;
connection server2; connection server2;
show indexes from t1;
select * from t1 where c = 'two'; select * from t1 where c = 'two';
connection server1; connection server1;
drop table t1; drop table t1;
......
...@@ -993,7 +993,7 @@ bool ha_ndbcluster::uses_blob_value() ...@@ -993,7 +993,7 @@ bool ha_ndbcluster::uses_blob_value()
-2 Meta data has changed; Re-read data and try again -2 Meta data has changed; Re-read data and try again
*/ */
static int cmp_frm(const NDBTAB *ndbtab, const void *pack_data, int cmp_frm(const NDBTAB *ndbtab, const void *pack_data,
uint pack_length) uint pack_length)
{ {
DBUG_ENTER("cmp_frm"); DBUG_ENTER("cmp_frm");
......
...@@ -1291,20 +1291,87 @@ static int ...@@ -1291,20 +1291,87 @@ static int
ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
NDB_SHARE *share) NDB_SHARE *share)
{ {
DBUG_ENTER("ndb_handle_schema_change");
int remote_drop_table= 0, do_close_cached_tables= 0; int remote_drop_table= 0, do_close_cached_tables= 0;
const char *dbname= share->table->s->db.str;
const char *tabname= share->table->s->table_name.str;
bool online_alter_table= (pOp->getEventType() == NDBEVENT::TE_ALTER &&
pOp->tableFrmChanged());
if (pOp->getEventType() != NDBEVENT::TE_CLUSTER_FAILURE && if (pOp->getEventType() != NDBEVENT::TE_CLUSTER_FAILURE &&
pOp->getReqNodeId() != g_ndb_cluster_connection->node_id()) pOp->getReqNodeId() != g_ndb_cluster_connection->node_id())
{ {
ndb->setDatabaseName(share->table->s->db.str); NDBDICT *dict= ndb->getDictionary();
NdbDictionary::Dictionary::List index_list;
ndb->setDatabaseName(dbname);
// Invalidating indexes
if (! dict->listIndexes(index_list, tabname))
{
for (unsigned i = 0; i < index_list.count; i++) {
NdbDictionary::Dictionary::List::Element& index=
index_list.elements[i];
DBUG_PRINT("info", ("Invalidating index %s.%s",
index.database, index.name));
dict->invalidateIndex(index.name, tabname);
}
}
// Invalidate table
ha_ndbcluster::invalidate_dictionary_cache(share->table->s, ha_ndbcluster::invalidate_dictionary_cache(share->table->s,
ndb, ndb,
share->table->s->db.str, dbname,
share->table->s->table_name.str, tabname,
TRUE); TRUE);
if (online_alter_table)
{
char key[FN_REFLEN];
const void *data= 0, *pack_data= 0;
uint length, pack_length;
int error;
DBUG_PRINT("info", ("Detected frm change of table %s.%s",
dbname, tabname));
const NDBTAB *altered_table= pOp->getEvent()->getTable();
bool remote_event=
pOp->getReqNodeId() != g_ndb_cluster_connection->node_id();
strxnmov(key, FN_LEN-1, mysql_data_home, "/",
dbname, "/", tabname, NullS);
/*
If the frm of the altered table is different than the one on
disk then overwrite it with the new table definition
*/
if (remote_event &&
readfrm(key, &data, &length) == 0 &&
packfrm(data, length, &pack_data, &pack_length) == 0 &&
cmp_frm(altered_table, pack_data, pack_length))
{
DBUG_DUMP("frm", (char*)altered_table->getFrmData(),
altered_table->getFrmLength());
pthread_mutex_lock(&LOCK_open);
dict->putTable(altered_table);
if ((error= unpackfrm(&data, &length, altered_table->getFrmData())) ||
(error= writefrm(key, data, length)))
{
sql_print_information("NDB: Failed write frm for %s.%s, error %d",
dbname, tabname, error);
}
pthread_mutex_unlock(&LOCK_open);
close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0);
}
}
remote_drop_table= 1; remote_drop_table= 1;
} }
// If only frm was changed continue replicating
if (online_alter_table)
{
/* Signal ha_ndbcluster::alter_table that drop is done */
(void) pthread_cond_signal(&injector_cond);
DBUG_RETURN(0);
}
(void) pthread_mutex_lock(&share->mutex); (void) pthread_mutex_lock(&share->mutex);
DBUG_ASSERT(share->op == pOp || share->op_old == pOp); DBUG_ASSERT(share->op == pOp || share->op_old == pOp);
if (share->op_old == pOp) if (share->op_old == pOp)
...@@ -1385,7 +1452,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, ...@@ -1385,7 +1452,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
/* fall through */ /* fall through */
case SOT_ALTER_TABLE: case SOT_ALTER_TABLE:
/* fall through */ /* fall through */
if (!ndb_binlog_running) if (ndb_binlog_running)
{ {
log_query= 1; log_query= 1;
break; /* discovery will be handled by binlog */ break; /* discovery will be handled by binlog */
...@@ -1482,11 +1549,16 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, ...@@ -1482,11 +1549,16 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
// skip // skip
break; break;
case NDBEVENT::TE_ALTER: case NDBEVENT::TE_ALTER:
if (pOp->tableNameChanged())
{
DBUG_PRINT("info", ("Detected name change of table %s.%s",
share->db, share->table_name));
/* do the rename of the table in the share */ /* do the rename of the table in the share */
share->table->s->db.str= share->db; share->table->s->db.str= share->db;
share->table->s->db.length= strlen(share->db); share->table->s->db.length= strlen(share->db);
share->table->s->table_name.str= share->table_name; share->table->s->table_name.str= share->table_name;
share->table->s->table_name.length= strlen(share->table_name); share->table->s->table_name.length= strlen(share->table_name);
}
ndb_handle_schema_change(thd, ndb, pOp, share); ndb_handle_schema_change(thd, ndb, pOp, share);
break; break;
case NDBEVENT::TE_CLUSTER_FAILURE: case NDBEVENT::TE_CLUSTER_FAILURE:
...@@ -2357,6 +2429,10 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp, ...@@ -2357,6 +2429,10 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp,
share->key, share, pOp, share->op, share->op_old)); share->key, share, pOp, share->op, share->op_old));
break; break;
case NDBEVENT::TE_ALTER: case NDBEVENT::TE_ALTER:
if (pOp->tableNameChanged())
{
DBUG_PRINT("info", ("Detected name change of table %s.%s",
share->db, share->table_name));
/* ToDo: remove printout */ /* ToDo: remove printout */
if (ndb_extra_logging) if (ndb_extra_logging)
sql_print_information("NDB Binlog: rename table %s%s/%s -> %s.", sql_print_information("NDB Binlog: rename table %s%s/%s -> %s.",
...@@ -2368,6 +2444,7 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp, ...@@ -2368,6 +2444,7 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp,
share->table->s->db.length= strlen(share->db); share->table->s->db.length= strlen(share->db);
share->table->s->table_name.str= share->table_name; share->table->s->table_name.str= share->table_name;
share->table->s->table_name.length= strlen(share->table_name); share->table->s->table_name.length= strlen(share->table_name);
}
goto drop_alter_common; goto drop_alter_common;
case NDBEVENT::TE_DROP: case NDBEVENT::TE_DROP:
if (apply_status_share == share) if (apply_status_share == share)
......
...@@ -122,6 +122,8 @@ ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print, ...@@ -122,6 +122,8 @@ ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print,
prototypes for ndb handler utility function also needed by prototypes for ndb handler utility function also needed by
the ndb binlog code the ndb binlog code
*/ */
int cmp_frm(const NDBTAB *ndbtab, const void *pack_data,
uint pack_length);
int ndbcluster_find_all_files(THD *thd); int ndbcluster_find_all_files(THD *thd);
#endif /* HAVE_NDB_BINLOG */ #endif /* HAVE_NDB_BINLOG */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment