Commit a3e0b69b authored by unknown's avatar unknown

Fix for bug#5171

parent 4f1230a5
drop table if exists t1; drop table if exists t1;
drop database if exists test2;
set autocommit=0; set autocommit=0;
create table t1 ( create table t1 (
a int not null primary key, a int not null primary key,
...@@ -220,6 +221,55 @@ a b c d ...@@ -220,6 +221,55 @@ a b c d
7 7xb7 777 7xdd7 7 7xb7 777 7xdd7
8 8xb8 888 8xdd8 8 8xb8 888 8xdd8
9 9xb9 999 9xdd9 9 9xb9 999 9xdd9
select * from t1 order by a;
a b c d
1 1xb1 111 1xdd1
2 2xb2 222 2xdd2
3 3xb3 333 3xdd3
4 4xb4 444 4xdd4
5 5xb5 555 5xdd5
6 6xb6 666 6xdd6
7 7xb7 777 7xdd7
8 8xb8 888 8xdd8
9 9xb9 999 9xdd9
alter table t1 add x int;
select * from t1 order by a;
a b c d x
1 1xb1 111 1xdd1 NULL
2 2xb2 222 2xdd2 NULL
3 3xb3 333 3xdd3 NULL
4 4xb4 444 4xdd4 NULL
5 5xb5 555 5xdd5 NULL
6 6xb6 666 6xdd6 NULL
7 7xb7 777 7xdd7 NULL
8 8xb8 888 8xdd8 NULL
9 9xb9 999 9xdd9 NULL
alter table t1 drop x;
select * from t1 order by a;
a b c d
1 1xb1 111 1xdd1
2 2xb2 222 2xdd2
3 3xb3 333 3xdd3
4 4xb4 444 4xdd4
5 5xb5 555 5xdd5
6 6xb6 666 6xdd6
7 7xb7 777 7xdd7
8 8xb8 888 8xdd8
9 9xb9 999 9xdd9
create database test2;
use test2;
CREATE TABLE t2 (
a bigint unsigned NOT NULL PRIMARY KEY,
b int unsigned not null,
c int unsigned
) engine=ndbcluster;
insert into t2 values (1,1,1),(2,2,2);
select * from test.t1,t2 where test.t1.a = t2.a order by test.t1.a;
a b c d a b c
1 1xb1 111 1xdd1 1 1 1
2 2xb2 222 2xdd2 2 2 2
drop table t2;
use test;
delete from t1 where c >= 100; delete from t1 where c >= 100;
commit; commit;
select count(*) from t1; select count(*) from t1;
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
--disable_warnings --disable_warnings
drop table if exists t1; drop table if exists t1;
drop database if exists test2;
--enable_warnings --enable_warnings
# #
...@@ -203,6 +204,31 @@ where c >= 100; ...@@ -203,6 +204,31 @@ where c >= 100;
commit; commit;
select * from t1 where c >= 100 order by a; select * from t1 where c >= 100 order by a;
# alter table
select * from t1 order by a;
alter table t1 add x int;
select * from t1 order by a;
alter table t1 drop x;
select * from t1 order by a;
# multi db
create database test2;
use test2;
CREATE TABLE t2 (
a bigint unsigned NOT NULL PRIMARY KEY,
b int unsigned not null,
c int unsigned
) engine=ndbcluster;
insert into t2 values (1,1,1),(2,2,2);
select * from test.t1,t2 where test.t1.a = t2.a order by test.t1.a;
drop table t2;
use test;
# range scan delete # range scan delete
delete from t1 where c >= 100; delete from t1 where c >= 100;
commit; commit;
......
...@@ -234,14 +234,13 @@ private: ...@@ -234,14 +234,13 @@ private:
// define blob table // define blob table
static void getBlobTableName(char* btname, const NdbTableImpl* t, const NdbColumnImpl* c); static void getBlobTableName(char* btname, const NdbTableImpl* t, const NdbColumnImpl* c);
static void getBlobTable(NdbTableImpl& bt, const NdbTableImpl* t, const NdbColumnImpl* c); static void getBlobTable(NdbTableImpl& bt, const NdbTableImpl* t, const NdbColumnImpl* c);
// table name
char theBlobTableName[BlobTableNameSize];
// ndb api stuff // ndb api stuff
Ndb* theNdb; Ndb* theNdb;
NdbConnection* theNdbCon; NdbConnection* theNdbCon;
NdbOperation* theNdbOp; NdbOperation* theNdbOp;
NdbTableImpl* theTable; NdbTableImpl* theTable;
NdbTableImpl* theAccessTable; NdbTableImpl* theAccessTable;
NdbTableImpl* theBlobTable;
const NdbColumnImpl* theColumn; const NdbColumnImpl* theColumn;
char theFillChar; char theFillChar;
// sizes // sizes
......
...@@ -144,6 +144,8 @@ public: ...@@ -144,6 +144,8 @@ public:
FragAllLarge = 4 ///< Eight fragments per node group. FragAllLarge = 4 ///< Eight fragments per node group.
}; };
}; };
class Table; // forward declaration
/** /**
* @class Column * @class Column
...@@ -365,6 +367,8 @@ public: ...@@ -365,6 +367,8 @@ public:
void setIndexOnlyStorage(bool); void setIndexOnlyStorage(bool);
bool getIndexOnlyStorage() const; bool getIndexOnlyStorage() const;
const Table * getBlobTable() const;
/** /**
* @name ODBC Specific methods * @name ODBC Specific methods
* @{ * @{
......
...@@ -140,7 +140,6 @@ void ...@@ -140,7 +140,6 @@ void
NdbBlob::init() NdbBlob::init()
{ {
theState = Idle; theState = Idle;
theBlobTableName[0] = 0;
theNdb = NULL; theNdb = NULL;
theNdbCon = NULL; theNdbCon = NULL;
theNdbOp = NULL; theNdbOp = NULL;
...@@ -865,7 +864,7 @@ NdbBlob::readParts(char* buf, Uint32 part, Uint32 count) ...@@ -865,7 +864,7 @@ NdbBlob::readParts(char* buf, Uint32 part, Uint32 count)
DBG("readParts part=" << part << " count=" << count); DBG("readParts part=" << part << " count=" << count);
Uint32 n = 0; Uint32 n = 0;
while (n < count) { while (n < count) {
NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTableName); NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable);
if (tOp == NULL || if (tOp == NULL ||
tOp->readTuple() == -1 || tOp->readTuple() == -1 ||
setPartKeyValue(tOp, part + n) == -1 || setPartKeyValue(tOp, part + n) == -1 ||
...@@ -887,7 +886,7 @@ NdbBlob::insertParts(const char* buf, Uint32 part, Uint32 count) ...@@ -887,7 +886,7 @@ NdbBlob::insertParts(const char* buf, Uint32 part, Uint32 count)
DBG("insertParts part=" << part << " count=" << count); DBG("insertParts part=" << part << " count=" << count);
Uint32 n = 0; Uint32 n = 0;
while (n < count) { while (n < count) {
NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTableName); NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable);
if (tOp == NULL || if (tOp == NULL ||
tOp->insertTuple() == -1 || tOp->insertTuple() == -1 ||
setPartKeyValue(tOp, part + n) == -1 || setPartKeyValue(tOp, part + n) == -1 ||
...@@ -909,7 +908,7 @@ NdbBlob::updateParts(const char* buf, Uint32 part, Uint32 count) ...@@ -909,7 +908,7 @@ NdbBlob::updateParts(const char* buf, Uint32 part, Uint32 count)
DBG("updateParts part=" << part << " count=" << count); DBG("updateParts part=" << part << " count=" << count);
Uint32 n = 0; Uint32 n = 0;
while (n < count) { while (n < count) {
NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTableName); NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable);
if (tOp == NULL || if (tOp == NULL ||
tOp->updateTuple() == -1 || tOp->updateTuple() == -1 ||
setPartKeyValue(tOp, part + n) == -1 || setPartKeyValue(tOp, part + n) == -1 ||
...@@ -931,7 +930,7 @@ NdbBlob::deleteParts(Uint32 part, Uint32 count) ...@@ -931,7 +930,7 @@ NdbBlob::deleteParts(Uint32 part, Uint32 count)
DBG("deleteParts part=" << part << " count=" << count); DBG("deleteParts part=" << part << " count=" << count);
Uint32 n = 0; Uint32 n = 0;
while (n < count) { while (n < count) {
NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTableName); NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable);
if (tOp == NULL || if (tOp == NULL ||
tOp->deleteTuple() == -1 || tOp->deleteTuple() == -1 ||
setPartKeyValue(tOp, part + n) == -1) { setPartKeyValue(tOp, part + n) == -1) {
...@@ -1029,12 +1028,11 @@ NdbBlob::atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl* ...@@ -1029,12 +1028,11 @@ NdbBlob::atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl*
// sanity check // sanity check
assert((NDB_BLOB_HEAD_SIZE << 2) == sizeof(Head)); assert((NDB_BLOB_HEAD_SIZE << 2) == sizeof(Head));
assert(theColumn->m_attrSize * theColumn->m_arraySize == sizeof(Head) + theInlineSize); assert(theColumn->m_attrSize * theColumn->m_arraySize == sizeof(Head) + theInlineSize);
getBlobTableName(theBlobTableName, theTable, theColumn);
const NdbDictionary::Table* bt; const NdbDictionary::Table* bt;
const NdbDictionary::Column* bc; const NdbDictionary::Column* bc;
if (thePartSize > 0) { if (thePartSize > 0) {
if (theStripeSize == 0 || if (theStripeSize == 0 ||
(bt = theNdb->theDictionary->getTable(theBlobTableName)) == NULL || (bt = theColumn->getBlobTable()) == NULL ||
(bc = bt->getColumn("DATA")) == NULL || (bc = bt->getColumn("DATA")) == NULL ||
bc->getType() != partType || bc->getType() != partType ||
bc->getLength() != (int)thePartSize) { bc->getLength() != (int)thePartSize) {
...@@ -1042,6 +1040,7 @@ NdbBlob::atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl* ...@@ -1042,6 +1040,7 @@ NdbBlob::atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl*
return -1; return -1;
} }
} }
theBlobTable = & NdbTableImpl::getImpl(*bt);
// buffers // buffers
theKeyBuf.alloc(theTable->m_sizeOfKeysInWords << 2); theKeyBuf.alloc(theTable->m_sizeOfKeysInWords << 2);
theAccessKeyBuf.alloc(theAccessTable->m_sizeOfKeysInWords << 2); theAccessKeyBuf.alloc(theAccessTable->m_sizeOfKeysInWords << 2);
......
...@@ -174,6 +174,14 @@ NdbDictionary::Column::getIndexOnlyStorage() const { ...@@ -174,6 +174,14 @@ NdbDictionary::Column::getIndexOnlyStorage() const {
return m_impl.m_indexOnly; return m_impl.m_indexOnly;
} }
const NdbDictionary::Table *
NdbDictionary::Column::getBlobTable() const {
NdbTableImpl * t = m_impl.m_blobTable;
if (t)
return t->m_facade;
return 0;
}
void void
NdbDictionary::Column::setAutoIncrement(bool val){ NdbDictionary::Column::setAutoIncrement(bool val){
m_impl.m_autoIncrement = val; m_impl.m_autoIncrement = val;
......
...@@ -79,6 +79,7 @@ NdbColumnImpl::operator=(const NdbColumnImpl& col) ...@@ -79,6 +79,7 @@ NdbColumnImpl::operator=(const NdbColumnImpl& col)
m_attrSize = col.m_attrSize; m_attrSize = col.m_attrSize;
m_arraySize = col.m_arraySize; m_arraySize = col.m_arraySize;
m_keyInfoPos = col.m_keyInfoPos; m_keyInfoPos = col.m_keyInfoPos;
m_blobTable = col.m_blobTable;
// Do not copy m_facade !! // Do not copy m_facade !!
return *this; return *this;
...@@ -104,6 +105,7 @@ NdbColumnImpl::init() ...@@ -104,6 +105,7 @@ NdbColumnImpl::init()
m_arraySize = 1, m_arraySize = 1,
m_autoIncrement = false; m_autoIncrement = false;
m_autoIncrementInitialValue = 1; m_autoIncrementInitialValue = 1;
m_blobTable = NULL;
} }
NdbColumnImpl::~NdbColumnImpl() NdbColumnImpl::~NdbColumnImpl()
...@@ -1211,7 +1213,6 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret, ...@@ -1211,7 +1213,6 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret,
} }
if (col->getBlobType()) if (col->getBlobType())
blobCount++; blobCount++;
NdbColumnImpl * null = 0; NdbColumnImpl * null = 0;
impl->m_columns.fill(attrDesc.AttributeId, null); impl->m_columns.fill(attrDesc.AttributeId, null);
if(impl->m_columns[attrDesc.AttributeId] != 0){ if(impl->m_columns[attrDesc.AttributeId] != 0){
...@@ -1266,7 +1267,28 @@ NdbDictionaryImpl::createBlobTables(NdbTableImpl &t) ...@@ -1266,7 +1267,28 @@ NdbDictionaryImpl::createBlobTables(NdbTableImpl &t)
NdbBlob::getBlobTable(bt, &t, &c); NdbBlob::getBlobTable(bt, &t, &c);
if (createTable(bt) != 0) if (createTable(bt) != 0)
return -1; return -1;
// Save BLOB table handle
NdbTableImpl * cachedBlobTable = getTable(bt.m_externalName.c_str());
c.m_blobTable = cachedBlobTable;
} }
return 0;
}
int
NdbDictionaryImpl::addBlobTables(NdbTableImpl &t)
{
for (unsigned i = 0; i < t.m_columns.size(); i++) {
NdbColumnImpl & c = *t.m_columns[i];
if (! c.getBlobType() || c.getPartSize() == 0)
continue;
char btname[NdbBlob::BlobTableNameSize];
NdbBlob::getBlobTableName(btname, &t, &c);
// Save BLOB table handle
NdbTableImpl * cachedBlobTable = getTable(btname);;
c.m_blobTable = cachedBlobTable;
}
return 0; return 0;
} }
......
...@@ -71,6 +71,7 @@ public: ...@@ -71,6 +71,7 @@ public:
bool m_autoIncrement; bool m_autoIncrement;
Uint64 m_autoIncrementInitialValue; Uint64 m_autoIncrementInitialValue;
BaseString m_defaultValue; BaseString m_defaultValue;
NdbTableImpl * m_blobTable;
/** /**
* Internal types and sizes, and aggregates * Internal types and sizes, and aggregates
...@@ -362,6 +363,7 @@ public: ...@@ -362,6 +363,7 @@ public:
int createTable(NdbTableImpl &t); int createTable(NdbTableImpl &t);
int createBlobTables(NdbTableImpl &); int createBlobTables(NdbTableImpl &);
int addBlobTables(NdbTableImpl &);
int alterTable(NdbTableImpl &t); int alterTable(NdbTableImpl &t);
int dropTable(const char * name); int dropTable(const char * name);
int dropTable(NdbTableImpl &); int dropTable(NdbTableImpl &);
...@@ -616,7 +618,6 @@ NdbDictionaryImpl::getTableImpl(const char * internalTableName) ...@@ -616,7 +618,6 @@ NdbDictionaryImpl::getTableImpl(const char * internalTableName)
if (ret == 0){ if (ret == 0){
ret = m_receiver.getTable(internalTableName, m_ndb.usingFullyQualifiedNames()); ret = m_receiver.getTable(internalTableName, m_ndb.usingFullyQualifiedNames());
m_globalHash->lock(); m_globalHash->lock();
m_globalHash->put(internalTableName, ret); m_globalHash->put(internalTableName, ret);
m_globalHash->unlock(); m_globalHash->unlock();
...@@ -629,6 +630,8 @@ NdbDictionaryImpl::getTableImpl(const char * internalTableName) ...@@ -629,6 +630,8 @@ NdbDictionaryImpl::getTableImpl(const char * internalTableName)
m_ndb.theFirstTupleId[ret->getTableId()] = ~0; m_ndb.theFirstTupleId[ret->getTableId()] = ~0;
m_ndb.theLastTupleId[ret->getTableId()] = ~0; m_ndb.theLastTupleId[ret->getTableId()] = ~0;
addBlobTables(*ret);
return ret; return ret;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment