Ndb handler cleanup:

  - removed some returns on ndb internal error codes, return ndb cause in warnings
  - moved all errorcode mapping mysqld-ndb to ndberror.c
  - ndb util thread to discover all ndb tables at startup
  - ndb util thread to wait for mysqld startup
parent 4caff6eb
......@@ -6,7 +6,7 @@ a b
4 5
show status like 'handler_discover%';
Variable_name Value
Handler_discover 1
Handler_discover 0
drop table t9;
select * from t10;
ERROR HY000: Got error 4263 'Invalid blob attributes or invalid blob parts table' from NDBCLUSTER
......
......@@ -201,10 +201,18 @@ create table t1 (
pk1 bit(9) not null primary key,
b int
) engine=ndbcluster;
ERROR HY000: Can't create table './test/t1.frm' (errno: 739)
ERROR HY000: Can't create table './test/t1.frm' (errno: 140)
show warnings;
Level Code Message
Error 1296 Got error 739 'Unsupported primary key length' from NDB
Error 1005 Can't create table './test/t1.frm' (errno: 140)
create table t1 (
pk1 int not null primary key,
b bit(9),
key(b)
) engine=ndbcluster;
ERROR HY000: Can't create table './test/t1.frm' (errno: 743)
ERROR HY000: Can't create table './test/t1.frm' (errno: 140)
show warnings;
Level Code Message
Error 1296 Got error 743 'Unsupported character set in table or index' from NDB
Error 1005 Can't create table './test/t1.frm' (errno: 140)
......@@ -29,7 +29,12 @@ drop table t1;
create table t1 (a int) engine=ndbcluster;
insert into t1 value (2);
select * from t1;
ERROR HY000: Got error 241 'Invalid schema object version' from NDBCLUSTER
ERROR HY000: Table definition has changed, please retry transaction
show warnings;
Level Code Message
Error 1296 Got error 241 'Invalid schema object version' from NDB
Error 1412 Table definition has changed, please retry transaction
Error 1105 Unknown error
select * from t1;
a
2
......
......@@ -11,7 +11,11 @@ partitions 3
(partition x1 values less than (5) nodegroup 12,
partition x2 values less than (10) nodegroup 13,
partition x3 values less than (20) nodegroup 14);
ERROR HY000: Can't create table './test/t1.frm' (errno: 771)
ERROR HY000: Can't create table './test/t1.frm' (errno: 140)
show warnings;
Level Code Message
Error 1296 Got error 771 'Given NODEGROUP doesn't exist in this cluster' from NDB
Error 1005 Can't create table './test/t1.frm' (errno: 140)
CREATE TABLE t1 (
a int not null,
b int not null,
......
DROP TABLE IF EXISTS t1, r1;
DROP TABLE IF EXISTS t1, t2, r1;
create table t1 (
a int primary key,
b int not null,
......
......@@ -104,6 +104,7 @@ create table t1 (
pk1 bit(9) not null primary key,
b int
) engine=ndbcluster;
show warnings;
--error 1005
create table t1 (
......@@ -111,4 +112,4 @@ create table t1 (
b bit(9),
key(b)
) engine=ndbcluster;
show warnings;
......@@ -38,8 +38,9 @@ create table t1 (a int) engine=ndbcluster;
insert into t1 value (2);
connection server1;
# Currently a retry is required remotely
--error 1296
--error 1412
select * from t1;
show warnings;
select * from t1;
# Connect to server2 and use the tables from there
......
......@@ -26,6 +26,7 @@ partitions 3
(partition x1 values less than (5) nodegroup 12,
partition x2 values less than (10) nodegroup 13,
partition x3 values less than (20) nodegroup 14);
show warnings;
#
# Partition by range, create normal valid table
......
......@@ -2,7 +2,7 @@
-- source include/not_embedded.inc
--disable_warnings
DROP TABLE IF EXISTS t1, r1;
DROP TABLE IF EXISTS t1, t2, r1;
--enable_warnings
#
......
......@@ -46,6 +46,7 @@ static const int parallelism= 0;
static const int max_transactions= 3; // should really be 2 but there is a transaction to much allocated when loch table is used
static const char *ha_ndb_ext=".ndb";
static const char share_prefix[]= "./";
static int ndbcluster_close_connection(THD *thd);
static int ndbcluster_commit(THD *thd, bool all);
......@@ -99,12 +100,15 @@ handlerton ndbcluster_hton = {
}
// Typedefs for long names
typedef NdbDictionary::Object NDBOBJ;
typedef NdbDictionary::Column NDBCOL;
typedef NdbDictionary::Table NDBTAB;
typedef NdbDictionary::Index NDBINDEX;
typedef NdbDictionary::Dictionary NDBDICT;
typedef NdbDictionary::Event NDBEVENT;
bool ndbcluster_inited= FALSE;
static int ndbcluster_inited= 0;
static int ndbcluster_util_inited= 0;
static Ndb* g_ndb= NULL;
static Ndb_cluster_connection* g_ndb_cluster_connection= NULL;
......@@ -117,9 +121,12 @@ static HASH ndbcluster_open_tables;
static byte *ndbcluster_get_key(NDB_SHARE *share,uint *length,
my_bool not_used __attribute__((unused)));
static void ndb_set_fragmentation(NDBTAB & tab, TABLE *table, uint pk_len);
static NDB_SHARE *get_share(const char *table_name);
static void free_share(NDB_SHARE *share);
static NDB_SHARE *get_share(const char *key,
bool create_if_not_exists= TRUE,
bool have_lock= FALSE);
static void free_share(NDB_SHARE **share, bool have_lock= FALSE);
static void real_free_share(NDB_SHARE **share);
static void ndb_set_fragmentation(NDBTAB &tab, TABLE *table, uint pk_len);
static int packfrm(const void *data, uint len, const void **pack_data, uint *pack_len);
static int unpackfrm(const void **data, uint *len,
......@@ -128,6 +135,33 @@ static int unpackfrm(const void **data, uint *len,
static int ndb_get_table_statistics(Ndb*, const char *,
struct Ndb_statistics *);
#ifndef DBUG_OFF
void print_records(TABLE *table, const char *record)
{
if (_db_on_)
{
for (uint j= 0; j < table->s->fields; j++)
{
char buf[40];
int pos= 0;
Field *field= table->field[j];
const byte* field_ptr= field->ptr - table->record[0] + record;
int pack_len= field->pack_length();
int n= pack_len < 10 ? pack_len : 10;
for (int i= 0; i < n && pos < 20; i++)
{
pos+= sprintf(&buf[pos]," %x", (int) (unsigned char) field_ptr[i]);
}
buf[pos]= 0;
DBUG_PRINT("info",("[%u]field_ptr[0->%d]: %s", j, n, buf));
}
}
}
#else
#define print_records(a,b)
#endif
// Util thread variables
static pthread_t ndb_util_thread;
pthread_mutex_t LOCK_ndb_util_thread;
......@@ -179,65 +213,70 @@ struct show_var_st ndb_status_variables[]= {
{NullS, NullS, SHOW_LONG}
};
/* instantiated in storage/ndb/src/ndbapi/Ndbif.cpp */
extern Uint64 g_latest_trans_gci;
/*
Error handling functions
*/
struct err_code_mapping
{
int ndb_err;
int my_err;
int show_warning;
};
/* Note for merge: old mapping table, moved to storage/ndb/ndberror.c */
static const err_code_mapping err_map[]=
static int ndb_to_mysql_error(const NdbError *ndberr)
{
{ 626, HA_ERR_KEY_NOT_FOUND, 0 },
{ 630, HA_ERR_FOUND_DUPP_KEY, 0 },
{ 893, HA_ERR_FOUND_DUPP_KEY, 0 },
{ 721, HA_ERR_TABLE_EXIST, 1 },
{ 4244, HA_ERR_TABLE_EXIST, 1 },
{ 709, HA_ERR_NO_SUCH_TABLE, 0 },
/* read the mysql mapped error code */
int error= ndberr->mysql_code;
{ 266, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
{ 274, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
{ 296, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
{ 297, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
{ 237, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
{ 623, HA_ERR_RECORD_FILE_FULL, 1 },
{ 624, HA_ERR_RECORD_FILE_FULL, 1 },
{ 625, HA_ERR_RECORD_FILE_FULL, 1 },
{ 826, HA_ERR_RECORD_FILE_FULL, 1 },
{ 827, HA_ERR_RECORD_FILE_FULL, 1 },
{ 832, HA_ERR_RECORD_FILE_FULL, 1 },
{ 284, HA_ERR_TABLE_DEF_CHANGED, 0 },
{ 0, 1, 0 },
{ -1, -1, 1 }
};
switch (error)
{
/* errors for which we do not add warnings, just return mapped error code
*/
case HA_ERR_NO_SUCH_TABLE:
case HA_ERR_KEY_NOT_FOUND:
case HA_ERR_FOUND_DUPP_KEY:
return error;
/* Mapping missing, go with the ndb error code*/
case -1:
error= ndberr->code;
break;
/* Mapping exists, go with the mapped code */
default:
break;
}
static int ndb_to_mysql_error(const NdbError *err)
{
uint i;
for (i=0; err_map[i].ndb_err != err->code && err_map[i].my_err != -1; i++);
if (err_map[i].show_warning)
{
// Push the NDB error message as warning
/*
Push the NDB error message as warning
- Used to be able to use SHOW WARNINGS toget more info on what the error is
- Used by replication to see if the error was temporary
*/
if (ndberr->status == NdbError::TemporaryError)
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
err->code, err->message, "NDB");
}
if (err_map[i].my_err == -1)
return err->code;
return err_map[i].my_err;
ER_GET_TEMPORARY_ERRMSG, ER(ER_GET_TEMPORARY_ERRMSG),
ndberr->code, ndberr->message, "NDB");
else
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
ndberr->code, ndberr->message, "NDB");
return error;
}
int execute_no_commit_ignore_no_key(ha_ndbcluster *h, NdbTransaction *trans)
{
int res= trans->execute(NdbTransaction::NoCommit,
NdbTransaction::AO_IgnoreError,
h->m_force_send);
if (res == 0)
return 0;
const NdbError &err= trans->getNdbError();
if (err.classification != NdbError::ConstraintViolation &&
err.classification != NdbError::NoDataFound)
return res;
return 0;
}
inline
int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans)
......@@ -247,9 +286,11 @@ int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans)
if (m_batch_execute)
return 0;
#endif
return trans->execute(NdbTransaction::NoCommit,
NdbTransaction::AbortOnError,
h->m_force_send);
return h->m_ignore_no_key ?
execute_no_commit_ignore_no_key(h,trans) :
trans->execute(NdbTransaction::NoCommit,
NdbTransaction::AbortOnError,
h->m_force_send);
}
inline
......@@ -437,29 +478,38 @@ void ha_ndbcluster::no_uncommitted_rows_reset(THD *thd)
# The mapped error code
*/
void ha_ndbcluster::invalidate_dictionary_cache(bool global)
void
ha_ndbcluster::invalidate_dictionary_cache(TABLE *table, Ndb *ndb,
const char *tabname, bool global)
{
NDBDICT *dict= get_ndb()->getDictionary();
NDBDICT *dict= ndb->getDictionary();
DBUG_ENTER("invalidate_dictionary_cache");
DBUG_PRINT("info", ("invalidating %s", m_tabname));
DBUG_PRINT("info", ("invalidating %s", tabname));
if (global)
{
const NDBTAB *tab= dict->getTable(m_tabname);
const NDBTAB *tab= dict->getTable(tabname);
if (!tab)
DBUG_VOID_RETURN;
if (tab->getObjectStatus() == NdbDictionary::Object::Invalid)
{
// Global cache has already been invalidated
dict->removeCachedTable(m_tabname);
dict->removeCachedTable(tabname);
global= FALSE;
}
else
dict->invalidateTable(m_tabname);
dict->invalidateTable(tabname);
}
else
dict->removeCachedTable(m_tabname);
dict->removeCachedTable(tabname);
table->s->version=0L; /* Free when thread is ready */
DBUG_VOID_RETURN;
}
void ha_ndbcluster::invalidate_dictionary_cache(bool global)
{
NDBDICT *dict= get_ndb()->getDictionary();
invalidate_dictionary_cache(table, get_ndb(), m_tabname, global);
/* Invalidate indexes */
for (uint i= 0; i < table->s->keys; i++)
{
......@@ -491,7 +541,6 @@ void ha_ndbcluster::invalidate_dictionary_cache(bool global)
break;
}
}
DBUG_VOID_RETURN;
}
int ha_ndbcluster::ndb_err(NdbTransaction *trans)
......@@ -648,14 +697,15 @@ int ha_ndbcluster::set_ndb_key(NdbOperation *ndb_op, Field *field,
*/
int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field,
uint fieldnr, bool *set_blob_value)
uint fieldnr, int row_offset,
bool *set_blob_value)
{
const byte* field_ptr= field->ptr;
uint32 pack_len= field->pack_length();
const byte* field_ptr= field->ptr + row_offset;
uint32 pack_len= field->pack_length();
DBUG_ENTER("set_ndb_value");
DBUG_PRINT("enter", ("%d: %s, type: %u, len=%d, is_null=%s",
DBUG_PRINT("enter", ("%d: %s type: %u len=%d is_null=%s",
fieldnr, field->field_name, field->type(),
pack_len, field->is_null()?"Y":"N"));
pack_len, field->is_null(row_offset) ? "Y" : "N"));
DBUG_DUMP("value", (char*) field_ptr, pack_len);
DBUG_ASSERT(ndb_supported_type(field->type()));
......@@ -666,7 +716,7 @@ int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field,
{
pack_len= sizeof(empty_field);
field_ptr= (byte *)&empty_field;
if (field->is_null())
if (field->is_null(row_offset))
empty_field= 0;
else
empty_field= 1;
......@@ -675,12 +725,15 @@ int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field,
{
if (field->type() != MYSQL_TYPE_BIT)
{
if (field->is_null())
if (field->is_null(row_offset))
{
DBUG_PRINT("info", ("field is NULL"));
// Set value to NULL
DBUG_RETURN((ndb_op->setValue(fieldnr,
(char*)NULL, pack_len) != 0));
}
// Common implementation for most field types
DBUG_RETURN(ndb_op->setValue(fieldnr,
DBUG_RETURN(ndb_op->setValue(fieldnr,
(char*)field_ptr, pack_len) != 0);
}
else // if (field->type() == MYSQL_TYPE_BIT)
......@@ -690,7 +743,7 @@ int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field,
// Round up bit field length to nearest word boundry
pack_len= ((pack_len + 3) >> 2) << 2;
DBUG_ASSERT(pack_len <= 8);
if (field->is_null())
if (field->is_null(row_offset))
// Set value to NULL
DBUG_RETURN((ndb_op->setValue(fieldnr, (char*)NULL, pack_len) != 0));
DBUG_PRINT("info", ("bit field"));
......@@ -709,7 +762,7 @@ int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field,
NdbBlob *ndb_blob= ndb_op->getBlobHandle(fieldnr);
if (ndb_blob != NULL)
{
if (field->is_null())
if (field->is_null(row_offset))
DBUG_RETURN(ndb_blob->setNull() != 0);
Field_blob *field_blob= (Field_blob*)field;
......@@ -790,8 +843,8 @@ int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob)
{
char *buf= m_blobs_buffer + offset;
uint32 len= 0xffffffff; // Max uint32
DBUG_PRINT("value", ("read blob ptr=%x len=%u",
(UintPtr)buf, (uint)blob_len));
DBUG_PRINT("value", ("read blob ptr=%lx len=%u",
buf, (uint) blob_len));
if (ndb_blob->readData(buf, len) != 0)
DBUG_RETURN(-1);
DBUG_ASSERT(len == blob_len);
......@@ -902,6 +955,19 @@ bool ha_ndbcluster::uses_blob_value()
of table accessed in NDB
*/
static int cmp_frm(const NDBTAB *ndbtab, const void *pack_data,
uint pack_length)
{
DBUG_ENTER("cmp_frm");
/*
Compare FrmData in NDB with frm file from disk.
*/
if ((pack_length != ndbtab->getFrmLength()) ||
(memcmp(pack_data, ndbtab->getFrmData(), pack_length)))
DBUG_RETURN(1);
DBUG_RETURN(0);
}
int ha_ndbcluster::get_metadata(const char *path)
{
Ndb *ndb= get_ndb();
......@@ -939,8 +1005,7 @@ int ha_ndbcluster::get_metadata(const char *path)
DBUG_RETURN(1);
}
if ((pack_length != tab->getFrmLength()) ||
(memcmp(pack_data, tab->getFrmData(), pack_length)))
if (cmp_frm(tab, pack_data, pack_length))
{
if (!invalidating_ndb_table)
{
......@@ -951,9 +1016,9 @@ int ha_ndbcluster::get_metadata(const char *path)
else
{
DBUG_PRINT("error",
("metadata, pack_length: %d getFrmLength: %d memcmp: %d",
("metadata, pack_length: %d getFrmLength: %d memcmp: %d",
pack_length, tab->getFrmLength(),
memcmp(pack_data, tab->getFrmData(), pack_length)));
memcmp(pack_data, tab->getFrmData(), pack_length)));
DBUG_DUMP("pack_data", (char*)pack_data, pack_length);
DBUG_DUMP("frm", (char*)tab->getFrmData(), tab->getFrmLength());
error= 3;
......@@ -1980,7 +2045,7 @@ int ha_ndbcluster::write_row(byte *record)
DBUG_ENTER("write_row");
if (m_ignore_dup_key && table->s->primary_key != MAX_KEY)
if (!m_use_write && m_ignore_dup_key && table->s->primary_key != MAX_KEY)
{
int peek_res= peek_row(record);
......@@ -2057,7 +2122,8 @@ int ha_ndbcluster::write_row(byte *record)
{
Field *field= table->field[i];
if (!(field->flags & PRI_KEY_FLAG) &&
set_ndb_value(op, field, i, &set_blob_value))
(ha_get_bit_in_write_set(i + 1) || !m_use_write) &&
set_ndb_value(op, field, i, record-table->record[0], &set_blob_value))
{
m_skip_auto_increment= TRUE;
ERR_RETURN(op->getNdbError());
......@@ -2299,7 +2365,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
Field *field= table->field[i];
if (ha_get_bit_in_write_set(i+1) &&
(!(field->flags & PRI_KEY_FLAG)) &&
set_ndb_value(op, field, i))
set_ndb_value(op, field, i, new_data - table->record[0]))
ERR_RETURN(op->getNdbError());
}
......@@ -2414,51 +2480,73 @@ int ha_ndbcluster::delete_row(const byte *record)
set to null.
*/
void ha_ndbcluster::unpack_record(byte* buf)
static void ndb_unpack_record(TABLE *table, NdbValue *value,
MY_BITMAP *defined, byte *buf)
{
Field **p_field= table->field, *field= *p_field;
uint row_offset= (uint) (buf - table->record[0]);
Field **field, **end;
NdbValue *value= m_value;
DBUG_ENTER("unpack_record");
DBUG_ENTER("ndb_unpack_record");
end= table->field + table->s->fields;
// Set null flag(s)
bzero(buf, table->s->null_bytes);
for (field= table->field;
field < end;
field++, value++)
for ( ; field;
p_field++, value++, field= *p_field)
{
if ((*value).ptr)
{
if (! ((*field)->flags & BLOB_FLAG))
if (!(field->flags & BLOB_FLAG))
{
if ((*value).rec->isNULL())
(*field)->set_null(row_offset);
else if ((*field)->type() == MYSQL_TYPE_BIT)
int is_null= (*value).rec->isNULL();
if (is_null)
{
uint pack_len= (*field)->pack_length();
if (pack_len < 5)
if (is_null > 0)
{
DBUG_PRINT("info",("[%u] NULL",
(*value).rec->getColumn()->getColumnNo()));
field->set_null(row_offset);
}
else
{
DBUG_PRINT("info",("[%u] UNDEFINED",
(*value).rec->getColumn()->getColumnNo()));
bitmap_clear_bit(defined,
(*value).rec->getColumn()->getColumnNo());
}
}
else if (field->type() == MYSQL_TYPE_BIT)
{
byte *save_field_ptr= field->ptr;
field->ptr= save_field_ptr + row_offset;
if (field->pack_length() < 5)
{
DBUG_PRINT("info", ("bit field H'%.8X",
(*value).rec->u_32_value()));
((Field_bit *) *field)->store((longlong)
(*value).rec->u_32_value(),
FALSE);
((Field_bit*) field)->store((longlong)
(*value).rec->u_32_value(), FALSE);
}
else
{
DBUG_PRINT("info", ("bit field H'%.8X%.8X",
*(Uint32 *)(*value).rec->aRef(),
*((Uint32 *)(*value).rec->aRef()+1)));
((Field_bit *) *field)->store((longlong)
(*value).rec->u_64_value(), TRUE);
*(Uint32*) (*value).rec->aRef(),
*((Uint32*) (*value).rec->aRef()+1)));
((Field_bit*) field)->store((longlong)
(*value).rec->u_64_value(),TRUE);
}
field->ptr= save_field_ptr;
DBUG_PRINT("info",("[%u] SET",
(*value).rec->getColumn()->getColumnNo()));
DBUG_DUMP("info", (const char*) field->ptr, field->field_length);
}
else
{
DBUG_PRINT("info",("[%u] SET",
(*value).rec->getColumn()->getColumnNo()));
DBUG_DUMP("info", (const char*) field->ptr, field->field_length);
}
}
else
{
NdbBlob* ndb_blob= (*value).blob;
NdbBlob *ndb_blob= (*value).blob;
bool isNull= TRUE;
#ifndef DBUG_OFF
int ret=
......@@ -2466,11 +2554,16 @@ void ha_ndbcluster::unpack_record(byte* buf)
ndb_blob->getNull(isNull);
DBUG_ASSERT(ret == 0);
if (isNull)
(*field)->set_null(row_offset);
field->set_null(row_offset);
}
}
}
DBUG_VOID_RETURN;
}
void ha_ndbcluster::unpack_record(byte *buf)
{
ndb_unpack_record(table, m_value, 0, buf);
#ifndef DBUG_OFF
// Read and print all values that was fetched
if (table->s->primary_key == MAX_KEY)
......@@ -2486,7 +2579,6 @@ void ha_ndbcluster::unpack_record(byte* buf)
}
//print_results();
#endif
DBUG_VOID_RETURN;
}
/*
......@@ -3207,8 +3299,9 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
Thd_ndb *thd_ndb= get_thd_ndb(thd);
Ndb *ndb= thd_ndb->ndb;
DBUG_PRINT("enter", ("thd: %x, thd_ndb: %x, thd_ndb->lock_count: %d",
thd, thd_ndb, thd_ndb->lock_count));
DBUG_PRINT("enter", ("this: %x thd: %lx thd_ndb: %lx "
"thd_ndb->lock_count: %d",
this, thd, thd_ndb, thd_ndb->lock_count));
if (lock_type != F_UNLCK)
{
......@@ -3461,7 +3554,8 @@ int ndbcluster_commit(THD *thd, bool all)
while ((share= it++))
{
pthread_mutex_lock(&share->mutex);
DBUG_PRINT("info", ("Invalidate commit_count for %s, share->commit_count: %d ", share->table_name, share->commit_count));
DBUG_PRINT("info", ("Invalidate commit_count for %s, share->commit_count: %d ",
share->key, share->commit_count));
share->commit_count= 0;
share->commit_count_lock++;
pthread_mutex_unlock(&share->mutex);
......@@ -3790,6 +3884,10 @@ static int create_ndb_column(NDBCOL &col,
return 0;
}
/*
Create a table in NDB Cluster
*/
int ha_ndbcluster::create(const char *name,
TABLE *form,
HA_CREATE_INFO *info)
......@@ -3815,7 +3913,8 @@ int ha_ndbcluster::create(const char *name,
caller.
Do Ndb specific stuff, such as create a .ndb file
*/
my_errno= write_ndb_file();
if ((my_errno= write_ndb_file()))
DBUG_RETURN(my_errno);
DBUG_RETURN(my_errno);
}
......@@ -3829,7 +3928,7 @@ int ha_ndbcluster::create(const char *name,
if (packfrm(data, length, &pack_data, &pack_length))
DBUG_RETURN(2);
DBUG_PRINT("info", ("setFrm data=%x, len=%d", pack_data, pack_length));
DBUG_PRINT("info", ("setFrm data=%lx len=%d", pack_data, pack_length));
tab.setFrm(pack_data, pack_length);
my_free((char*)data, MYF(0));
my_free((char*)pack_data, MYF(0));
......@@ -4027,14 +4126,22 @@ int ha_ndbcluster::rename_table(const char *from, const char *to)
if (!(orig_tab= dict->getTable(m_tabname)))
ERR_RETURN(dict->getNdbError());
}
m_table= (void *)orig_tab;
// Change current database to that of target table
set_dbname(to);
ndb->setDatabaseName(m_dbname);
if (!(result= alter_table_name(new_tabname)))
if ((result= alter_table_name(new_tabname)))
{
DBUG_RETURN(result);
}
// Rename .ndb file
if ((result= handler::rename_table(from, to)))
{
// Rename .ndb file
result= handler::rename_table(from, to);
// ToDo in 4.1 should rollback alter table...
DBUG_RETURN(result);
}
DBUG_RETURN(result);
......@@ -4050,7 +4157,7 @@ int ha_ndbcluster::alter_table_name(const char *to)
Ndb *ndb= get_ndb();
NDBDICT *dict= ndb->getDictionary();
const NDBTAB *orig_tab= (const NDBTAB *) m_table;
DBUG_ENTER("alter_table_name_table");
DBUG_ENTER("alter_table_name");
NdbDictionary::Table new_tab= *orig_tab;
new_tab.setName(to);
......@@ -4069,6 +4176,38 @@ int ha_ndbcluster::alter_table_name(const char *to)
*/
/* static version which does not need a handler */
int
ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,
const char *path,
const char *db,
const char *table_name)
{
DBUG_ENTER("ha_ndbcluster::ndbcluster_delete_table");
NDBDICT *dict= ndb->getDictionary();
/* Drop the table from NDB */
int res;
if (h)
{
res= h->drop_table();
}
else
{
ndb->setDatabaseName(db);
res= dict->dropTable(table_name);
}
if (res)
{
DBUG_RETURN(res);
}
DBUG_RETURN(0);
}
int ha_ndbcluster::delete_table(const char *name)
{
DBUG_ENTER("ha_ndbcluster::delete_table");
......@@ -4081,9 +4220,8 @@ int ha_ndbcluster::delete_table(const char *name)
/* Call ancestor function to delete .ndb file */
handler::delete_table(name);
/* Drop the table from NDB */
DBUG_RETURN(drop_table());
DBUG_RETURN(delete_table(this, get_ndb(),name, m_dbname, m_tabname));
}
......@@ -4149,6 +4287,15 @@ ulonglong ha_ndbcluster::get_auto_increment()
Constructor for the NDB Cluster table handler
*/
#define HA_NDBCLUSTER_TABLE_FLAGS \
HA_REC_NOT_IN_SEQ | \
HA_NULL_IN_KEY | \
HA_AUTO_PART_KEY | \
HA_NO_PREFIX_CHAR_KEYS | \
HA_NEED_READ_RANGE_BUFFER | \
HA_CAN_GEOMETRY | \
HA_CAN_BIT_FIELD
ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
handler(&ndbcluster_hton, table_arg),
m_active_trans(NULL),
......@@ -4156,13 +4303,7 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
m_table(NULL),
m_table_version(-1),
m_table_info(NULL),
m_table_flags(HA_REC_NOT_IN_SEQ |
HA_NULL_IN_KEY |
HA_AUTO_PART_KEY |
HA_NO_PREFIX_CHAR_KEYS |
HA_NEED_READ_RANGE_BUFFER |
HA_CAN_GEOMETRY |
HA_CAN_BIT_FIELD),
m_table_flags(HA_NDBCLUSTER_TABLE_FLAGS),
m_share(0),
m_part_info(NULL),
m_use_partition_function(FALSE),
......@@ -4170,6 +4311,7 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
m_use_write(FALSE),
m_ignore_dup_key(FALSE),
m_primary_key_update(FALSE),
m_ignore_no_key(FALSE),
m_rows_to_insert((ha_rows) 1),
m_rows_inserted((ha_rows) 0),
m_bulk_insert_rows((ha_rows) 1024),
......@@ -4223,7 +4365,9 @@ ha_ndbcluster::~ha_ndbcluster()
DBUG_ENTER("~ha_ndbcluster");
if (m_share)
free_share(m_share);
{
free_share(&m_share);
}
release_metadata();
my_free(m_blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
m_blobs_buffer= 0;
......@@ -4256,8 +4400,8 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
int res;
KEY *key;
DBUG_ENTER("open");
DBUG_PRINT("enter", ("name: %s mode: %d test_if_locked: %d",
name, mode, test_if_locked));
DBUG_PRINT("enter", ("this: %d name: %s mode: %d test_if_locked: %d",
this, name, mode, test_if_locked));
// Setup ref_length to make room for the whole
// primary key to be written in the ref variable
......@@ -4277,7 +4421,8 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
set_tabname(name);
if (check_ndb_connection()) {
free_share(m_share); m_share= 0;
free_share(&m_share);
m_share= 0;
DBUG_RETURN(HA_ERR_NO_CONNECTION);
}
......@@ -4306,7 +4451,8 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
int ha_ndbcluster::close(void)
{
DBUG_ENTER("close");
free_share(m_share); m_share= 0;
free_share(&m_share);
m_share= 0;
release_metadata();
DBUG_RETURN(0);
}
......@@ -4509,21 +4655,24 @@ int ndbcluster_drop_database(const char *path)
ERR_RETURN(dict->getNdbError());
for (i= 0 ; i < list.count ; i++)
{
NdbDictionary::Dictionary::List::Element& t= list.elements[i];
DBUG_PRINT("info", ("Found %s/%s in NDB", t.database, t.name));
NdbDictionary::Dictionary::List::Element& elmt= list.elements[i];
DBUG_PRINT("info", ("Found %s/%s in NDB", elmt.database, elmt.name));
// Add only tables that belongs to db
if (my_strcasecmp(system_charset_info, t.database, dbname))
if (my_strcasecmp(system_charset_info, elmt.database, dbname))
continue;
DBUG_PRINT("info", ("%s must be dropped", t.name));
drop_list.push_back(thd->strdup(t.name));
DBUG_PRINT("info", ("%s must be dropped", elmt.name));
drop_list.push_back(thd->strdup(elmt.name));
}
// Drop any tables belonging to database
char full_path[FN_REFLEN];
char *tmp= strxnmov(full_path, FN_REFLEN, share_prefix, dbname, "/", NullS);
ndb->setDatabaseName(dbname);
List_iterator_fast<char> it(drop_list);
while ((tabname=it++))
{
if (dict->dropTable(tabname))
strxnmov(tmp, FN_REFLEN - (tmp - full_path), tabname, NullS);
if (ha_ndbcluster::delete_table(0, ndb, full_path, dbname, tabname))
{
const NdbError err= dict->getNdbError();
if (err.code != 709)
......@@ -4536,6 +4685,92 @@ int ndbcluster_drop_database(const char *path)
DBUG_RETURN(ret);
}
/*
find all tables in ndb and discover those needed
*/
static int ndbcluster_find_all_files(THD *thd)
{
DBUG_ENTER("ndbcluster_find_all_files");
Ndb* ndb;
char key[FN_REFLEN];
NdbDictionary::Dictionary::List list;
if (!(ndb= check_ndb_in_thd(thd)))
DBUG_RETURN(HA_ERR_NO_CONNECTION);
NDBDICT *dict= ndb->getDictionary();
int unhandled, retries= 5;
do
{
if (dict->listObjects(list, NdbDictionary::Object::UserTable) != 0)
ERR_RETURN(dict->getNdbError());
unhandled= 0;
for (uint i= 0 ; i < list.count ; i++)
{
NDBDICT::List::Element& elmt= list.elements[i];
DBUG_PRINT("info", ("Found %s.%s in NDB", elmt.database, elmt.name));
if (!(elmt.state == NDBOBJ::StateBuilding ||
elmt.state == NDBOBJ::StateOnline))
{
sql_print_information("NDB: skipping setup table %s.%s, in state %d",
elmt.database, elmt.name, elmt.state);
continue;
}
ndb->setDatabaseName(elmt.database);
const NDBTAB *ndbtab;
if (!(ndbtab= dict->getTable(elmt.name)))
{
sql_print_error("NDB: failed to setup table %s.%s, error: %d, %s",
elmt.database, elmt.name,
dict->getNdbError().code,
dict->getNdbError().message);
unhandled++;
continue;
}
if (ndbtab->getFrmLength() == 0)
continue;
strxnmov(key, FN_LEN, mysql_data_home, "/",
elmt.database, "/", elmt.name, NullS);
const void *data= 0, *pack_data= 0;
uint length, pack_length;
int discover= 0;
if (readfrm(key, &data, &length) ||
packfrm(data, length, &pack_data, &pack_length))
{
discover= 1;
sql_print_information("NDB: missing frm for %s.%s, discovering...",
elmt.database, elmt.name);
}
else if (cmp_frm(ndbtab, pack_data, pack_length))
{
discover= 1;
sql_print_information("NDB: mismatch in frm for %s.%s, discovering...",
elmt.database, elmt.name);
}
my_free((char*) data, MYF(MY_ALLOW_ZERO_PTR));
my_free((char*) pack_data, MYF(MY_ALLOW_ZERO_PTR));
if (discover)
{
/* ToDo 4.1 database needs to be created if missing */
pthread_mutex_lock(&LOCK_open);
if (ha_create_table_from_engine(thd, elmt.database, elmt.name))
{
/* ToDo 4.1 handle error */
}
pthread_mutex_unlock(&LOCK_open);
}
}
}
while (unhandled && retries--);
DBUG_RETURN(0);
}
int ndbcluster_find_files(THD *thd,const char *db,const char *path,
const char *wild, bool dir, List<char> *files)
......@@ -4547,7 +4782,7 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path,
Ndb* ndb;
char name[FN_REFLEN];
HASH ndb_tables, ok_tables;
NdbDictionary::Dictionary::List list;
NDBDICT::List list;
if (!(ndb= check_ndb_in_thd(thd)))
DBUG_RETURN(HA_ERR_NO_CONNECTION);
......@@ -4578,11 +4813,11 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path,
for (i= 0 ; i < list.count ; i++)
{
NdbDictionary::Dictionary::List::Element& t= list.elements[i];
DBUG_PRINT("info", ("Found %s/%s in NDB", t.database, t.name));
NDBDICT::List::Element& elmt= list.elements[i];
DBUG_PRINT("info", ("Found %s/%s in NDB", elmt.database, elmt.name));
// Add only tables that belongs to db
if (my_strcasecmp(system_charset_info, t.database, db))
if (my_strcasecmp(system_charset_info, elmt.database, db))
continue;
// Apply wildcard to list of tables in NDB
......@@ -4590,14 +4825,14 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path,
{
if (lower_case_table_names)
{
if (wild_case_compare(files_charset_info, t.name, wild))
if (wild_case_compare(files_charset_info, elmt.name, wild))
continue;
}
else if (wild_compare(t.name,wild,0))
else if (wild_compare(elmt.name,wild,0))
continue;
}
DBUG_PRINT("info", ("Inserting %s into ndb_tables hash", t.name));
my_hash_insert(&ndb_tables, (byte*)thd->strdup(t.name));
DBUG_PRINT("info", ("Inserting %s into ndb_tables hash", elmt.name));
my_hash_insert(&ndb_tables, (byte*)thd->strdup(elmt.name));
}
char *file_name;
......@@ -4645,10 +4880,15 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path,
file_name= hash_element(&ndb_tables, i);
if (!hash_search(&ok_tables, file_name, strlen(file_name)))
{
DBUG_PRINT("info", ("%s must be discovered", file_name));
// File is in list of ndb tables and not in ok_tables
// This table need to be created
create_list.push_back(thd->strdup(file_name));
strxnmov(name, sizeof(name),
mysql_data_home, "/", db, "/", file_name, reg_ext, NullS);
if (access(name, F_OK))
{
DBUG_PRINT("info", ("%s must be discovered", file_name));
// File is in list of ndb tables and not in ok_tables
// This table need to be created
create_list.push_back(thd->strdup(file_name));
}
}
}
......@@ -4704,6 +4944,7 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path,
static int connect_callback()
{
update_status_variables(g_ndb_cluster_connection);
pthread_cond_signal(&COND_ndb_util_thread);
return 0;
}
......@@ -4778,6 +5019,7 @@ bool ndbcluster_init()
(void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0,
(hash_get_key) ndbcluster_get_key,0,0);
pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
pthread_mutex_init(&LOCK_ndb_util_thread, MY_MUTEX_INIT_FAST);
pthread_cond_init(&COND_ndb_util_thread, NULL);
......@@ -4840,6 +5082,7 @@ bool ndbcluster_end()
pthread_mutex_destroy(&LOCK_ndb_util_thread);
pthread_cond_destroy(&COND_ndb_util_thread);
ndbcluster_inited= 0;
ndbcluster_util_inited= 0;
DBUG_RETURN(0);
}
......@@ -5112,7 +5355,7 @@ uint ndb_get_commitcount(THD *thd, char *dbname, char *tabname,
char name[FN_REFLEN];
NDB_SHARE *share;
(void)strxnmov(name, FN_REFLEN, "./",dbname,"/",tabname,NullS);
(void)strxnmov(name, FN_REFLEN, share_prefix, dbname, "/", tabname, NullS);
DBUG_PRINT("enter", ("name: %s", name));
pthread_mutex_lock(&ndbcluster_mutex);
if (!(share=(NDB_SHARE*) hash_search(&ndbcluster_open_tables,
......@@ -5120,8 +5363,7 @@ uint ndb_get_commitcount(THD *thd, char *dbname, char *tabname,
strlen(name))))
{
pthread_mutex_unlock(&ndbcluster_mutex);
DBUG_PRINT("info", ("Table %s not found in ndbcluster_open_tables",
name));
DBUG_PRINT("info", ("Table %s not found in ndbcluster_open_tables", name));
DBUG_RETURN(1);
}
share->use_count++;
......@@ -5136,7 +5378,7 @@ uint ndb_get_commitcount(THD *thd, char *dbname, char *tabname,
DBUG_PRINT("info", ("Getting commit_count: %llu from share",
share->commit_count));
pthread_mutex_unlock(&share->mutex);
free_share(share);
free_share(&share);
DBUG_RETURN(0);
}
}
......@@ -5151,7 +5393,7 @@ uint ndb_get_commitcount(THD *thd, char *dbname, char *tabname,
struct Ndb_statistics stat;
if (ndb_get_table_statistics(ndb, tabname, &stat))
{
free_share(share);
free_share(&share);
DBUG_RETURN(1);
}
......@@ -5168,7 +5410,7 @@ uint ndb_get_commitcount(THD *thd, char *dbname, char *tabname,
*commit_count= 0;
}
pthread_mutex_unlock(&share->mutex);
free_share(share);
free_share(&share);
DBUG_RETURN(0);
}
......@@ -5305,6 +5547,60 @@ ha_ndbcluster::register_query_cache_table(THD *thd,
}
#ifndef DBUG_OFF
static void dbug_print_table(const char *info, TABLE *table)
{
if (table == 0)
{
DBUG_PRINT("info",("%s: (null)", info));
return;
}
DBUG_PRINT("info",
("%s: %s.%s s->fields: %d "
"reclength: %d rec_buff_length: %d record[0]: %lx "
"record[1]: %lx",
info,
table->s->db,
table->s->table_name,
table->s->fields,
table->s->reclength,
table->s->rec_buff_length,
table->record[0],
table->record[1]));
for (unsigned int i= 0; i < table->s->fields; i++)
{
Field *f= table->field[i];
DBUG_PRINT("info",
("[%d] \"%s\"(0x%lx:%s%s%s%s%s%s) type: %d pack_length: %d "
"ptr: 0x%lx[+%d] null_bit: %u null_ptr: 0x%lx[+%d]",
i,
f->field_name,
f->flags,
(f->flags & PRI_KEY_FLAG) ? "pri" : "attr",
(f->flags & NOT_NULL_FLAG) ? "" : ",nullable",
(f->flags & UNSIGNED_FLAG) ? ",unsigned" : ",signed",
(f->flags & ZEROFILL_FLAG) ? ",zerofill" : "",
(f->flags & BLOB_FLAG) ? ",blob" : "",
(f->flags & BINARY_FLAG) ? ",binary" : "",
f->real_type(),
f->pack_length(),
f->ptr, f->ptr - table->record[0],
f->null_bit,
f->null_ptr, (byte*) f->null_ptr - table->record[0]));
if (f->type() == MYSQL_TYPE_BIT)
{
Field_bit *g= (Field_bit*) f;
DBUG_PRINT("MYSQL_TYPE_BIT",("field_length: %d bit_ptr: 0x%lx[+%d] "
"bit_ofs: %u bit_len: %u",
g->field_length, g->bit_ptr,
(byte*) g->bit_ptr-table->record[0],
g->bit_ofs, g->bit_len));
}
}
}
#endif
/*
Handling the shared NDB_SHARE structure that is needed to
provide table locking.
......@@ -5313,68 +5609,195 @@ ha_ndbcluster::register_query_cache_table(THD *thd,
data we want to or can share.
*/
static byte* ndbcluster_get_key(NDB_SHARE *share,uint *length,
static byte *ndbcluster_get_key(NDB_SHARE *share,uint *length,
my_bool not_used __attribute__((unused)))
{
*length=share->table_name_length;
return (byte*) share->table_name;
*length= share->key_length;
return (byte*) share->key;
}
static NDB_SHARE* get_share(const char *table_name)
#ifndef DBUG_OFF
static void dbug_print_open_tables()
{
DBUG_ENTER("dbug_print_open_tables");
for (uint i= 0; i < ndbcluster_open_tables.records; i++)
{
NDB_SHARE *share= (NDB_SHARE*) hash_element(&ndbcluster_open_tables, i);
DBUG_PRINT("share",
("[%d] 0x%lx key: %s key_length: %d",
i, share, share->key, share->key_length));
DBUG_PRINT("share",
("db.tablename: %s.%s use_count: %d commit_count: %d",
share->db, share->table_name,
share->use_count, share->commit_count));
}
DBUG_VOID_RETURN;
}
#else
#define dbug_print_open_tables()
#endif
/*
Increase refcount on existing share.
Always returns share and cannot fail.
*/
static NDB_SHARE *get_share(NDB_SHARE *share)
{
NDB_SHARE *share;
pthread_mutex_lock(&ndbcluster_mutex);
uint length=(uint) strlen(table_name);
if (!(share=(NDB_SHARE*) hash_search(&ndbcluster_open_tables,
(byte*) table_name,
length)))
share->use_count++;
dbug_print_open_tables();
DBUG_PRINT("get_share",
("0x%lx key: %s key_length: %d",
share, share->key, share->key_length));
DBUG_PRINT("get_share",
("db.tablename: %s.%s use_count: %d commit_count: %d",
share->db, share->table_name,
share->use_count, share->commit_count));
pthread_mutex_unlock(&ndbcluster_mutex);
return share;
}
/*
Get a share object for key
Returns share for key, and increases the refcount on the share.
create_if_not_exists == TRUE:
creates share if it does not alreade exist
returns 0 only due to out of memory, and then sets my_error
create_if_not_exists == FALSE:
returns 0 if share does not exist
have_lock == TRUE, pthread_mutex_lock(&ndbcluster_mutex) already taken
*/
static NDB_SHARE *get_share(const char *key, bool create_if_not_exists,
bool have_lock)
{
NDB_SHARE *share;
if (!have_lock)
pthread_mutex_lock(&ndbcluster_mutex);
uint length= (uint) strlen(key);
if (!(share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables,
(byte*) key,
length)))
{
if ((share=(NDB_SHARE *) my_malloc(sizeof(*share)+length+1,
if (!create_if_not_exists)
{
DBUG_PRINT("error", ("get_share: %s does not exist", key));
if (!have_lock)
pthread_mutex_unlock(&ndbcluster_mutex);
return 0;
}
if ((share= (NDB_SHARE*) my_malloc(sizeof(*share),
MYF(MY_WME | MY_ZEROFILL))))
{
share->table_name_length=length;
share->table_name=(char*) (share+1);
strmov(share->table_name,table_name);
MEM_ROOT **root_ptr=
my_pthread_getspecific_ptr(MEM_ROOT**, THR_MALLOC);
MEM_ROOT *old_root= *root_ptr;
init_sql_alloc(&share->mem_root, 1024, 0);
*root_ptr= &share->mem_root; // remember to reset before return
/* enough space for key, db, and table_name */
share->key= alloc_root(*root_ptr, 2 * (length + 1));
share->key_length= length;
strmov(share->key, key);
if (my_hash_insert(&ndbcluster_open_tables, (byte*) share))
{
pthread_mutex_unlock(&ndbcluster_mutex);
my_free((gptr) share,0);
free_root(&share->mem_root, MYF(0));
my_free((gptr) share, 0);
*root_ptr= old_root;
if (!have_lock)
pthread_mutex_unlock(&ndbcluster_mutex);
return 0;
}
thr_lock_init(&share->lock);
pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
pthread_mutex_init(&share->mutex, MY_MUTEX_INIT_FAST);
share->commit_count= 0;
share->commit_count_lock= 0;
share->db= share->key + length + 1;
ha_ndbcluster::set_dbname(key, share->db);
share->table_name= share->db + strlen(share->db) + 1;
ha_ndbcluster::set_tabname(key, share->table_name);
*root_ptr= old_root;
}
else
{
DBUG_PRINT("error", ("Failed to alloc share"));
pthread_mutex_unlock(&ndbcluster_mutex);
DBUG_PRINT("error", ("get_share: failed to alloc share"));
if (!have_lock)
pthread_mutex_unlock(&ndbcluster_mutex);
my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*share));
return 0;
}
}
share->use_count++;
DBUG_PRINT("share",
("table_name: %s, length: %d, use_count: %d, commit_count: %d",
share->table_name, share->table_name_length, share->use_count,
share->commit_count));
pthread_mutex_unlock(&ndbcluster_mutex);
dbug_print_open_tables();
DBUG_PRINT("get_share",
("0x%lx key: %s key_length: %d key: %s",
share, share->key, share->key_length, key));
DBUG_PRINT("get_share",
("db.tablename: %s.%s use_count: %d commit_count: %d",
share->db, share->table_name,
share->use_count, share->commit_count));
if (!have_lock)
pthread_mutex_unlock(&ndbcluster_mutex);
return share;
}
static void real_free_share(NDB_SHARE **share)
{
DBUG_PRINT("real_free_share",
("0x%lx key: %s key_length: %d",
(*share), (*share)->key, (*share)->key_length));
DBUG_PRINT("real_free_share",
("db.tablename: %s.%s use_count: %d commit_count: %d",
(*share)->db, (*share)->table_name,
(*share)->use_count, (*share)->commit_count));
hash_delete(&ndbcluster_open_tables, (byte*) *share);
thr_lock_delete(&(*share)->lock);
pthread_mutex_destroy(&(*share)->mutex);
free_root(&(*share)->mem_root, MYF(0));
my_free((gptr) *share, MYF(0));
*share= 0;
dbug_print_open_tables();
}
/*
decrease refcount of share
calls real_free_share when refcount reaches 0
static void free_share(NDB_SHARE *share)
have_lock == TRUE, pthread_mutex_lock(&ndbcluster_mutex) already taken
*/
static void free_share(NDB_SHARE **share, bool have_lock)
{
pthread_mutex_lock(&ndbcluster_mutex);
if (!--share->use_count)
if (!have_lock)
pthread_mutex_lock(&ndbcluster_mutex);
if ((*share)->util_lock == current_thd)
(*share)->util_lock= 0;
if (!--(*share)->use_count)
{
hash_delete(&ndbcluster_open_tables, (byte*) share);
thr_lock_delete(&share->lock);
pthread_mutex_destroy(&share->mutex);
my_free((gptr) share, MYF(0));
real_free_share(share);
}
pthread_mutex_unlock(&ndbcluster_mutex);
else
{
dbug_print_open_tables();
DBUG_PRINT("free_share",
("0x%lx key: %s key_length: %d",
*share, (*share)->key, (*share)->key_length));
DBUG_PRINT("free_share",
("db.tablename: %s.%s use_count: %d commit_count: %d",
(*share)->db, (*share)->table_name,
(*share)->use_count, (*share)->commit_count));
}
if (!have_lock)
pthread_mutex_unlock(&ndbcluster_mutex);
}
......@@ -5405,14 +5828,14 @@ static int packfrm(const void *data, uint len,
uint blob_len;
frm_blob_struct* blob;
DBUG_ENTER("packfrm");
DBUG_PRINT("enter", ("data: %x, len: %d", data, len));
DBUG_PRINT("enter", ("data: 0x%lx len: %d", data, len));
error= 1;
org_len= len;
if (my_compress((byte*)data, &org_len, &comp_len))
goto err;
DBUG_PRINT("info", ("org_len: %d, comp_len: %d", org_len, comp_len));
DBUG_PRINT("info", ("org_len: %d comp_len: %d", org_len, comp_len));
DBUG_DUMP("compressed", (char*)data, org_len);
error= 2;
......@@ -5432,7 +5855,8 @@ static int packfrm(const void *data, uint len,
*pack_len= blob_len;
error= 0;
DBUG_PRINT("exit", ("pack_data: %x, pack_len: %d", *pack_data, *pack_len));
DBUG_PRINT("exit",
("pack_data: 0x%lx pack_len: %d", *pack_data, *pack_len));
err:
DBUG_RETURN(error);
......@@ -5446,7 +5870,7 @@ static int unpackfrm(const void **unpack_data, uint *unpack_len,
byte *data;
ulong complen, orglen, ver;
DBUG_ENTER("unpackfrm");
DBUG_PRINT("enter", ("pack_data: %x", pack_data));
DBUG_PRINT("enter", ("pack_data: 0x%lx", pack_data));
complen= uint4korr((char*)&blob->head.complen);
orglen= uint4korr((char*)&blob->head.orglen);
......@@ -5471,7 +5895,7 @@ static int unpackfrm(const void **unpack_data, uint *unpack_len,
*unpack_data= data;
*unpack_len= complen;
DBUG_PRINT("exit", ("frmdata: %x, len: %d", *unpack_data, *unpack_len));
DBUG_PRINT("exit", ("frmdata: 0x%lx, len: %d", *unpack_data, *unpack_len));
DBUG_RETURN(0);
}
......@@ -5484,11 +5908,10 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
DBUG_ENTER("ndb_get_table_statistics");
DBUG_PRINT("enter", ("table: %s", table));
NdbTransaction* pTrans= ndb->startTransaction();
if (pTrans == NULL)
ERR_RETURN(ndb->getNdbError());
do
{
if (pTrans == NULL)
break;
NdbScanOperation* pOp= pTrans->getNdbScanOperation(table);
if (pOp == NULL)
break;
......@@ -6010,6 +6433,7 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
THD *thd; /* needs to be first for thread_stack */
Ndb* ndb;
struct timespec abstime;
List<NDB_SHARE> util_open_tables;
my_thread_init();
DBUG_ENTER("ndb_util_thread");
......@@ -6030,10 +6454,51 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
delete ndb;
DBUG_RETURN(NULL);
}
thd->init_for_queries();
thd->version=refresh_version;
thd->set_time();
thd->main_security_ctx.host_or_ip= "";
thd->client_capabilities = 0;
my_net_init(&thd->net, 0);
thd->main_security_ctx.master_access= ~0;
thd->main_security_ctx.priv_user = 0;
/*
wait for mysql server to start
*/
pthread_mutex_lock(&LOCK_server_started);
while (!mysqld_server_started)
pthread_cond_wait(&COND_server_started, &LOCK_server_started);
pthread_mutex_unlock(&LOCK_server_started);
/*
Wait for cluster to start
*/
pthread_mutex_lock(&LOCK_ndb_util_thread);
while (!ndb_cluster_node_id)
{
/* ndb not connected yet */
set_timespec(abstime, 1);
pthread_cond_timedwait(&COND_ndb_util_thread,
&LOCK_ndb_util_thread,
&abstime);
if (abort_loop)
{
pthread_mutex_unlock(&LOCK_ndb_util_thread);
goto ndb_util_thread_end;
}
}
pthread_mutex_unlock(&LOCK_ndb_util_thread);
/*
Get all table definitions from the storage node
*/
ndbcluster_find_all_files(thd);
ndbcluster_util_inited= 1;
List<NDB_SHARE> util_open_tables;
set_timespec(abstime, 0);
for (;;)
for (;!abort_loop;)
{
pthread_mutex_lock(&LOCK_ndb_util_thread);
......@@ -6041,10 +6506,10 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
&LOCK_ndb_util_thread,
&abstime);
pthread_mutex_unlock(&LOCK_ndb_util_thread);
#ifdef NDB_EXTRA_DEBUG_UTIL_THREAD
DBUG_PRINT("ndb_util_thread", ("Started, ndb_cache_check_time: %d",
ndb_cache_check_time));
#endif
if (abort_loop)
break; /* Shutting down server */
......@@ -6075,20 +6540,12 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
List_iterator_fast<NDB_SHARE> it(util_open_tables);
while ((share= it++))
{
/* Split tab- and dbname */
char buf[FN_REFLEN];
char *tabname, *db;
uint length= dirname_length(share->table_name);
tabname= share->table_name+length;
memcpy(buf, share->table_name, length-1);
buf[length-1]= 0;
db= buf+dirname_length(buf);
DBUG_PRINT("ndb_util_thread",
("Fetching commit count for: %s",
share->table_name));
share->key));
/* Contact NDB to get commit count for table */
ndb->setDatabaseName(db);
ndb->setDatabaseName(share->db);
struct Ndb_statistics stat;
uint lock;
......@@ -6096,17 +6553,17 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
lock= share->commit_count_lock;
pthread_mutex_unlock(&share->mutex);
if (ndb_get_table_statistics(ndb, tabname, &stat) == 0)
if (ndb_get_table_statistics(ndb, share->table_name, &stat) == 0)
{
DBUG_PRINT("ndb_util_thread",
("Table: %s, commit_count: %llu, rows: %llu",
share->table_name, stat.commit_count, stat.row_count));
share->key, stat.commit_count, stat.row_count));
}
else
{
DBUG_PRINT("ndb_util_thread",
("Error: Could not get commit count for table %s",
share->table_name));
share->key));
stat.commit_count= 0;
}
......@@ -6116,7 +6573,7 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
pthread_mutex_unlock(&share->mutex);
/* Decrease the use count and possibly free share */
free_share(share);
free_share(&share);
}
/* Clear the list of open tables */
......@@ -6143,7 +6600,8 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
abstime.tv_nsec-= 1000000000;
}
}
ndb_util_thread_end:
net_end(&thd->net);
thd->cleanup();
delete thd;
delete ndb;
......@@ -7480,6 +7938,9 @@ ha_ndbcluster::generate_scan_filter(Ndb_cond_stack *ndb_cond_stack,
DBUG_RETURN(0);
}
/*
Implements the SHOW NDB STATUS command.
*/
int
ndbcluster_show_status(THD* thd)
{
......
......@@ -25,6 +25,7 @@
#pragma interface /* gcc class implementation */
#endif
#include <ndbapi/NdbApi.hpp>
#include <ndbapi_limits.h>
class Ndb; // Forward declaration
......@@ -36,10 +37,13 @@ class NdbScanFilter;
class NdbIndexScanOperation;
class NdbBlob;
class NdbIndexStat;
class NdbEventOperation;
// connectstring to cluster if given by mysqld
extern const char *ndbcluster_connectstring;
extern ulong ndb_cache_check_time;
extern ulong ndb_report_thresh_binlog_epoch_slip;
extern ulong ndb_report_thresh_binlog_mem_usage;
typedef enum ndb_index_type {
UNDEFINED_INDEX = 0,
......@@ -63,13 +67,25 @@ typedef struct ndb_index_data {
uint index_stat_query_count;
} NDB_INDEX_DATA;
typedef union { const NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue;
typedef enum {
NSS_INITIAL= 0,
NSS_DROPPED
} NDB_SHARE_STATE;
typedef struct st_ndbcluster_share {
MEM_ROOT mem_root;
THR_LOCK lock;
pthread_mutex_t mutex;
char *table_name;
uint table_name_length,use_count;
char *key;
uint key_length;
THD *util_lock;
uint use_count;
uint commit_count_lock;
ulonglong commit_count;
char *db;
char *table_name;
} NDB_SHARE;
typedef enum ndb_item_type {
......@@ -595,9 +611,16 @@ static void set_tabname(const char *pathname, char *tabname);
bool check_if_incompatible_data(HA_CREATE_INFO *info,
uint table_changes);
static void invalidate_dictionary_cache(TABLE *table, Ndb *ndb,
const char *tabname, bool global);
private:
friend int ndbcluster_drop_database(const char *path);
int alter_table_name(const char *to);
static int delete_table(ha_ndbcluster *h, Ndb *ndb,
const char *path,
const char *db,
const char *table_name);
int drop_table();
int create_index(const char *name, KEY *key_info, bool unique);
int create_ordered_index(const char *name, KEY *key_info);
......@@ -643,7 +666,8 @@ static void set_tabname(const char *pathname, char *tabname);
uint fieldnr, const byte* field_ptr);
int set_ndb_key(NdbOperation*, Field *field,
uint fieldnr, const byte* field_ptr);
int set_ndb_value(NdbOperation*, Field *field, uint fieldnr, bool *set_blob_value= 0);
int set_ndb_value(NdbOperation*, Field *field, uint fieldnr,
int row_offset= 0, bool *set_blob_value= 0);
int get_ndb_value(NdbOperation*, Field *field, uint fieldnr, byte*);
friend int g_get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg);
int get_ndb_blobs_value(NdbBlob *last_ndb_blob);
......@@ -688,6 +712,7 @@ static void set_tabname(const char *pathname, char *tabname);
NdbScanOperation* op);
friend int execute_commit(ha_ndbcluster*, NdbTransaction*);
friend int execute_no_commit_ignore_no_key(ha_ndbcluster*, NdbTransaction*);
friend int execute_no_commit(ha_ndbcluster*, NdbTransaction*);
friend int execute_no_commit_ie(ha_ndbcluster*, NdbTransaction*);
......@@ -704,7 +729,6 @@ static void set_tabname(const char *pathname, char *tabname);
NDB_SHARE *m_share;
NDB_INDEX_DATA m_index[MAX_KEY];
// NdbRecAttr has no reference to blob
typedef union { const NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue;
NdbValue m_value[NDB_MAX_ATTRIBUTES_IN_TABLE];
partition_info *m_part_info;
byte *m_rec0;
......@@ -715,6 +739,7 @@ static void set_tabname(const char *pathname, char *tabname);
bool m_ignore_dup_key;
bool m_primary_key_update;
bool m_write_op;
bool m_ignore_no_key;
ha_rows m_rows_to_insert;
ha_rows m_rows_inserted;
ha_rows m_bulk_insert_rows;
......@@ -760,3 +785,4 @@ int ndbcluster_drop_database(const char* path);
void ndbcluster_print_error(int error, const NdbOperation *error_op);
int ndbcluster_show_status(THD*);
......@@ -1224,6 +1224,9 @@ extern pthread_mutex_t LOCK_mysql_create_db,LOCK_Acl,LOCK_open,
#ifdef HAVE_OPENSSL
extern pthread_mutex_t LOCK_des_key_file;
#endif
extern pthread_mutex_t LOCK_server_started;
extern pthread_cond_t COND_server_started;
extern int mysqld_server_started;
extern rw_lock_t LOCK_grant, LOCK_sys_init_connect, LOCK_sys_init_slave;
extern pthread_cond_t COND_refresh, COND_thread_count, COND_manager;
extern pthread_attr_t connection_attrib;
......
......@@ -501,6 +501,10 @@ rw_lock_t LOCK_grant, LOCK_sys_init_connect, LOCK_sys_init_slave;
pthread_cond_t COND_refresh,COND_thread_count;
pthread_t signal_thread;
pthread_attr_t connection_attrib;
pthread_mutex_t LOCK_server_started;
pthread_cond_t COND_server_started;
int mysqld_server_started= 0;
/* replication parameters, if master_host is not NULL, we are a slave */
uint master_port= MYSQL_PORT, master_connect_retry = 60;
......@@ -2765,6 +2769,8 @@ static int init_thread_environment()
(void) pthread_mutex_init(&LOCK_rpl_status, MY_MUTEX_INIT_FAST);
(void) pthread_cond_init(&COND_rpl_status, NULL);
#endif
(void) pthread_mutex_init(&LOCK_server_started, MY_MUTEX_INIT_FAST);
(void) pthread_cond_init(&COND_server_started,NULL);
sp_cache_init();
/* Parameter for threads created for connections */
(void) pthread_attr_init(&connection_attrib);
......@@ -3450,6 +3456,10 @@ we force server id to 2, but this MySQL server will not act as a slave.");
mysqld_port,
MYSQL_COMPILATION_COMMENT);
// Signal threads waiting for server to be started
mysqld_server_started= 1;
pthread_cond_signal(&COND_server_started);
#if defined(__NT__) || defined(HAVE_SMEM)
handle_connections_methods();
#else
......@@ -3497,6 +3507,7 @@ we force server id to 2, but this MySQL server will not act as a slave.");
CloseHandle(hEventShutdown);
}
#endif
clean_up(1);
wait_for_signal_thread_to_end();
clean_up_mutexes();
my_end(opt_endinfo ? MY_CHECK_ERROR | MY_GIVE_INFO : 0);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment