Commit 9e348383 authored by unknown's avatar unknown

Improvement of on-line discovery in injector thread

parent 0ea979e4
...@@ -519,6 +519,7 @@ void ha_ndbcluster::invalidate_dictionary_cache(bool global) ...@@ -519,6 +519,7 @@ void ha_ndbcluster::invalidate_dictionary_cache(bool global)
{ {
NDBINDEX *index = (NDBINDEX *) m_index[i].index; NDBINDEX *index = (NDBINDEX *) m_index[i].index;
NDBINDEX *unique_index = (NDBINDEX *) m_index[i].unique_index; NDBINDEX *unique_index = (NDBINDEX *) m_index[i].unique_index;
if (!index && !unique_index) continue;
NDB_INDEX_TYPE idx_type= m_index[i].type; NDB_INDEX_TYPE idx_type= m_index[i].type;
switch (idx_type) { switch (idx_type) {
...@@ -1076,7 +1077,7 @@ int ha_ndbcluster::get_metadata(const char *path) ...@@ -1076,7 +1077,7 @@ int ha_ndbcluster::get_metadata(const char *path)
m_table= (void *)tab; m_table= (void *)tab;
m_table_info= NULL; // Set in external lock m_table_info= NULL; // Set in external lock
DBUG_RETURN(open_indexes(ndb, table)); DBUG_RETURN(open_indexes(ndb, table, FALSE));
} }
static int fix_unique_index_attr_order(NDB_INDEX_DATA &data, static int fix_unique_index_attr_order(NDB_INDEX_DATA &data,
...@@ -1249,7 +1250,7 @@ int ha_ndbcluster::add_index_handle(THD *thd, NDBDICT *dict, KEY *key_info, ...@@ -1249,7 +1250,7 @@ int ha_ndbcluster::add_index_handle(THD *thd, NDBDICT *dict, KEY *key_info,
/* /*
Associate index handles for each index of a table Associate index handles for each index of a table
*/ */
int ha_ndbcluster::open_indexes(Ndb *ndb, TABLE *tab) int ha_ndbcluster::open_indexes(Ndb *ndb, TABLE *tab, bool ignore_error)
{ {
uint i; uint i;
int error= 0; int error= 0;
...@@ -1263,7 +1264,10 @@ int ha_ndbcluster::open_indexes(Ndb *ndb, TABLE *tab) ...@@ -1263,7 +1264,10 @@ int ha_ndbcluster::open_indexes(Ndb *ndb, TABLE *tab)
for (i= 0; i < tab->s->keys; i++, key_info++, key_name++) for (i= 0; i < tab->s->keys; i++, key_info++, key_name++)
{ {
if ((error= add_index_handle(thd, dict, key_info, *key_name, i))) if ((error= add_index_handle(thd, dict, key_info, *key_name, i)))
break; if (ignore_error)
m_index[i].index= m_index[i].unique_index= NULL;
else
break;
} }
DBUG_RETURN(error); DBUG_RETURN(error);
...@@ -3699,7 +3703,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ...@@ -3699,7 +3703,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
{ {
m_table= (void *)tab; m_table= (void *)tab;
m_table_version = tab->getObjectVersion(); m_table_version = tab->getObjectVersion();
if (!(my_errno= open_indexes(ndb, table))) if (!(my_errno= open_indexes(ndb, table, FALSE)))
DBUG_RETURN(my_errno); DBUG_RETURN(my_errno);
} }
m_table_info= tab_info; m_table_info= tab_info;
......
...@@ -691,6 +691,9 @@ static void set_tabname(const char *pathname, char *tabname); ...@@ -691,6 +691,9 @@ static void set_tabname(const char *pathname, char *tabname);
private: private:
friend int ndbcluster_drop_database_impl(const char *path); friend int ndbcluster_drop_database_impl(const char *path);
friend int ndb_handle_schema_change(THD *thd,
Ndb *ndb, NdbEventOperation *pOp,
NDB_SHARE *share);
int alter_table_name(const char *to); int alter_table_name(const char *to);
static int delete_table(ha_ndbcluster *h, Ndb *ndb, static int delete_table(ha_ndbcluster *h, Ndb *ndb,
const char *path, const char *path,
...@@ -708,7 +711,7 @@ static void set_tabname(const char *pathname, char *tabname); ...@@ -708,7 +711,7 @@ static void set_tabname(const char *pathname, char *tabname);
int create_indexes(Ndb *ndb, TABLE *tab); int create_indexes(Ndb *ndb, TABLE *tab);
void clear_index(int i); void clear_index(int i);
void clear_indexes(); void clear_indexes();
int open_indexes(Ndb *ndb, TABLE *tab); int open_indexes(Ndb *ndb, TABLE *tab, bool ignore_error);
void renumber_indexes(Ndb *ndb, TABLE *tab); void renumber_indexes(Ndb *ndb, TABLE *tab);
int drop_indexes(Ndb *ndb, TABLE *tab); int drop_indexes(Ndb *ndb, TABLE *tab);
int add_index_handle(THD *thd, NdbDictionary::Dictionary *dict, int add_index_handle(THD *thd, NdbDictionary::Dictionary *dict,
......
...@@ -233,6 +233,72 @@ static void run_query(THD *thd, char *buf, char *end, ...@@ -233,6 +233,72 @@ static void run_query(THD *thd, char *buf, char *end,
} }
} }
int
ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share,
TABLE_SHARE *table_share, TABLE *table)
{
int error;
MEM_ROOT *mem_root= &share->mem_root;
DBUG_ENTER("ndbcluster_binlog_open_table");
init_tmp_table_share(table_share, share->db, 0, share->table_name,
share->key);
if ((error= open_table_def(thd, table_share, 0)))
{
sql_print_error("Unable to get table share for %s, error=%d",
share->key, error);
DBUG_PRINT("error", ("open_table_def failed %d", error));
my_free((gptr) table_share, MYF(0));
table_share= 0;
my_free((gptr) table, MYF(0));
table= 0;
DBUG_RETURN(error);
}
if ((error= open_table_from_share(thd, table_share, "", 0,
(uint) READ_ALL, 0, table, FALSE)))
{
sql_print_error("Unable to open table for %s, error=%d(%d)",
share->key, error, my_errno);
DBUG_PRINT("error", ("open_table_from_share failed %d", error));
my_free((gptr) table_share, MYF(0));
table_share= 0;
my_free((gptr) table, MYF(0));
table= 0;
DBUG_RETURN(error);
}
assign_new_table_id(table);
if (!table->record[1] || table->record[1] == table->record[0])
{
table->record[1]= alloc_root(&table->mem_root,
table->s->rec_buff_length);
}
table->in_use= injector_thd;
table->s->db.str= share->db;
table->s->db.length= strlen(share->db);
table->s->table_name.str= share->table_name;
table->s->table_name.length= strlen(share->table_name);
share->table_share= table_share;
share->table= table;
#ifndef DBUG_OFF
dbug_print_table("table", table);
#endif
/*
! do not touch the contents of the table
it may be in use by the injector thread
*/
share->ndb_value[0]= (NdbValue*)
alloc_root(mem_root, sizeof(NdbValue) * table->s->fields
+ 1 /*extra for hidden key*/);
share->ndb_value[1]= (NdbValue*)
alloc_root(mem_root, sizeof(NdbValue) * table->s->fields
+1 /*extra for hidden key*/);
DBUG_RETURN(0);
}
/* /*
Initialize the binlog part of the NDB_SHARE Initialize the binlog part of the NDB_SHARE
*/ */
...@@ -260,64 +326,12 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table) ...@@ -260,64 +326,12 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
} }
while (1) while (1)
{ {
int error;
TABLE_SHARE *table_share= TABLE_SHARE *table_share=
(TABLE_SHARE *) my_malloc(sizeof(*table_share), MYF(MY_WME)); (TABLE_SHARE *) my_malloc(sizeof(*table_share), MYF(MY_WME));
TABLE *table= (TABLE*) my_malloc(sizeof(*table), MYF(MY_WME)); TABLE *table= (TABLE*) my_malloc(sizeof(*table), MYF(MY_WME));
int error; if ((error= ndbcluster_binlog_open_table(thd, share, table_share, table)))
init_tmp_table_share(table_share, share->db, 0, share->table_name,
share->key);
if ((error= open_table_def(thd, table_share, 0)))
{
sql_print_error("Unable to get table share for %s, error=%d",
share->key, error);
DBUG_PRINT("error", ("open_table_def failed %d", error));
my_free((gptr) table_share, MYF(0));
table_share= 0;
my_free((gptr) table, MYF(0));
table= 0;
break; break;
}
if ((error= open_table_from_share(thd, table_share, "", 0,
(uint) READ_ALL, 0, table, FALSE)))
{
sql_print_error("Unable to open table for %s, error=%d(%d)",
share->key, error, my_errno);
DBUG_PRINT("error", ("open_table_from_share failed %d", error));
my_free((gptr) table_share, MYF(0));
table_share= 0;
my_free((gptr) table, MYF(0));
table= 0;
break;
}
assign_new_table_id(table);
if (!table->record[1] || table->record[1] == table->record[0])
{
table->record[1]= alloc_root(&table->mem_root,
table->s->rec_buff_length);
}
table->in_use= injector_thd;
table->s->db.str= share->db;
table->s->db.length= strlen(share->db);
table->s->table_name.str= share->table_name;
table->s->table_name.length= strlen(share->table_name);
share->table_share= table_share;
share->table= table;
#ifndef DBUG_OFF
dbug_print_table("table", table);
#endif
/*
! do not touch the contents of the table
it may be in use by the injector thread
*/
share->ndb_value[0]= (NdbValue*)
alloc_root(mem_root, sizeof(NdbValue) * table->s->fields
+ 1 /*extra for hidden key*/);
share->ndb_value[1]= (NdbValue*)
alloc_root(mem_root, sizeof(NdbValue) * table->s->fields
+1 /*extra for hidden key*/);
{ {
int i, no_nodes= g_ndb_cluster_connection->no_db_nodes(); int i, no_nodes= g_ndb_cluster_connection->no_db_nodes();
share->subscriber_bitmap= (MY_BITMAP*) share->subscriber_bitmap= (MY_BITMAP*)
...@@ -651,10 +665,10 @@ static int ndbcluster_create_apply_status_table(THD *thd) ...@@ -651,10 +665,10 @@ static int ndbcluster_create_apply_status_table(THD *thd)
if so, remove it since there is none in Ndb if so, remove it since there is none in Ndb
*/ */
{ {
strxnmov(buf, sizeof(buf), build_table_filename(buf, sizeof(buf),
mysql_data_home, NDB_REP_DB,
"/" NDB_REP_DB "/" NDB_APPLY_TABLE, NDB_APPLY_TABLE,
reg_ext, NullS); reg_ext);
unpack_filename(buf,buf); unpack_filename(buf,buf);
my_delete(buf, MYF(0)); my_delete(buf, MYF(0));
} }
...@@ -703,10 +717,10 @@ static int ndbcluster_create_schema_table(THD *thd) ...@@ -703,10 +717,10 @@ static int ndbcluster_create_schema_table(THD *thd)
if so, remove it since there is none in Ndb if so, remove it since there is none in Ndb
*/ */
{ {
strxnmov(buf, sizeof(buf), build_table_filename(buf, sizeof(buf),
mysql_data_home, NDB_REP_DB,
"/" NDB_REP_DB "/" NDB_SCHEMA_TABLE, NDB_SCHEMA_TABLE,
reg_ext, NullS); reg_ext);
unpack_filename(buf,buf); unpack_filename(buf,buf);
my_delete(buf, MYF(0)); my_delete(buf, MYF(0));
} }
...@@ -1287,7 +1301,8 @@ ndbcluster_update_slock(THD *thd, ...@@ -1287,7 +1301,8 @@ ndbcluster_update_slock(THD *thd,
/* /*
Handle _non_ data events from the storage nodes Handle _non_ data events from the storage nodes
*/ */
static int //static int
int
ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
NDB_SHARE *share) NDB_SHARE *share)
{ {
...@@ -1299,50 +1314,37 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, ...@@ -1299,50 +1314,37 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
pOp->tableFrmChanged()); pOp->tableFrmChanged());
if (pOp->getEventType() != NDBEVENT::TE_CLUSTER_FAILURE && if (pOp->getEventType() != NDBEVENT::TE_CLUSTER_FAILURE &&
pOp->getReqNodeId() != g_ndb_cluster_connection->node_id()) (uint) pOp->getReqNodeId() != g_ndb_cluster_connection->node_id())
{ {
NDBDICT *dict= ndb->getDictionary(); TABLE_SHARE *table_share= share->table->s; //share->table_share;
NdbDictionary::Dictionary::List index_list; TABLE* table= share->table;
ndb->setDatabaseName(dbname); /*
// Invalidating indexes Invalidate table and all it's indexes
if (! dict->listIndexes(index_list, tabname)) */
{ ha_ndbcluster table_handler(table_share);
for (unsigned i = 0; i < index_list.count; i++) { table_handler.set_dbname(share->key);
NdbDictionary::Dictionary::List::Element& index= table_handler.set_tabname(share->key);
index_list.elements[i]; table_handler.open_indexes(ndb, table, TRUE);
DBUG_PRINT("info", ("Invalidating index %s.%s", table_handler.invalidate_dictionary_cache(TRUE);
index.database, index.name));
dict->invalidateIndex(index.name, tabname);
}
}
// Invalidate table
ha_ndbcluster::invalidate_dictionary_cache(share->table->s,
ndb,
dbname,
tabname,
TRUE);
if (online_alter_table) if (online_alter_table)
{ {
char key[FN_REFLEN]; char key[FN_REFLEN];
const void *data= 0, *pack_data= 0; const void *data= 0, *pack_data= 0;
uint length, pack_length; uint length, pack_length;
int error; int error;
NDBDICT *dict= ndb->getDictionary();
const NDBTAB *altered_table= pOp->getTable();
DBUG_PRINT("info", ("Detected frm change of table %s.%s", DBUG_PRINT("info", ("Detected frm change of table %s.%s",
dbname, tabname)); dbname, tabname));
const NDBTAB *altered_table= pOp->getEvent()->getTable(); build_table_filename(key, FN_LEN-1, dbname, tabname, NullS);
bool remote_event=
pOp->getReqNodeId() != g_ndb_cluster_connection->node_id();
strxnmov(key, FN_LEN-1, mysql_data_home, "/",
dbname, "/", tabname, NullS);
/* /*
If the frm of the altered table is different than the one on If the frm of the altered table is different than the one on
disk then overwrite it with the new table definition disk then overwrite it with the new table definition
*/ */
if (remote_event && if (readfrm(key, &data, &length) == 0 &&
readfrm(key, &data, &length) == 0 &&
packfrm(data, length, &pack_data, &pack_length) == 0 && packfrm(data, length, &pack_data, &pack_length) == 0 &&
cmp_frm(altered_table, pack_data, pack_length)) cmp_frm(altered_table, pack_data, pack_length))
{ {
...@@ -1359,6 +1361,12 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, ...@@ -1359,6 +1361,12 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
} }
pthread_mutex_unlock(&LOCK_open); pthread_mutex_unlock(&LOCK_open);
close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0); close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0);
/*
if ((error= ndbcluster_binlog_open_table(thd, share,
table_share, table)))
sql_print_information("NDB: Failed to re-open table %s.%s",
dbname, tabname);
*/
} }
} }
remote_drop_table= 1; remote_drop_table= 1;
...@@ -1838,6 +1846,7 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, ...@@ -1838,6 +1846,7 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
/* Handle any trailing share */ /* Handle any trailing share */
NDB_SHARE *share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables, NDB_SHARE *share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables,
(byte*) key, key_len); (byte*) key, key_len);
if (share && share_may_exist) if (share && share_may_exist)
{ {
if (share->flags & NSF_NO_BINLOG || if (share->flags & NSF_NO_BINLOG ||
......
...@@ -66,6 +66,7 @@ ...@@ -66,6 +66,7 @@
#define MAX_FRAGMENT_DATA_BYTES (4+(2 * 8 * MAX_REPLICAS * MAX_NDB_NODES)) #define MAX_FRAGMENT_DATA_BYTES (4+(2 * 8 * MAX_REPLICAS * MAX_NDB_NODES))
#define MAX_NDB_PARTITIONS 1024 #define MAX_NDB_PARTITIONS 1024
#define MAX_RANGE_DATA (131072+MAX_NDB_PARTITIONS) //0.5 MByte of list data #define MAX_RANGE_DATA (131072+MAX_NDB_PARTITIONS) //0.5 MByte of list data
#define MAX_WORDS_META_FILE 16382
#define MIN_ATTRBUF ((MAX_ATTRIBUTES_IN_TABLE/24) + 1) #define MIN_ATTRBUF ((MAX_ATTRIBUTES_IN_TABLE/24) + 1)
/* /*
......
...@@ -39,6 +39,7 @@ class AlterTableReq { ...@@ -39,6 +39,7 @@ class AlterTableReq {
friend class NdbEventOperationImpl; friend class NdbEventOperationImpl;
friend class NdbDictInterface; friend class NdbDictInterface;
friend class Dbdict; friend class Dbdict;
friend class Suma;
/** /**
* For printing * For printing
......
...@@ -530,7 +530,7 @@ public: ...@@ -530,7 +530,7 @@ public:
Config c_defaults; Config c_defaults;
Uint32 m_diskless; Uint32 m_diskless;
STATIC_CONST(NO_OF_PAGES_META_FILE = 2); STATIC_CONST(NO_OF_PAGES_META_FILE = MAX_WORDS_META_FILE/BACKUP_WORDS_PER_PAGE);
/** /**
* Pools * Pools
......
...@@ -42,6 +42,7 @@ ...@@ -42,6 +42,7 @@
#include <signaldata/GCPSave.hpp> #include <signaldata/GCPSave.hpp>
#include <signaldata/CreateTab.hpp> #include <signaldata/CreateTab.hpp>
#include <signaldata/DropTab.hpp> #include <signaldata/DropTab.hpp>
#include <signaldata/AlterTable.hpp>
#include <signaldata/AlterTab.hpp> #include <signaldata/AlterTab.hpp>
#include <signaldata/DihFragCount.hpp> #include <signaldata/DihFragCount.hpp>
#include <signaldata/SystemError.hpp> #include <signaldata/SystemError.hpp>
...@@ -3440,7 +3441,7 @@ Suma::execDROP_TAB_CONF(Signal *signal) ...@@ -3440,7 +3441,7 @@ Suma::execDROP_TAB_CONF(Signal *signal)
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
static Uint32 b_dti_buf[10000]; static Uint32 b_dti_buf[MAX_WORDS_META_FILE];
void void
Suma::execALTER_TAB_REQ(Signal *signal) Suma::execALTER_TAB_REQ(Signal *signal)
...@@ -3462,7 +3463,7 @@ Suma::execALTER_TAB_REQ(Signal *signal) ...@@ -3462,7 +3463,7 @@ Suma::execALTER_TAB_REQ(Signal *signal)
} }
DBUG_PRINT("info",("alter table id: %d[i=%u]", tableId, tabPtr.i)); DBUG_PRINT("info",("alter table id: %d[i=%u]", tableId, tabPtr.i));
Table::State old_state = tabPtr.p->m_state;
tabPtr.p->m_state = Table::ALTERED; tabPtr.p->m_state = Table::ALTERED;
// triggers must be removed, waiting for sub stop req for that // triggers must be removed, waiting for sub stop req for that
...@@ -3520,6 +3521,11 @@ Suma::execALTER_TAB_REQ(Signal *signal) ...@@ -3520,6 +3521,11 @@ Suma::execALTER_TAB_REQ(Signal *signal)
DBUG_PRINT("info",("sent to subscriber %d", subbPtr.i)); DBUG_PRINT("info",("sent to subscriber %d", subbPtr.i));
} }
} }
if (AlterTableReq::getFrmFlag(changeMask))
{
// Frm changes only are handled on-line
tabPtr.p->m_state = old_state;
}
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
......
...@@ -1320,6 +1320,8 @@ NdbDictionaryImpl::putTable(NdbTableImpl *impl) ...@@ -1320,6 +1320,8 @@ NdbDictionaryImpl::putTable(NdbTableImpl *impl)
m_localHash.put(impl->m_internalName.c_str(), info); m_localHash.put(impl->m_internalName.c_str(), info);
addBlobTables(*impl);
m_ndb.theFirstTupleId[impl->getTableId()] = ~0; m_ndb.theFirstTupleId[impl->getTableId()] = ~0;
m_ndb.theLastTupleId[impl->getTableId()] = ~0; m_ndb.theLastTupleId[impl->getTableId()] = ~0;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment