Commit e7f4e54c authored by unknown's avatar unknown

Merge tulin@bk-internal.mysql.com:/home/bk/mysql-4.1-ndb

into poseidon.(none):/home/tomas/mysql-4.1-ndb-merge


sql/ha_ndbcluster.cc:
  Auto merged
parents b39ddcf9 2e43e470
......@@ -18,12 +18,12 @@ col5 enum('PENDING', 'ACTIVE', 'DISABLED') not null,
col6 int not null, to_be_deleted int) ENGINE=ndbcluster;
show table status;
Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment
t1 ndbcluster 9 Dynamic 100 0 0 NULL 0 0 1 NULL NULL NULL latin1_swedish_ci NULL
t1 ndbcluster 9 Dynamic 0 0 0 NULL 0 0 1 NULL NULL NULL latin1_swedish_ci NULL
insert into t1 values
(0,4,3,5,"PENDING",1,7),(NULL,4,3,5,"PENDING",1,7),(31,4,3,5,"PENDING",1,7), (7,4,3,5,"PENDING",1,7), (NULL,4,3,5,"PENDING",1,7), (100,4,3,5,"PENDING",1,7), (99,4,3,5,"PENDING",1,7), (8,4,3,5,"PENDING",1,7), (NULL,4,3,5,"PENDING",1,7);
show table status;
Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment
t1 ndbcluster 9 Dynamic 100 0 0 NULL 0 0 102 NULL NULL NULL latin1_swedish_ci NULL
t1 ndbcluster 9 Dynamic 9 0 0 NULL 0 0 102 NULL NULL NULL latin1_swedish_ci NULL
select * from t1 order by col1;
col1 col2 col3 col4 col5 col6 to_be_deleted
0 4 3 5 PENDING 1 7
......@@ -43,7 +43,7 @@ change column col2 fourth varchar(30) not null after col3,
modify column col6 int not null first;
show table status;
Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment
t1 ndbcluster 9 Dynamic 100 0 0 NULL 0 0 102 NULL NULL NULL latin1_swedish_ci NULL
t1 ndbcluster 9 Dynamic 9 0 0 NULL 0 0 102 NULL NULL NULL latin1_swedish_ci NULL
select * from t1 order by col1;
col6 col1 col3 fourth col4 col4_5 col5 col7 col8
1 0 3 4 5 PENDING 0000-00-00 00:00:00
......@@ -58,7 +58,7 @@ col6 col1 col3 fourth col4 col4_5 col5 col7 col8
insert into t1 values (2, NULL,4,3,5,99,"PENDING","EXTRA",'2004-01-01 00:00:00');
show table status;
Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment
t1 ndbcluster 9 Dynamic 100 0 0 NULL 0 0 103 NULL NULL NULL latin1_swedish_ci NULL
t1 ndbcluster 9 Dynamic 10 0 0 NULL 0 0 103 NULL NULL NULL latin1_swedish_ci NULL
select * from t1 order by col1;
col6 col1 col3 fourth col4 col4_5 col5 col7 col8
1 0 3 4 5 PENDING 0000-00-00 00:00:00
......
......@@ -150,7 +150,7 @@ insert into t1 values(9,'b9',999,'dd9');
commit;
explain select * from t1;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t1 ALL NULL NULL NULL NULL 100
1 SIMPLE t1 ALL NULL NULL NULL NULL 9
select * from t1 order by a;
a b c d
1 b1 111 dd1
......@@ -185,7 +185,7 @@ insert into t1 values(2,@b2,222,@d2);
commit;
explain select * from t1;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t1 ALL NULL NULL NULL NULL 100
1 SIMPLE t1 ALL NULL NULL NULL NULL 2
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 order by a;
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
......
......@@ -1066,6 +1066,8 @@ public:
Dictionary(NdbDictionaryImpl&);
const Table * getIndexTable(const char * indexName,
const char * tableName);
public:
const Table * getTable(const char * name, void **data);
};
};
......
......@@ -21,6 +21,21 @@
#include <NdbCondition.h>
#include <NdbSleep.h>
Ndb_local_table_info::Ndb_local_table_info(NdbTableImpl *table_impl, Uint32 sz)
{
m_table_impl= table_impl;
if (sz)
m_local_data= malloc(sz);
else
m_local_data= 0;
}
Ndb_local_table_info::~Ndb_local_table_info()
{
if (m_local_data)
free(m_local_data);
}
LocalDictCache::LocalDictCache(){
m_tableHash.createHashTable();
}
......@@ -29,22 +44,24 @@ LocalDictCache::~LocalDictCache(){
m_tableHash.releaseHashTable();
}
NdbTableImpl *
Ndb_local_table_info *
LocalDictCache::get(const char * name){
const Uint32 len = strlen(name);
return m_tableHash.getData(name, len);
}
void
LocalDictCache::put(const char * name, NdbTableImpl * tab){
const Uint32 id = tab->m_tableId;
LocalDictCache::put(const char * name, Ndb_local_table_info * tab_info){
const Uint32 id = tab_info->m_table_impl->m_tableId;
m_tableHash.insertKey(name, strlen(name), id, tab);
m_tableHash.insertKey(name, strlen(name), id, tab_info);
}
void
LocalDictCache::drop(const char * name){
m_tableHash.deleteKey(name, strlen(name));
Ndb_local_table_info *info= m_tableHash.deleteKey(name, strlen(name));
DBUG_ASSERT(info != 0);
delete info;
}
/*****************************************************************
......
......@@ -27,6 +27,16 @@
#include <Ndb.hpp>
#include "NdbLinHash.hpp"
class Ndb_local_table_info {
public:
Ndb_local_table_info(NdbTableImpl *table_impl, Uint32 sz=0);
~Ndb_local_table_info();
NdbTableImpl *m_table_impl;
Uint64 m_first_tuple_id;
Uint64 m_last_tuple_id;
void *m_local_data;
};
/**
* A non thread safe dict cache
*/
......@@ -35,12 +45,12 @@ public:
LocalDictCache();
~LocalDictCache();
NdbTableImpl * get(const char * name);
Ndb_local_table_info * get(const char * name);
void put(const char * name, NdbTableImpl *);
void put(const char * name, Ndb_local_table_info *);
void drop(const char * name);
NdbLinHash<NdbTableImpl> m_tableHash; // On name
NdbLinHash<Ndb_local_table_info> m_tableHash; // On name
};
/**
......
......@@ -753,9 +753,11 @@ Uint64
Ndb::getAutoIncrementValue(const char* aTableName, Uint32 cacheSize)
{
DEBUG_TRACE("getAutoIncrementValue");
const NdbTableImpl* table = theDictionary->getTable(aTableName);
if (table == 0)
const char * internalTableName = internalizeTableName(aTableName);
Ndb_local_table_info *info= theDictionary->get_local_table_info(internalTableName);
if (info == 0)
return ~0;
const NdbTableImpl *table= info->m_table_impl;
Uint64 tupleId = getTupleIdFromNdb(table->m_tableId, cacheSize);
return tupleId;
}
......@@ -832,11 +834,13 @@ bool
Ndb::setAutoIncrementValue(const char* aTableName, Uint64 val, bool increase)
{
DEBUG_TRACE("setAutoIncrementValue " << val);
const NdbTableImpl* table = theDictionary->getTable(aTableName);
if (table == 0) {
const char * internalTableName= internalizeTableName(aTableName);
Ndb_local_table_info *info= theDictionary->get_local_table_info(internalTableName);
if (info == 0) {
theError= theDictionary->getNdbError();
return false;
}
const NdbTableImpl* table= info->m_table_impl;
return setTupleIdInNdb(table->m_tableId, val, increase);
}
......
......@@ -681,13 +681,18 @@ NdbDictionary::Dictionary::alterTable(const Table & t){
}
const NdbDictionary::Table *
NdbDictionary::Dictionary::getTable(const char * name){
NdbTableImpl * t = m_impl.getTable(name);
NdbDictionary::Dictionary::getTable(const char * name, void **data){
NdbTableImpl * t = m_impl.getTable(name, data);
if(t)
return t->m_facade;
return 0;
}
const NdbDictionary::Table *
NdbDictionary::Dictionary::getTable(const char * name){
return getTable(name, 0);
}
void
NdbDictionary::Dictionary::invalidateTable(const char * name){
NdbTableImpl * t = m_impl.getTable(name);
......
......@@ -595,11 +595,12 @@ static int f_dictionary_count = 0;
NdbDictionaryImpl::~NdbDictionaryImpl()
{
NdbElement_t<NdbTableImpl> * curr = m_localHash.m_tableHash.getNext(0);
NdbElement_t<Ndb_local_table_info> * curr = m_localHash.m_tableHash.getNext(0);
if(m_globalHash){
while(curr != 0){
m_globalHash->lock();
m_globalHash->release(curr->theData);
m_globalHash->release(curr->theData->m_table_impl);
delete curr->theData;
m_globalHash->unlock();
curr = m_localHash.m_tableHash.getNext(curr);
......@@ -620,7 +621,39 @@ NdbDictionaryImpl::~NdbDictionaryImpl()
}
}
Ndb_local_table_info *
NdbDictionaryImpl::fetchGlobalTableImpl(const char * internalTableName)
{
NdbTableImpl *impl;
m_globalHash->lock();
impl = m_globalHash->get(internalTableName);
m_globalHash->unlock();
if (impl == 0){
impl = m_receiver.getTable(internalTableName, m_ndb.usingFullyQualifiedNames());
m_globalHash->lock();
m_globalHash->put(internalTableName, impl);
m_globalHash->unlock();
if(impl == 0){
return 0;
}
}
Ndb_local_table_info *info= new Ndb_local_table_info(impl, 32);
info->m_first_tuple_id= ~0;
info->m_last_tuple_id= ~0;
m_localHash.put(internalTableName, info);
m_ndb.theFirstTupleId[impl->getTableId()] = ~0;
m_ndb.theLastTupleId[impl->getTableId()] = ~0;
addBlobTables(*impl);
return info;
}
#if 0
bool
......@@ -1504,7 +1537,6 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb,
: createTable(&tSignal, ptr);
if (!alter && haveAutoIncrement) {
// if (!ndb.setAutoIncrementValue(impl.m_internalName.c_str(), autoIncrementValue)) {
if (!ndb.setAutoIncrementValue(impl.m_externalName.c_str(), autoIncrementValue)) {
if (ndb.theError.code == 0) {
m_error.code = 4336;
......@@ -1775,11 +1807,12 @@ NdbIndexImpl*
NdbDictionaryImpl::getIndexImpl(const char * externalName,
const char * internalName)
{
NdbTableImpl* tab = getTableImpl(internalName);
if(tab == 0){
Ndb_local_table_info * info = get_local_table_info(internalName);
if(info == 0){
m_error.code = 4243;
return 0;
}
NdbTableImpl * tab = info->m_table_impl;
if(tab->m_indexType == NdbDictionary::Index::Undefined){
// Not an index
......
......@@ -390,8 +390,8 @@ public:
int listObjects(List& list, NdbDictionary::Object::Type type);
int listIndexes(List& list, const char * tableName);
NdbTableImpl * getTable(const char * tableName);
NdbTableImpl * getTableImpl(const char * internalName);
NdbTableImpl * getTable(const char * tableName, void **data= 0);
Ndb_local_table_info * get_local_table_info(const char * internalName);
NdbIndexImpl * getIndex(const char * indexName,
const char * tableName);
NdbIndexImpl * getIndexImpl(const char * name, const char * internalName);
......@@ -410,6 +410,8 @@ public:
NdbDictInterface m_receiver;
Ndb & m_ndb;
private:
Ndb_local_table_info * fetchGlobalTableImpl(const char * internalName);
};
inline
......@@ -598,45 +600,28 @@ NdbDictionaryImpl::getImpl(const NdbDictionary::Dictionary & t){
inline
NdbTableImpl *
NdbDictionaryImpl::getTable(const char * tableName)
NdbDictionaryImpl::getTable(const char * tableName, void **data)
{
const char * internalTableName = m_ndb.internalizeTableName(tableName);
return getTableImpl(internalTableName);
Ndb_local_table_info *info= get_local_table_info(internalTableName);
if (info == 0) {
return 0;
}
if (data) {
*data= info->m_local_data;
}
return info->m_table_impl;
}
inline
NdbTableImpl *
NdbDictionaryImpl::getTableImpl(const char * internalTableName)
Ndb_local_table_info *
NdbDictionaryImpl::get_local_table_info(const char * internalTableName)
{
NdbTableImpl *ret = m_localHash.get(internalTableName);
if (ret != 0) {
return ret; // autoincrement already initialized
Ndb_local_table_info *info= m_localHash.get(internalTableName);
if (info != 0) {
return info; // autoincrement already initialized
}
m_globalHash->lock();
ret = m_globalHash->get(internalTableName);
m_globalHash->unlock();
if (ret == 0){
ret = m_receiver.getTable(internalTableName, m_ndb.usingFullyQualifiedNames());
m_globalHash->lock();
m_globalHash->put(internalTableName, ret);
m_globalHash->unlock();
if(ret == 0){
return 0;
}
}
m_localHash.put(internalTableName, ret);
m_ndb.theFirstTupleId[ret->getTableId()] = ~0;
m_ndb.theLastTupleId[ret->getTableId()] = ~0;
addBlobTables(*ret);
return ret;
return fetchGlobalTableImpl(internalTableName);
}
inline
......@@ -654,9 +639,9 @@ NdbDictionaryImpl::getIndex(const char * indexName,
internalIndexName = m_ndb.internalizeTableName(indexName); // Index is also a table
}
if (internalIndexName) {
NdbTableImpl * tab = getTableImpl(internalIndexName);
if (tab) {
Ndb_local_table_info * info = get_local_table_info(internalIndexName);
if (info) {
NdbTableImpl * tab = info->m_table_impl;
if (tab->m_index == 0)
tab->m_index = getIndexImpl(indexName, internalIndexName);
if (tab->m_index != 0)
......
......@@ -59,7 +59,7 @@ public:
void releaseHashTable(void);
int insertKey(const char * str, Uint32 len, Uint32 lkey1, C* data);
int deleteKey(const char * str, Uint32 len);
C *deleteKey(const char * str, Uint32 len);
C* getData(const char *, Uint32);
Uint32* getKey(const char *, Uint32);
......@@ -277,7 +277,7 @@ NdbLinHash<C>::getData( const char* str, Uint32 len ){
template <class C>
inline
int
C *
NdbLinHash<C>::deleteKey ( const char* str, Uint32 len){
const Uint32 hash = Hash(str, len);
int dir, seg;
......@@ -288,19 +288,21 @@ NdbLinHash<C>::deleteKey ( const char* str, Uint32 len){
for(NdbElement_t<C> * chain = *chainp; chain != 0; chain = chain->next){
if(chain->len == len && !memcmp(chain->str, str, len)){
if (oldChain == 0) {
C *data= chain->theData;
delete chain;
* chainp = 0;
return 1;
return data;
} else {
C *data= chain->theData;
oldChain->next = chain->next;
delete chain;
return 1;
return data;
}
} else {
oldChain = chain;
}
}
return -1; /* Element doesn't exist */
return 0; /* Element doesn't exist */
}
template <class C>
......
......@@ -87,7 +87,8 @@ static int unpackfrm(const void **data, uint *len,
const void* pack_data);
static int ndb_get_table_statistics(Ndb*, const char *,
Uint64* rows, Uint64* commits);
Uint64* rows, Uint64* commits);
/*
Error handling functions
......@@ -137,6 +138,93 @@ static int ndb_to_mysql_error(const NdbError *err)
}
/*
Place holder for ha_ndbcluster thread specific data
*/
class Thd_ndb {
public:
Thd_ndb();
~Thd_ndb();
Ndb *ndb;
ulong count;
uint lock_count;
};
Thd_ndb::Thd_ndb()
{
ndb= 0;
lock_count= 0;
count= 0;
}
Thd_ndb::~Thd_ndb()
{
}
/*
* manage uncommitted insert/deletes during transactio to get records correct
*/
struct Ndb_table_local_info {
int no_uncommitted_rows_count;
ulong transaction_count;
ha_rows records;
};
void ha_ndbcluster::records_update()
{
DBUG_ENTER("ha_ndbcluster::records_update");
struct Ndb_table_local_info *info= (struct Ndb_table_local_info *)m_table_info;
DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d",
((const NDBTAB *)m_table)->getTableId(),
info->no_uncommitted_rows_count));
if (info->records == ~(ha_rows)0)
{
Uint64 rows;
if(ndb_get_table_statistics(m_ndb, m_tabname, &rows, 0) == 0){
info->records= rows;
}
}
records= info->records+ info->no_uncommitted_rows_count;
DBUG_VOID_RETURN;
}
void ha_ndbcluster::no_uncommitted_rows_init(THD *thd)
{
DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_init");
struct Ndb_table_local_info *info= (struct Ndb_table_local_info *)m_table_info;
Thd_ndb *thd_ndb= (Thd_ndb *)thd->transaction.thd_ndb;
if (info->transaction_count != thd_ndb->count)
{
info->transaction_count = thd_ndb->count;
info->no_uncommitted_rows_count= 0;
info->records= ~(ha_rows)0;
DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d",
((const NDBTAB *)m_table)->getTableId(),
info->no_uncommitted_rows_count));
}
DBUG_VOID_RETURN;
}
void ha_ndbcluster::no_uncommitted_rows_update(int c)
{
DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_update");
struct Ndb_table_local_info *info= (struct Ndb_table_local_info *)m_table_info;
info->no_uncommitted_rows_count+= c;
DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d",
((const NDBTAB *)m_table)->getTableId(),
info->no_uncommitted_rows_count));
DBUG_VOID_RETURN;
}
void ha_ndbcluster::no_uncommitted_rows_reset(THD *thd)
{
DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_reset");
((Thd_ndb*)(thd->transaction.thd_ndb))->count++;
DBUG_VOID_RETURN;
}
/*
Take care of the error that occured in NDB
......@@ -145,6 +233,7 @@ static int ndb_to_mysql_error(const NdbError *err)
# The mapped error code
*/
int ha_ndbcluster::ndb_err(NdbConnection *trans)
{
int res;
......@@ -506,7 +595,7 @@ int ha_ndbcluster::get_metadata(const char *path)
DBUG_ENTER("get_metadata");
DBUG_PRINT("enter", ("m_tabname: %s, path: %s", m_tabname, path));
if (!(tab= dict->getTable(m_tabname)))
if (!(tab= dict->getTable(m_tabname, &m_table_info)))
ERR_RETURN(dict->getNdbError());
DBUG_PRINT("info", ("Table schema version: %d", tab->getObjectVersion()));
......@@ -556,10 +645,6 @@ int ha_ndbcluster::get_metadata(const char *path)
// All checks OK, lets use the table
m_table= (void*)tab;
Uint64 rows;
if(false && ndb_get_table_statistics(m_ndb, m_tabname, &rows, 0) == 0){
records= rows;
}
DBUG_RETURN(build_index_list(table, ILBP_OPEN));
}
......@@ -1480,6 +1565,7 @@ int ha_ndbcluster::write_row(byte *record)
Find out how this is detected!
*/
rows_inserted++;
no_uncommitted_rows_update(1);
bulk_insert_not_flushed= true;
if ((rows_to_insert == 1) ||
((rows_inserted % bulk_insert_rows) == 0) ||
......@@ -1701,6 +1787,8 @@ int ha_ndbcluster::delete_row(const byte *record)
ERR_RETURN(trans->getNdbError());
ops_pending++;
no_uncommitted_rows_update(-1);
// If deleting from cursor, NoCommit will be handled in next_result
DBUG_RETURN(0);
}
......@@ -1711,6 +1799,8 @@ int ha_ndbcluster::delete_row(const byte *record)
op->deleteTuple() != 0)
ERR_RETURN(trans->getNdbError());
no_uncommitted_rows_update(-1);
if (table->primary_key == MAX_KEY)
{
// This table has no primary key, use "hidden" primary key
......@@ -2259,7 +2349,10 @@ void ha_ndbcluster::info(uint flag)
if (flag & HA_STATUS_CONST)
DBUG_PRINT("info", ("HA_STATUS_CONST"));
if (flag & HA_STATUS_VARIABLE)
{
DBUG_PRINT("info", ("HA_STATUS_VARIABLE"));
records_update();
}
if (flag & HA_STATUS_ERRKEY)
{
DBUG_PRINT("info", ("HA_STATUS_ERRKEY"));
......@@ -2558,9 +2651,6 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
NdbConnection* trans= NULL;
DBUG_ENTER("external_lock");
DBUG_PRINT("enter", ("transaction.ndb_lock_count: %d",
thd->transaction.ndb_lock_count));
/*
Check that this handler instance has a connection
set up to the Ndb object of thd
......@@ -2568,10 +2658,15 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
if (check_ndb_connection())
DBUG_RETURN(1);
Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;
DBUG_PRINT("enter", ("transaction.thd_ndb->lock_count: %d",
thd_ndb->lock_count));
if (lock_type != F_UNLCK)
{
DBUG_PRINT("info", ("lock_type != F_UNLCK"));
if (!thd->transaction.ndb_lock_count++)
if (!thd_ndb->lock_count++)
{
PRINT_OPTION_FLAGS(thd);
......@@ -2584,6 +2679,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
trans= m_ndb->startTransaction();
if (trans == NULL)
ERR_RETURN(m_ndb->getNdbError());
no_uncommitted_rows_reset(thd);
thd->transaction.stmt.ndb_tid= trans;
}
else
......@@ -2597,6 +2693,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
trans= m_ndb->startTransaction();
if (trans == NULL)
ERR_RETURN(m_ndb->getNdbError());
no_uncommitted_rows_reset(thd);
/*
If this is the start of a LOCK TABLE, a table look
......@@ -2633,11 +2730,12 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
// Start of transaction
retrieve_all_fields= FALSE;
ops_pending= 0;
no_uncommitted_rows_init(thd);
}
else
{
DBUG_PRINT("info", ("lock_type == F_UNLCK"));
if (!--thd->transaction.ndb_lock_count)
if (!--thd_ndb->lock_count)
{
DBUG_PRINT("trans", ("Last external_lock"));
PRINT_OPTION_FLAGS(thd);
......@@ -2696,6 +2794,7 @@ int ha_ndbcluster::start_stmt(THD *thd)
trans= m_ndb->startTransaction();
if (trans == NULL)
ERR_RETURN(m_ndb->getNdbError());
no_uncommitted_rows_reset(thd);
thd->transaction.stmt.ndb_tid= trans;
}
m_active_trans= trans;
......@@ -2715,7 +2814,7 @@ int ha_ndbcluster::start_stmt(THD *thd)
int ndbcluster_commit(THD *thd, void *ndb_transaction)
{
int res= 0;
Ndb *ndb= (Ndb*)thd->transaction.ndb;
Ndb *ndb= ((Thd_ndb*)thd->transaction.thd_ndb)->ndb;
NdbConnection *trans= (NdbConnection*)ndb_transaction;
DBUG_ENTER("ndbcluster_commit");
......@@ -2733,7 +2832,7 @@ int ndbcluster_commit(THD *thd, void *ndb_transaction)
if (res != -1)
ndbcluster_print_error(res, error_op);
}
ndb->closeTransaction(trans);
ndb->closeTransaction(trans);
DBUG_RETURN(res);
}
......@@ -2745,7 +2844,7 @@ int ndbcluster_commit(THD *thd, void *ndb_transaction)
int ndbcluster_rollback(THD *thd, void *ndb_transaction)
{
int res= 0;
Ndb *ndb= (Ndb*)thd->transaction.ndb;
Ndb *ndb= ((Thd_ndb*)thd->transaction.thd_ndb)->ndb;
NdbConnection *trans= (NdbConnection*)ndb_transaction;
DBUG_ENTER("ndbcluster_rollback");
......@@ -3222,9 +3321,9 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
m_active_cursor(NULL),
m_ndb(NULL),
m_table(NULL),
m_table_info(NULL),
m_table_flags(HA_REC_NOT_IN_SEQ |
HA_NULL_IN_KEY |
HA_NOT_EXACT_COUNT |
HA_NO_PREFIX_CHAR_KEYS),
m_share(0),
m_use_write(false),
......@@ -3249,7 +3348,8 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
// TODO Adjust number of records and other parameters for proper
// selection of scan/pk access
records= 100;
// records= 100;
records= 0;
block_size= 1024;
for (i= 0; i < MAX_KEY; i++)
......@@ -3401,25 +3501,30 @@ int ha_ndbcluster::check_ndb_connection()
Ndb* ndb;
DBUG_ENTER("check_ndb_connection");
if (!thd->transaction.ndb)
if (!thd->transaction.thd_ndb)
{
ndb= seize_ndb();
if (!ndb)
DBUG_RETURN(2);
thd->transaction.ndb= ndb;
thd->transaction.thd_ndb= new Thd_ndb();
((Thd_ndb *)thd->transaction.thd_ndb)->ndb= ndb;
}
m_ndb= (Ndb*)thd->transaction.ndb;
m_ndb= ((Thd_ndb*)thd->transaction.thd_ndb)->ndb;
m_ndb->setDatabaseName(m_dbname);
DBUG_RETURN(0);
}
void ndbcluster_close_connection(THD *thd)
{
Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;
Ndb* ndb;
DBUG_ENTER("ndbcluster_close_connection");
ndb= (Ndb*)thd->transaction.ndb;
ha_ndbcluster::release_ndb(ndb);
thd->transaction.ndb= NULL;
if (thd_ndb)
{
ha_ndbcluster::release_ndb(thd_ndb->ndb);
delete thd_ndb;
thd->transaction.thd_ndb= NULL;
}
DBUG_VOID_RETURN;
}
......@@ -3553,6 +3658,7 @@ bool ndbcluster_init()
(void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0,
(hash_get_key) ndbcluster_get_key,0,0);
pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
ndbcluster_inited= 1;
#ifdef USE_DISCOVER_ON_STARTUP
if (ndb_discover_tables() != 0)
......
......@@ -214,7 +214,8 @@ class ha_ndbcluster: public handler
NdbConnection *m_active_trans;
NdbResultSet *m_active_cursor;
Ndb *m_ndb;
void *m_table;
void *m_table;
void *m_table_info;
char m_dbname[FN_HEADLEN];
//char m_schemaname[FN_HEADLEN];
char m_tabname[FN_HEADLEN];
......@@ -238,6 +239,11 @@ class ha_ndbcluster: public handler
char *blobs_buffer;
uint32 blobs_buffer_size;
uint dupkey;
void records_update();
void no_uncommitted_rows_update(int);
void no_uncommitted_rows_init(THD *);
void no_uncommitted_rows_reset(THD *);
};
bool ndbcluster_init(void);
......
......@@ -764,9 +764,8 @@ class THD :public ilink,
THD_TRANS all; // Trans since BEGIN WORK
THD_TRANS stmt; // Trans for current statement
uint bdb_lock_count;
uint ndb_lock_count;
#ifdef HAVE_NDBCLUSTER_DB
void* ndb;
void* thd_ndb;
#endif
bool on;
/*
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment