Add extra ordered index in when UNIQUE and PRIMARY KEY are specified.

Extra ordered indexes are created primarily 
to support partial key scan/read and range scans of hash indexes.
I.e. the ordered index are used instead of the hash indexes for 
these queries.
parent 025b29b8
......@@ -41,6 +41,7 @@
#define USE_DISCOVER_ON_STARTUP
//#define USE_NDB_POOL
//#define USE_EXTRA_ORDERED_INDEX
// Default value for parallelism
static const int parallelism= 240;
......@@ -64,6 +65,10 @@ typedef NdbDictionary::Dictionary NDBDICT;
bool ndbcluster_inited= false;
#ifdef USE_EXTRA_ORDERED_INDEX
static const char* unique_suffix= "$unique";
#endif
// Handler synchronization
pthread_mutex_t ndbcluster_mutex;
......@@ -95,6 +100,7 @@ static const err_code_mapping err_map[]=
{ 630, HA_ERR_FOUND_DUPP_KEY },
{ 893, HA_ERR_FOUND_DUPP_UNIQUE },
{ 721, HA_ERR_TABLE_EXIST },
{ 4244, HA_ERR_TABLE_EXIST },
{ 241, HA_ERR_OLD_METADATA },
{ -1, -1 }
};
......@@ -354,15 +360,37 @@ int ha_ndbcluster::get_metadata(const char *path)
m_table= (void*)tab;
for (i= 0; i < MAX_KEY; i++)
{
m_indextype[i]= UNDEFINED_INDEX;
m_unique_index_name[i]= NULL;
}
// Save information about all known indexes
for (i= 0; i < table->keys; i++)
m_indextype[i] = get_index_type_from_table(i);
{
m_indextype[i]= get_index_type_from_table(i);
#ifdef USE_EXTRA_ORDERED_INDEX
if (m_indextype[i] == UNIQUE_INDEX)
{
char *name;
const char *index_name= get_index_name(i);
int name_len= strlen(index_name)+strlen(unique_suffix)+1;
if (!(name= my_malloc(name_len, MYF(MY_WME))))
DBUG_RETURN(2);
strxnmov(name, name_len, index_name, unique_suffix, NullS);
m_unique_index_name[i]= name;
DBUG_PRINT("info", ("Created unique index name: %s for index %d",
name, i));
}
#endif
}
DBUG_RETURN(0);
}
/*
Decode the type of an index from information
provided in table object
......@@ -380,11 +408,19 @@ NDB_INDEX_TYPE ha_ndbcluster::get_index_type_from_table(uint index_no) const
void ha_ndbcluster::release_metadata()
{
int i;
DBUG_ENTER("release_metadata");
DBUG_PRINT("enter", ("m_tabname: %s", m_tabname));
m_table= NULL;
for (i= 0; i < MAX_KEY; i++)
{
my_free((char*)m_unique_index_name[i], MYF(MY_ALLOW_ZERO_PTR));
m_unique_index_name[i]= NULL;
}
DBUG_VOID_RETURN;
}
......@@ -416,6 +452,18 @@ inline const char* ha_ndbcluster::get_index_name(uint idx_no) const
return table->keynames.type_names[idx_no];
}
inline const char* ha_ndbcluster::get_unique_index_name(uint idx_no) const
{
#ifdef USE_EXTRA_ORDERED_INDEX
DBUG_ASSERT(idx_no < MAX_KEY);
DBUG_ASSERT(m_unique_index_name[idx_no]);
return m_unique_index_name[idx_no];
#else
return get_index_name(idx_no);
#endif
}
inline NDB_INDEX_TYPE ha_ndbcluster::get_index_type(uint idx_no) const
{
DBUG_ASSERT(idx_no < MAX_KEY);
......@@ -550,7 +598,6 @@ int ha_ndbcluster::unique_index_read(const byte *key,
uint key_len, byte *buf)
{
NdbConnection *trans= m_active_trans;
const char *index_name;
NdbIndexOperation *op;
THD *thd= current_thd;
byte *key_ptr;
......@@ -560,9 +607,10 @@ int ha_ndbcluster::unique_index_read(const byte *key,
DBUG_ENTER("unique_index_read");
DBUG_PRINT("enter", ("key_len: %u, index: %u", key_len, active_index));
DBUG_DUMP("key", (char*)key, key_len);
DBUG_PRINT("enter", ("name: %s", get_unique_index_name(active_index)));
index_name= get_index_name(active_index);
if (!(op= trans->getNdbIndexOperation(index_name, m_tabname)) ||
if (!(op= trans->getNdbIndexOperation(get_unique_index_name(active_index),
m_tabname)) ||
op->readTuple() != 0)
ERR_RETURN(trans->getNdbError());
......@@ -1349,15 +1397,35 @@ int ha_ndbcluster::index_read(byte *buf,
DBUG_PRINT("enter", ("active_index: %u, key_len: %u, find_flag: %d",
active_index, key_len, find_flag));
KEY* key_info;
int error= 1;
statistic_increment(ha_read_key_count, &LOCK_status);
switch (get_index_type(active_index)){
case PRIMARY_KEY_INDEX:
#ifdef USE_EXTRA_ORDERED_INDEX
key_info= table->key_info + active_index;
if (key_len < key_info->key_length ||
find_flag != HA_READ_KEY_EXACT)
{
error= ordered_index_scan(key, key_len, buf, find_flag);
break;
}
#endif
error= pk_read(key, key_len, buf);
break;
case UNIQUE_INDEX:
#ifdef USE_EXTRA_ORDERED_INDEX
key_info= table->key_info + active_index;
if (key_len < key_info->key_length ||
find_flag != HA_READ_KEY_EXACT)
{
error= ordered_index_scan(key, key_len, buf, find_flag);
break;
}
#endif
error= unique_index_read(key, key_len, buf);
break;
......@@ -1391,7 +1459,7 @@ int ha_ndbcluster::index_next(byte *buf)
int error = 1;
statistic_increment(ha_read_next_count,&LOCK_status);
if (get_index_type(active_index) == PRIMARY_KEY_INDEX)
if (!m_active_cursor)
error= HA_ERR_END_OF_FILE;
else
error = next_result(buf);
......@@ -2041,9 +2109,8 @@ int ha_ndbcluster::create(const char *name,
NdbDictionary::Column::Type ndb_type;
NDBCOL col;
uint pack_length, length, i;
int res;
const void *data, *pack_data;
const char **key_name= form->keynames.type_names;
const char **key_names= form->keynames.type_names;
char name2[FN_HEADLEN];
DBUG_ENTER("create");
......@@ -2086,13 +2153,11 @@ int ha_ndbcluster::create(const char *name,
col.setPrimaryKey(field->flags & PRI_KEY_FLAG);
if (field->flags & AUTO_INCREMENT_FLAG)
{
DBUG_PRINT("info", ("Found auto_increment key"));
col.setAutoIncrement(TRUE);
ulonglong value = info->auto_increment_value ?
info->auto_increment_value -1 :
(ulonglong) 0;
DBUG_PRINT("info", ("initial value=%ld", value));
// col.setInitialAutIncValue(value);
ulonglong value= info->auto_increment_value ?
info->auto_increment_value -1 : (ulonglong) 0;
DBUG_PRINT("info", ("Autoincrement key, initial: %d", value));
col.setAutoIncrementInitialValue(value);
}
else
col.setAutoIncrement(false);
......@@ -2143,41 +2208,81 @@ int ha_ndbcluster::create(const char *name,
}
// Create secondary indexes
for (i= 0; i < form->keys; i++)
KEY* key_info= form->key_info;
const char** key_name= key_names;
for (i= 0; i < form->keys; i++, key_info++, key_name++)
{
DBUG_PRINT("info", ("Found index %u: %s", i, key_name[i]));
int error= 0;
DBUG_PRINT("info", ("Index %u: %s", i, *key_name));
if (i == form->primary_key)
{
DBUG_PRINT("info", ("Skipping it, PK already created"));
continue;
#ifdef USE_EXTRA_ORDERED_INDEX
error= create_ordered_index(*key_name, key_info);
#endif
}
else if (key_info->flags & HA_NOSAME)
error= create_unique_index(*key_name, key_info);
else
error= create_ordered_index(*key_name, key_info);
DBUG_PRINT("info", ("Creating index %u: %s", i, key_name[i]));
res= create_index(key_name[i],
form->key_info + i);
switch(res){
case 0:
// OK
break;
default:
if (error)
{
DBUG_PRINT("error", ("Failed to create index %u", i));
drop_table();
my_errno= res;
goto err_end;
my_errno= error;
break;
}
}
err_end:
DBUG_RETURN(my_errno);
}
int ha_ndbcluster::create_ordered_index(const char *name,
KEY *key_info)
{
DBUG_ENTER("create_ordered_index");
DBUG_RETURN(create_index(name, key_info, false));
}
int ha_ndbcluster::create_unique_index(const char *name,
KEY *key_info)
{
int error;
const char* unique_name= name;
DBUG_ENTER("create_unique_index");
#ifdef USE_EXTRA_ORDERED_INDEX
char buf[FN_HEADLEN];
strxnmov(buf, FN_HEADLEN, name, unique_suffix, NullS);
unique_name= buf;
#endif
error= create_index(unique_name, key_info, true);
if (error)
DBUG_RETURN(error);
#ifdef USE_EXTRA_ORDERED_INDEX
/*
If unique index contains more then one attribute
an ordered index should be created to support
partial key search
*/
error= create_ordered_index(name, key_info);
#endif
DBUG_RETURN(error);
}
/*
Create an index in NDB Cluster
*/
int ha_ndbcluster::create_index(const char *name,
KEY *key_info){
KEY *key_info,
bool unique)
{
NdbDictionary::Dictionary *dict= m_ndb->getDictionary();
KEY_PART_INFO *key_part= key_info->key_part;
KEY_PART_INFO *end= key_part + key_info->key_parts;
......@@ -2185,12 +2290,8 @@ int ha_ndbcluster::create_index(const char *name,
DBUG_ENTER("create_index");
DBUG_PRINT("enter", ("name: %s ", name));
// Check that an index with the same name do not already exist
if (dict->getIndex(name, m_tabname))
ERR_RETURN(dict->getNdbError());
NdbDictionary::Index ndb_index(name);
if (key_info->flags & HA_NOSAME)
if (unique)
ndb_index.setType(NdbDictionary::Index::UniqueHashIndex);
else
{
......@@ -2349,6 +2450,7 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
HA_NOT_READ_AFTER_KEY),
m_use_write(false)
{
int i;
DBUG_ENTER("ha_ndbcluster");
......@@ -2360,6 +2462,12 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
records= 100;
block_size= 1024;
for (i= 0; i < MAX_KEY; i++)
{
m_indextype[i]= UNDEFINED_INDEX;
m_unique_index_name[i]= NULL;
}
DBUG_VOID_RETURN;
}
......@@ -2765,6 +2873,7 @@ ha_ndbcluster::records_in_range(int inx,
DBUG_PRINT("enter", ("end_key: %x, end_key_len: %d", end_key, end_key_len));
DBUG_PRINT("enter", ("end_search_flag: %d", end_search_flag));
#ifndef USE_EXTRA_ORDERED_INDEX
/*
Check that start_key_len is equal to
the length of the used index and
......@@ -2779,6 +2888,14 @@ ha_ndbcluster::records_in_range(int inx,
key_length));
records= HA_POS_ERROR;
}
#else
/*
Extra ordered indexes are created primarily
to support partial key scan/read and range scans of hash indexes.
I.e. the ordered index are used instead of the hash indexes for
these queries.
*/
#endif
DBUG_RETURN(records);
}
......
......@@ -135,11 +135,14 @@ class ha_ndbcluster: public handler
private:
int alter_table_name(const char *from, const char *to);
int drop_table();
int create_index(const char *name, KEY *key_info);
int create_index(const char *name, KEY *key_info, bool unique);
int create_ordered_index(const char *name, KEY *key_info);
int create_unique_index(const char *name, KEY *key_info);
int initialize_autoincrement(const void* table);
int get_metadata(const char* path);
void release_metadata();
const char* get_index_name(uint idx_no) const;
const char* get_unique_index_name(uint idx_no) const;
NDB_INDEX_TYPE get_index_type(uint idx_no) const;
NDB_INDEX_TYPE get_index_type_from_table(uint index_no) const;
......@@ -193,6 +196,7 @@ class ha_ndbcluster: public handler
THR_LOCK_DATA m_lock;
NDB_SHARE *m_share;
NDB_INDEX_TYPE m_indextype[MAX_KEY];
const char* m_unique_index_name[MAX_KEY];
NdbRecAttr *m_value[NDB_MAX_ATTRIBUTES_IN_TABLE];
bool m_use_write;
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment