moved sorting and duplicate check for unique indexes from api to kernel

parent 3b1703b1
SUBDIRS = src tools . include @ndb_opt_subdirs@ SUBDIRS = src tools . include @ndb_opt_subdirs@
DIST_SUBDIRS = src tools include test docs DIST_SUBDIRS = src tools include test docs
EXTRA_DIST = config EXTRA_DIST = config ndbapi-examples
include $(top_srcdir)/ndb/config/common.mk.am include $(top_srcdir)/ndb/config/common.mk.am
dist-hook: dist-hook:
-rm -rf `find $(distdir) -type d -name SCCS` -rm -rf `find $(distdir) -type d -name SCCS`
-rm -rf `find $(distdir) -type d -name old_files` -rm -rf `find $(distdir) -type d -name old_files`
-rm -rf `find $(distdir)/ndbapi-examples -name '*.o'`
list='$(SUBDIRS)'; for subdir in $$list; do \ list='$(SUBDIRS)'; for subdir in $$list; do \
if test "$$subdir" != "." -a "$$subdir" != "include"; then \ if test "$$subdir" != "." -a "$$subdir" != "include"; then \
files="`find $$subdir -name '*\.h'` `find $$subdir -name '*\.hpp'`"; \ files="`find $$subdir -name '*\.h'` `find $$subdir -name '*\.hpp'`"; \
......
...@@ -207,7 +207,7 @@ public: ...@@ -207,7 +207,7 @@ public:
NotUnique = 4251, NotUnique = 4251,
AllocationError = 4252, AllocationError = 4252,
CreateIndexTableFailed = 4253, CreateIndexTableFailed = 4253,
InvalidAttributeOrder = 4255 DuplicateAttributes = 4258
}; };
private: private:
......
...@@ -6301,6 +6301,7 @@ Dbdict::createIndex_slavePrepare(Signal* signal, OpCreateIndexPtr opPtr) ...@@ -6301,6 +6301,7 @@ Dbdict::createIndex_slavePrepare(Signal* signal, OpCreateIndexPtr opPtr)
void void
Dbdict::createIndex_toCreateTable(Signal* signal, OpCreateIndexPtr opPtr) Dbdict::createIndex_toCreateTable(Signal* signal, OpCreateIndexPtr opPtr)
{ {
Uint32 attrid_map[MAX_ATTRIBUTES_IN_INDEX];
Uint32 k; Uint32 k;
jam(); jam();
const CreateIndxReq* const req = &opPtr.p->m_request; const CreateIndxReq* const req = &opPtr.p->m_request;
...@@ -6369,39 +6370,49 @@ Dbdict::createIndex_toCreateTable(Signal* signal, OpCreateIndexPtr opPtr) ...@@ -6369,39 +6370,49 @@ Dbdict::createIndex_toCreateTable(Signal* signal, OpCreateIndexPtr opPtr)
// tree node size in words (make configurable later) // tree node size in words (make configurable later)
indexPtr.p->tupKeyLength = MAX_TTREE_NODE_SIZE; indexPtr.p->tupKeyLength = MAX_TTREE_NODE_SIZE;
} }
// hash index attributes must currently be in table order
Uint32 prevAttrId = RNIL; AttributeMask mask;
mask.clear();
for (k = 0; k < opPtr.p->m_attrList.sz; k++) { for (k = 0; k < opPtr.p->m_attrList.sz; k++) {
jam(); jam();
bool found = false; unsigned current_id= opPtr.p->m_attrList.id[k];
for (Uint32 tAttr = tablePtr.p->firstAttribute; tAttr != RNIL; ) { AttributeRecord* aRec= NULL;
AttributeRecord* aRec = c_attributeRecordPool.getPtr(tAttr); Uint32 tAttr= tablePtr.p->firstAttribute;
tAttr = aRec->nextAttrInTable; for (; tAttr != RNIL; tAttr= aRec->nextAttrInTable)
if (aRec->attributeId != opPtr.p->m_attrList.id[k]) {
aRec = c_attributeRecordPool.getPtr(tAttr);
if (aRec->attributeId != current_id)
continue; continue;
jam(); jam();
found = true; break;
const Uint32 a = aRec->attributeDescriptor;
if (indexPtr.p->isHashIndex()) {
const Uint32 s1 = AttributeDescriptor::getSize(a);
const Uint32 s2 = AttributeDescriptor::getArraySize(a);
indexPtr.p->tupKeyLength += ((1 << s1) * s2 + 31) >> 5;
}
} }
if (! found) { if (tAttr == RNIL) {
jam(); jam();
opPtr.p->m_errorCode = CreateIndxRef::BadRequestType; opPtr.p->m_errorCode = CreateIndxRef::BadRequestType;
opPtr.p->m_errorLine = __LINE__; opPtr.p->m_errorLine = __LINE__;
return; return;
} }
if (indexPtr.p->isHashIndex() && if (mask.get(current_id))
k > 0 && prevAttrId >= opPtr.p->m_attrList.id[k]) { {
jam(); jam();
opPtr.p->m_errorCode = CreateIndxRef::InvalidAttributeOrder; opPtr.p->m_errorCode = CreateIndxRef::DuplicateAttributes;
opPtr.p->m_errorLine = __LINE__; opPtr.p->m_errorLine = __LINE__;
return; return;
} }
prevAttrId = opPtr.p->m_attrList.id[k]; mask.set(current_id);
const Uint32 a = aRec->attributeDescriptor;
unsigned kk= k;
if (indexPtr.p->isHashIndex()) {
const Uint32 s1 = AttributeDescriptor::getSize(a);
const Uint32 s2 = AttributeDescriptor::getArraySize(a);
indexPtr.p->tupKeyLength += ((1 << s1) * s2 + 31) >> 5;
// reorder the attributes according to the tableid order
// for unque indexes
for (; kk > 0 && current_id < attrid_map[kk-1]>>16; kk--)
attrid_map[kk]= attrid_map[kk-1];
}
attrid_map[kk]= k | (current_id << 16);
} }
indexPtr.p->noOfPrimkey = indexPtr.p->noOfAttributes; indexPtr.p->noOfPrimkey = indexPtr.p->noOfAttributes;
// plus concatenated primary table key attribute // plus concatenated primary table key attribute
...@@ -6421,12 +6432,17 @@ Dbdict::createIndex_toCreateTable(Signal* signal, OpCreateIndexPtr opPtr) ...@@ -6421,12 +6432,17 @@ Dbdict::createIndex_toCreateTable(Signal* signal, OpCreateIndexPtr opPtr)
// write index key attributes // write index key attributes
AttributeRecordPtr aRecPtr; AttributeRecordPtr aRecPtr;
c_attributeRecordPool.getPtr(aRecPtr, tablePtr.p->firstAttribute); c_attributeRecordPool.getPtr(aRecPtr, tablePtr.p->firstAttribute);
for (k = 0; k < opPtr.p->m_attrList.sz; k++) { for (unsigned k = 0; k < opPtr.p->m_attrList.sz; k++) {
// insert the attributes in the order decided above in attrid_map
// k is new order, current_id is in previous order
// ToDo: make sure "current_id" is stored with the table and
// passed up to NdbDictionary
unsigned current_id= opPtr.p->m_attrList.id[attrid_map[k] & 0xffff];
jam(); jam();
for (Uint32 tAttr = tablePtr.p->firstAttribute; tAttr != RNIL; ) { for (Uint32 tAttr = tablePtr.p->firstAttribute; tAttr != RNIL; ) {
AttributeRecord* aRec = c_attributeRecordPool.getPtr(tAttr); AttributeRecord* aRec = c_attributeRecordPool.getPtr(tAttr);
tAttr = aRec->nextAttrInTable; tAttr = aRec->nextAttrInTable;
if (aRec->attributeId != opPtr.p->m_attrList.id[k]) if (aRec->attributeId != current_id)
continue; continue;
jam(); jam();
const Uint32 a = aRec->attributeDescriptor; const Uint32 a = aRec->attributeDescriptor;
......
...@@ -2148,26 +2148,6 @@ NdbDictInterface::createIndex(Ndb & ndb, ...@@ -2148,26 +2148,6 @@ NdbDictInterface::createIndex(Ndb & ndb,
} }
attributeList.id[i] = col->m_attrId; attributeList.id[i] = col->m_attrId;
} }
if (it == DictTabInfo::UniqueHashIndex) {
// Sort index attributes according to primary table (using insertion sort)
for(i = 1; i < attributeList.sz; i++) {
unsigned int temp = attributeList.id[i];
unsigned int j = i;
while((j > 0) && (attributeList.id[j - 1] > temp)) {
attributeList.id[j] = attributeList.id[j - 1];
j--;
}
attributeList.id[j] = temp;
}
// Check for illegal duplicate attributes
for(i = 0; i<attributeList.sz; i++) {
if ((i != (attributeList.sz - 1)) &&
(attributeList.id[i] == attributeList.id[i+1])) {
m_error.code = 4258;
return -1;
}
}
}
LinearSectionPtr ptr[3]; LinearSectionPtr ptr[3];
ptr[0].p = (Uint32*)&attributeList; ptr[0].p = (Uint32*)&attributeList;
ptr[0].sz = 1 + attributeList.sz; ptr[0].sz = 1 + attributeList.sz;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment