Commit 82c0e3b4 authored by pekka@mysql.com's avatar pekka@mysql.com

ndb charsets: metadata in TUP, TUX

parent 083e4f47
...@@ -89,7 +89,8 @@ public: ...@@ -89,7 +89,8 @@ public:
ArraySizeTooBig = 737, ArraySizeTooBig = 737,
RecordTooBig = 738, RecordTooBig = 738,
InvalidPrimaryKeySize = 739, InvalidPrimaryKeySize = 739,
NullablePrimaryKey = 740 NullablePrimaryKey = 740,
InvalidCharset = 743
}; };
private: private:
......
...@@ -130,7 +130,7 @@ private: ...@@ -130,7 +130,7 @@ private:
Uint32 keyLength; Uint32 keyLength;
Uint32 nextLCP; Uint32 nextLCP;
Uint32 noOfKeyAttr; Uint32 noOfKeyAttr;
Uint32 noOfNewAttr; Uint32 noOfNewAttr; // noOfCharsets in upper half
Uint32 checksumIndicator; Uint32 checksumIndicator;
Uint32 noOfAttributeGroups; Uint32 noOfAttributeGroups;
Uint32 GCPIndicator; Uint32 GCPIndicator;
......
...@@ -119,12 +119,13 @@ class TupAddAttrReq { ...@@ -119,12 +119,13 @@ class TupAddAttrReq {
friend class Dblqh; friend class Dblqh;
friend class Dbtux; friend class Dbtux;
public: public:
STATIC_CONST( SignalLength = 4 ); STATIC_CONST( SignalLength = 5 );
private: private:
Uint32 tupConnectPtr; Uint32 tupConnectPtr;
Uint32 notused1; Uint32 notused1;
Uint32 attrId; Uint32 attrId;
Uint32 attrDescriptor; Uint32 attrDescriptor;
Uint32 extTypeInfo;
}; };
class TupAddAttrConf { class TupAddAttrConf {
...@@ -141,6 +142,10 @@ class TupAddAttrRef { ...@@ -141,6 +142,10 @@ class TupAddAttrRef {
friend class Dbtup; friend class Dbtup;
public: public:
STATIC_CONST( SignalLength = 2 ); STATIC_CONST( SignalLength = 2 );
enum ErrorCode {
NoError = 0,
InvalidCharset = 743
};
private: private:
Uint32 userPtr; Uint32 userPtr;
Uint32 errorCode; Uint32 errorCode;
...@@ -178,7 +183,8 @@ public: ...@@ -178,7 +183,8 @@ public:
STATIC_CONST( SignalLength = 2 ); STATIC_CONST( SignalLength = 2 );
enum ErrorCode { enum ErrorCode {
NoError = 0, NoError = 0,
InvalidAttributeType = 831, InvalidAttributeType = 742,
InvalidCharset = 743,
InvalidNodeSize = 832 InvalidNodeSize = 832
}; };
private: private:
......
...@@ -90,6 +90,11 @@ public: ...@@ -90,6 +90,11 @@ public:
*/ */
static const Type& getType(Uint32 typeId); static const Type& getType(Uint32 typeId);
/**
* Get type by id but replace char type by corresponding binary type.
*/
static const Type& getTypeBinary(Uint32 typeId);
/** /**
* Check character set. * Check character set.
*/ */
......
...@@ -176,6 +176,25 @@ NdbSqlUtil::getType(Uint32 typeId) ...@@ -176,6 +176,25 @@ NdbSqlUtil::getType(Uint32 typeId)
return m_typeList[Type::Undefined]; return m_typeList[Type::Undefined];
} }
const NdbSqlUtil::Type&
NdbSqlUtil::getTypeBinary(Uint32 typeId)
{
switch (typeId) {
case Type::Char:
typeId = Type::Binary;
break;
case Type::Varchar:
typeId = Type::Varbinary;
break;
case Type::Text:
typeId = Type::Blob;
break;
default:
break;
}
return getType(typeId);
}
// compare // compare
int int
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h> #include <ndb_global.h>
#include <my_sys.h>
#define DBDICT_C #define DBDICT_C
#include "Dbdict.hpp" #include "Dbdict.hpp"
...@@ -4100,6 +4101,8 @@ Dbdict::execADD_FRAGREQ(Signal* signal) { ...@@ -4100,6 +4101,8 @@ Dbdict::execADD_FRAGREQ(Signal* signal) {
req->noOfKeyAttr = tabPtr.p->noOfPrimkey; req->noOfKeyAttr = tabPtr.p->noOfPrimkey;
req->noOfNewAttr = 0; req->noOfNewAttr = 0;
// noOfCharsets passed to TUP in upper half
req->noOfNewAttr |= (tabPtr.p->noOfCharsets << 16);
req->checksumIndicator = 1; req->checksumIndicator = 1;
req->noOfAttributeGroups = 1; req->noOfAttributeGroups = 1;
req->GCPIndicator = 0; req->GCPIndicator = 0;
...@@ -4161,6 +4164,8 @@ Dbdict::sendLQHADDATTRREQ(Signal* signal, ...@@ -4161,6 +4164,8 @@ Dbdict::sendLQHADDATTRREQ(Signal* signal,
entry.attrId = attrPtr.p->attributeId; entry.attrId = attrPtr.p->attributeId;
entry.attrDescriptor = attrPtr.p->attributeDescriptor; entry.attrDescriptor = attrPtr.p->attributeDescriptor;
entry.extTypeInfo = attrPtr.p->extType; entry.extTypeInfo = attrPtr.p->extType;
// charset number passed to TUP, TUX in upper half
entry.extTypeInfo |= (attrPtr.p->extPrecision & ~0xFFFF);
if (tabPtr.p->isIndex()) { if (tabPtr.p->isIndex()) {
Uint32 primaryAttrId; Uint32 primaryAttrId;
if (attrPtr.p->nextAttrInTable != RNIL) { if (attrPtr.p->nextAttrInTable != RNIL) {
...@@ -4697,6 +4702,8 @@ void Dbdict::handleTabInfo(SimpleProperties::Reader & it, ...@@ -4697,6 +4702,8 @@ void Dbdict::handleTabInfo(SimpleProperties::Reader & it,
Uint32 keyLength = 0; Uint32 keyLength = 0;
Uint32 attrCount = tablePtr.p->noOfAttributes; Uint32 attrCount = tablePtr.p->noOfAttributes;
Uint32 nullCount = 0; Uint32 nullCount = 0;
Uint32 noOfCharsets = 0;
Uint16 charsets[128];
Uint32 recordLength = 0; Uint32 recordLength = 0;
AttributeRecordPtr attrPtr; AttributeRecordPtr attrPtr;
c_attributeRecordHash.removeAll(); c_attributeRecordHash.removeAll();
...@@ -4751,6 +4758,31 @@ void Dbdict::handleTabInfo(SimpleProperties::Reader & it, ...@@ -4751,6 +4758,31 @@ void Dbdict::handleTabInfo(SimpleProperties::Reader & it,
attrPtr.p->extPrecision = attrDesc.AttributeExtPrecision; attrPtr.p->extPrecision = attrDesc.AttributeExtPrecision;
attrPtr.p->extScale = attrDesc.AttributeExtScale; attrPtr.p->extScale = attrDesc.AttributeExtScale;
attrPtr.p->extLength = attrDesc.AttributeExtLength; attrPtr.p->extLength = attrDesc.AttributeExtLength;
// charset in upper half of precision
unsigned csNumber = (attrPtr.p->extPrecision >> 16);
if (csNumber != 0) {
CHARSET_INFO* cs = get_charset(csNumber, MYF(0));
if (cs == NULL) {
parseP->errorCode = CreateTableRef::InvalidCharset;
parseP->errorLine = __LINE__;
return;
}
unsigned i = 0;
while (i < noOfCharsets) {
if (charsets[i] == csNumber)
break;
i++;
}
if (i == noOfCharsets) {
noOfCharsets++;
if (noOfCharsets > sizeof(charsets)/sizeof(charsets[0])) {
parseP->errorCode = CreateTableRef::InvalidFormat;
parseP->errorLine = __LINE__;
return;
}
charsets[i] = csNumber;
}
}
/** /**
* Ignore incoming old-style type and recompute it. * Ignore incoming old-style type and recompute it.
...@@ -4814,6 +4846,7 @@ void Dbdict::handleTabInfo(SimpleProperties::Reader & it, ...@@ -4814,6 +4846,7 @@ void Dbdict::handleTabInfo(SimpleProperties::Reader & it,
tablePtr.p->noOfPrimkey = keyCount; tablePtr.p->noOfPrimkey = keyCount;
tablePtr.p->noOfNullAttr = nullCount; tablePtr.p->noOfNullAttr = nullCount;
tablePtr.p->noOfCharsets = noOfCharsets;
tablePtr.p->tupKeyLength = keyLength; tablePtr.p->tupKeyLength = keyLength;
tabRequire(recordLength<= MAX_TUPLE_SIZE_IN_WORDS, tabRequire(recordLength<= MAX_TUPLE_SIZE_IN_WORDS,
......
...@@ -455,7 +455,7 @@ public: ...@@ -455,7 +455,7 @@ public:
Uint16 totalAttrReceived; Uint16 totalAttrReceived;
Uint16 fragCopyCreation; Uint16 fragCopyCreation;
Uint16 noOfKeyAttr; Uint16 noOfKeyAttr;
Uint16 noOfNewAttr; Uint32 noOfNewAttr; // noOfCharsets in upper half
Uint16 noOfAttributeGroups; Uint16 noOfAttributeGroups;
Uint16 lh3DistrBits; Uint16 lh3DistrBits;
Uint16 tableType; Uint16 tableType;
......
...@@ -1444,6 +1444,7 @@ Dblqh::sendAddAttrReq(Signal* signal) ...@@ -1444,6 +1444,7 @@ Dblqh::sendAddAttrReq(Signal* signal)
tupreq->notused1 = 0; tupreq->notused1 = 0;
tupreq->attrId = attrId; tupreq->attrId = attrId;
tupreq->attrDescriptor = entry.attrDescriptor; tupreq->attrDescriptor = entry.attrDescriptor;
tupreq->extTypeInfo = entry.extTypeInfo;
sendSignal(fragptr.p->tupBlockref, GSN_TUP_ADD_ATTRREQ, sendSignal(fragptr.p->tupBlockref, GSN_TUP_ADD_ATTRREQ,
signal, TupAddAttrReq::SignalLength, JBB); signal, TupAddAttrReq::SignalLength, JBB);
return; return;
......
...@@ -22,26 +22,59 @@ class AttributeOffset { ...@@ -22,26 +22,59 @@ class AttributeOffset {
private: private:
static void setOffset(Uint32 & desc, Uint32 offset); static void setOffset(Uint32 & desc, Uint32 offset);
static void setCharsetPos(Uint32 & desc, Uint32 offset);
static void setNullFlagPos(Uint32 & desc, Uint32 offset); static void setNullFlagPos(Uint32 & desc, Uint32 offset);
static Uint32 getOffset(const Uint32 &); static Uint32 getOffset(const Uint32 &);
static bool getCharsetFlag(const Uint32 &);
static Uint32 getCharsetPos(const Uint32 &);
static Uint32 getNullFlagPos(const Uint32 &); static Uint32 getNullFlagPos(const Uint32 &);
static Uint32 getNullFlagOffset(const Uint32 &); static Uint32 getNullFlagOffset(const Uint32 &);
static Uint32 getNullFlagBitOffset(const Uint32 &); static Uint32 getNullFlagBitOffset(const Uint32 &);
static bool isNULL(const Uint32 &, const Uint32 &); static bool isNULL(const Uint32 &, const Uint32 &);
}; };
#define AO_ATTRIBUTE_OFFSET_MASK (0xffff) /**
#define AO_NULL_FLAG_POS_MASK (0x7ff) * Allow for 4096 attributes, all nullable, and for 128 different
#define AO_NULL_FLAG_POS_SHIFT (21) * character sets.
#define AO_NULL_FLAG_WORD_MASK (31) *
#define AO_NULL_FLAG_OFFSET_SHIFT (5) * a = Attribute offset - 11 bits 0-10 ( addr word in 8 kb )
* c = Has charset flag 1 bits 11-11
* s = Charset pointer position - 7 bits 12-18 ( in table descriptor )
* f = Null flag offset in word - 5 bits 20-24 ( address 32 bits )
* w = Null word offset - 7 bits 25-32 ( f+w addr 4096 attrs )
*
* 1111111111222222222233
* 01234567890123456789012345678901
* aaaaaaaaaaacsssssss fffffwwwwwww
*/
#define AO_ATTRIBUTE_OFFSET_SHIFT 0
#define AO_ATTRIBUTE_OFFSET_MASK 0x7ff
#define AO_CHARSET_FLAG_SHIFT 11
#define AO_CHARSET_POS_SHIFT 12
#define AO_CHARSET_POS_MASK 127
#define AO_NULL_FLAG_POS_MASK 0xfff // f+w
#define AO_NULL_FLAG_POS_SHIFT 20
#define AO_NULL_FLAG_WORD_MASK 31 // f
#define AO_NULL_FLAG_OFFSET_SHIFT 5
inline inline
void void
AttributeOffset::setOffset(Uint32 & desc, Uint32 offset){ AttributeOffset::setOffset(Uint32 & desc, Uint32 offset){
ASSERT_MAX(offset, AO_ATTRIBUTE_OFFSET_MASK, "AttributeOffset::setOffset"); ASSERT_MAX(offset, AO_ATTRIBUTE_OFFSET_MASK, "AttributeOffset::setOffset");
desc |= offset; desc |= (offset << AO_ATTRIBUTE_OFFSET_SHIFT);
}
inline
void
AttributeOffset::setCharsetPos(Uint32 & desc, Uint32 offset) {
ASSERT_MAX(offset, AO_CHARSET_POS_MASK, "AttributeOffset::setCharsetPos");
desc |= (1 << AO_CHARSET_FLAG_SHIFT);
desc |= (offset << AO_CHARSET_POS_SHIFT);
} }
inline inline
...@@ -55,7 +88,21 @@ inline ...@@ -55,7 +88,21 @@ inline
Uint32 Uint32
AttributeOffset::getOffset(const Uint32 & desc) AttributeOffset::getOffset(const Uint32 & desc)
{ {
return desc & AO_ATTRIBUTE_OFFSET_MASK; return (desc >> AO_ATTRIBUTE_OFFSET_SHIFT) & AO_ATTRIBUTE_OFFSET_MASK;
}
inline
bool
AttributeOffset::getCharsetFlag(const Uint32 & desc)
{
return (desc >> AO_CHARSET_FLAG_SHIFT) & 1;
}
inline
Uint32
AttributeOffset::getCharsetPos(const Uint32 & desc)
{
return (desc >> AO_CHARSET_POS_SHIFT) & AO_CHARSET_POS_MASK;
} }
inline inline
......
...@@ -502,6 +502,7 @@ struct Fragoperrec { ...@@ -502,6 +502,7 @@ struct Fragoperrec {
Uint32 attributeCount; Uint32 attributeCount;
Uint32 freeNullBit; Uint32 freeNullBit;
Uint32 noOfNewAttrCount; Uint32 noOfNewAttrCount;
Uint32 charsetIndex;
BlockReference lqhBlockrefFrag; BlockReference lqhBlockrefFrag;
}; };
typedef Ptr<Fragoperrec> FragoperrecPtr; typedef Ptr<Fragoperrec> FragoperrecPtr;
...@@ -785,6 +786,7 @@ struct Tablerec { ...@@ -785,6 +786,7 @@ struct Tablerec {
ReadFunction* readFunctionArray; ReadFunction* readFunctionArray;
UpdateFunction* updateFunctionArray; UpdateFunction* updateFunctionArray;
CHARSET_INFO** charsetArray;
Uint32 readKeyArray; Uint32 readKeyArray;
Uint32 tabDescriptor; Uint32 tabDescriptor;
...@@ -796,6 +798,7 @@ struct Tablerec { ...@@ -796,6 +798,7 @@ struct Tablerec {
Uint16 tupheadsize; Uint16 tupheadsize;
Uint16 noOfAttr; Uint16 noOfAttr;
Uint16 noOfKeyAttr; Uint16 noOfKeyAttr;
Uint16 noOfCharsets;
Uint16 noOfNewAttr; Uint16 noOfNewAttr;
Uint16 noOfNullAttr; Uint16 noOfNullAttr;
Uint16 noOfAttributeGroups; Uint16 noOfAttributeGroups;
...@@ -1909,7 +1912,8 @@ private: ...@@ -1909,7 +1912,8 @@ private:
void updatePackedList(Signal* signal, Uint16 ahostIndex); void updatePackedList(Signal* signal, Uint16 ahostIndex);
void setUpDescriptorReferences(Uint32 descriptorReference, void setUpDescriptorReferences(Uint32 descriptorReference,
Tablerec* const regTabPtr); Tablerec* const regTabPtr,
const Uint32* offset);
void setUpKeyArray(Tablerec* const regTabPtr); void setUpKeyArray(Tablerec* const regTabPtr);
bool addfragtotab(Tablerec* const regTabPtr, Uint32 fragId, Uint32 fragIndex); bool addfragtotab(Tablerec* const regTabPtr, Uint32 fragId, Uint32 fragIndex);
void deleteFragTab(Tablerec* const regTabPtr, Uint32 fragId); void deleteFragTab(Tablerec* const regTabPtr, Uint32 fragId);
...@@ -2098,7 +2102,8 @@ private: ...@@ -2098,7 +2102,8 @@ private:
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// Public methods // Public methods
Uint32 allocTabDescr(Uint32 noOfAttributes, Uint32 noOfKeyAttr, Uint32 noOfAttributeGroups); Uint32 getTabDescrOffsets(const Tablerec* regTabPtr, Uint32* offset);
Uint32 allocTabDescr(const Tablerec* regTabPtr, Uint32* offset);
void freeTabDescr(Uint32 retRef, Uint32 retNo); void freeTabDescr(Uint32 retRef, Uint32 retNo);
Uint32 getTabDescrWord(Uint32 index); Uint32 getTabDescrWord(Uint32 index);
void setTabDescrWord(Uint32 index, Uint32 word); void setTabDescrWord(Uint32 index, Uint32 word);
......
...@@ -1067,6 +1067,7 @@ Dbtup::initTab(Tablerec* const regTabPtr) ...@@ -1067,6 +1067,7 @@ Dbtup::initTab(Tablerec* const regTabPtr)
}//for }//for
regTabPtr->readFunctionArray = NULL; regTabPtr->readFunctionArray = NULL;
regTabPtr->updateFunctionArray = NULL; regTabPtr->updateFunctionArray = NULL;
regTabPtr->charsetArray = NULL;
regTabPtr->tabDescriptor = RNIL; regTabPtr->tabDescriptor = RNIL;
regTabPtr->attributeGroupDescriptor = RNIL; regTabPtr->attributeGroupDescriptor = RNIL;
......
...@@ -20,12 +20,14 @@ ...@@ -20,12 +20,14 @@
#include <RefConvert.hpp> #include <RefConvert.hpp>
#include <ndb_limits.h> #include <ndb_limits.h>
#include <pc.hpp> #include <pc.hpp>
#include <signaldata/TupFrag.hpp>
#include <signaldata/FsConf.hpp> #include <signaldata/FsConf.hpp>
#include <signaldata/FsRemoveReq.hpp> #include <signaldata/FsRemoveReq.hpp>
#include <signaldata/DropTab.hpp> #include <signaldata/DropTab.hpp>
#include <signaldata/AlterTab.hpp> #include <signaldata/AlterTab.hpp>
#include <AttributeDescriptor.hpp> #include <AttributeDescriptor.hpp>
#include "AttributeOffset.hpp" #include "AttributeOffset.hpp"
#include <my_sys.h>
#define ljam() { jamLine(20000 + __LINE__); } #define ljam() { jamLine(20000 + __LINE__); }
#define ljamEntry() { jamEntryLine(20000 + __LINE__); } #define ljamEntry() { jamEntryLine(20000 + __LINE__); }
...@@ -52,7 +54,10 @@ void Dbtup::execTUPFRAGREQ(Signal* signal) ...@@ -52,7 +54,10 @@ void Dbtup::execTUPFRAGREQ(Signal* signal)
/* Uint32 schemaVersion = signal->theData[8];*/ /* Uint32 schemaVersion = signal->theData[8];*/
Uint32 noOfKeyAttr = signal->theData[9]; Uint32 noOfKeyAttr = signal->theData[9];
Uint32 noOfNewAttr = signal->theData[10]; Uint32 noOfNewAttr = (signal->theData[10] & 0xFFFF);
/* DICT sends number of character sets in upper half */
Uint32 noOfCharsets = (signal->theData[10] >> 16);
Uint32 checksumIndicator = signal->theData[11]; Uint32 checksumIndicator = signal->theData[11];
Uint32 noOfAttributeGroups = signal->theData[12]; Uint32 noOfAttributeGroups = signal->theData[12];
Uint32 globalCheckpointIdIndicator = signal->theData[13]; Uint32 globalCheckpointIdIndicator = signal->theData[13];
...@@ -75,6 +80,7 @@ void Dbtup::execTUPFRAGREQ(Signal* signal) ...@@ -75,6 +80,7 @@ void Dbtup::execTUPFRAGREQ(Signal* signal)
fragOperPtr.p->attributeCount = noOfAttributes; fragOperPtr.p->attributeCount = noOfAttributes;
fragOperPtr.p->freeNullBit = noOfNullAttr; fragOperPtr.p->freeNullBit = noOfNullAttr;
fragOperPtr.p->noOfNewAttrCount = noOfNewAttr; fragOperPtr.p->noOfNewAttrCount = noOfNewAttr;
fragOperPtr.p->charsetIndex = 0;
ndbrequire(reqinfo == ZADDFRAG); ndbrequire(reqinfo == ZADDFRAG);
...@@ -156,6 +162,7 @@ void Dbtup::execTUPFRAGREQ(Signal* signal) ...@@ -156,6 +162,7 @@ void Dbtup::execTUPFRAGREQ(Signal* signal)
regTabPtr.p->tupheadsize = regTabPtr.p->tupGCPIndex; regTabPtr.p->tupheadsize = regTabPtr.p->tupGCPIndex;
regTabPtr.p->noOfKeyAttr = noOfKeyAttr; regTabPtr.p->noOfKeyAttr = noOfKeyAttr;
regTabPtr.p->noOfCharsets = noOfCharsets;
regTabPtr.p->noOfAttr = noOfAttributes; regTabPtr.p->noOfAttr = noOfAttributes;
regTabPtr.p->noOfNewAttr = noOfNewAttr; regTabPtr.p->noOfNewAttr = noOfNewAttr;
regTabPtr.p->noOfNullAttr = noOfNullAttr; regTabPtr.p->noOfNullAttr = noOfNullAttr;
...@@ -163,13 +170,14 @@ void Dbtup::execTUPFRAGREQ(Signal* signal) ...@@ -163,13 +170,14 @@ void Dbtup::execTUPFRAGREQ(Signal* signal)
regTabPtr.p->notNullAttributeMask.clear(); regTabPtr.p->notNullAttributeMask.clear();
Uint32 tableDescriptorRef = allocTabDescr(noOfAttributes, noOfKeyAttr, noOfAttributeGroups); Uint32 offset[10];
Uint32 tableDescriptorRef = allocTabDescr(regTabPtr.p, offset);
if (tableDescriptorRef == RNIL) { if (tableDescriptorRef == RNIL) {
ljam(); ljam();
fragrefuse4Lab(signal, fragOperPtr, regFragPtr, regTabPtr.p, fragId); fragrefuse4Lab(signal, fragOperPtr, regFragPtr, regTabPtr.p, fragId);
return; return;
}//if }//if
setUpDescriptorReferences(tableDescriptorRef, regTabPtr.p); setUpDescriptorReferences(tableDescriptorRef, regTabPtr.p, offset);
} else { } else {
ljam(); ljam();
fragOperPtr.p->definingFragment = false; fragOperPtr.p->definingFragment = false;
...@@ -251,6 +259,9 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal) ...@@ -251,6 +259,9 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal)
ptrCheckGuard(fragOperPtr, cnoOfFragoprec, fragoperrec); ptrCheckGuard(fragOperPtr, cnoOfFragoprec, fragoperrec);
Uint32 attrId = signal->theData[2]; Uint32 attrId = signal->theData[2];
Uint32 attrDescriptor = signal->theData[3]; Uint32 attrDescriptor = signal->theData[3];
// DICT sends extended type (ignored) and charset number
Uint32 extType = (signal->theData[4] & 0xFF);
Uint32 csNumber = (signal->theData[4] >> 16);
regTabPtr.i = fragOperPtr.p->tableidFrag; regTabPtr.i = fragOperPtr.p->tableidFrag;
ptrCheckGuard(regTabPtr, cnoOfTablerec, tablerec); ptrCheckGuard(regTabPtr, cnoOfTablerec, tablerec);
...@@ -304,6 +315,29 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal) ...@@ -304,6 +315,29 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal)
} else { } else {
ndbrequire(false); ndbrequire(false);
}//if }//if
if (csNumber != 0) {
CHARSET_INFO* cs = get_charset(csNumber, MYF(0));
if (cs == NULL) {
ljam();
terrorCode = TupAddAttrRef::InvalidCharset;
addattrrefuseLab(signal, regFragPtr, fragOperPtr, regTabPtr.p, fragId);
return;
}
Uint32 i = 0;
while (i < fragOperPtr.p->charsetIndex) {
ljam();
if (regTabPtr.p->charsetArray[i] == cs)
break;
i++;
}
if (i == fragOperPtr.p->charsetIndex) {
ljam();
ndbrequire(i < regTabPtr.p->noOfCharsets);
regTabPtr.p->charsetArray[i] = cs;
AttributeOffset::setCharsetPos(attrDes2, i);
fragOperPtr.p->charsetIndex++;
}
}
setTabDescrWord(firstTabDesIndex + 1, attrDes2); setTabDescrWord(firstTabDesIndex + 1, attrDes2);
if (regTabPtr.p->tupheadsize > MAX_TUPLE_SIZE_IN_WORDS) { if (regTabPtr.p->tupheadsize > MAX_TUPLE_SIZE_IN_WORDS) {
...@@ -340,20 +374,28 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal) ...@@ -340,20 +374,28 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal)
return; return;
}//Dbtup::execTUP_ADD_ATTRREQ() }//Dbtup::execTUP_ADD_ATTRREQ()
/*
* Descriptor has these parts:
*
* 0 readFunctionArray ( one for each attribute )
* 1 updateFunctionArray ( ditto )
* 2 charsetArray ( pointers to distinct CHARSET_INFO )
* 3 readKeyArray ( attribute ids of keys )
* 4 attributeGroupDescriptor ( currently size 1 but unused )
* 5 tabDescriptor ( attribute descriptors, each ZAD_SIZE )
*/
void Dbtup::setUpDescriptorReferences(Uint32 descriptorReference, void Dbtup::setUpDescriptorReferences(Uint32 descriptorReference,
Tablerec* const regTabPtr) Tablerec* const regTabPtr,
const Uint32* offset)
{ {
Uint32 noOfAttributes = regTabPtr->noOfAttr; Uint32* desc = &tableDescriptor[descriptorReference].tabDescr;
descriptorReference += ZTD_SIZE; regTabPtr->readFunctionArray = (ReadFunction*)(desc + offset[0]);
ReadFunction * tmp = (ReadFunction*)&tableDescriptor[descriptorReference].tabDescr; regTabPtr->updateFunctionArray = (UpdateFunction*)(desc + offset[1]);
regTabPtr->readFunctionArray = tmp; regTabPtr->charsetArray = (CHARSET_INFO**)(desc + offset[2]);
regTabPtr->updateFunctionArray = (UpdateFunction*)(tmp + noOfAttributes); regTabPtr->readKeyArray = descriptorReference + offset[3];
regTabPtr->attributeGroupDescriptor = descriptorReference + offset[4];
TableDescriptor * start = &tableDescriptor[descriptorReference]; regTabPtr->tabDescriptor = descriptorReference + offset[5];
TableDescriptor * end = (TableDescriptor*)(tmp + 2 * noOfAttributes);
regTabPtr->readKeyArray = descriptorReference + (end - start);
regTabPtr->attributeGroupDescriptor = regTabPtr->readKeyArray + regTabPtr->noOfKeyAttr;
regTabPtr->tabDescriptor = regTabPtr->attributeGroupDescriptor + regTabPtr->noOfAttributeGroups;
}//Dbtup::setUpDescriptorReferences() }//Dbtup::setUpDescriptorReferences()
Uint32 Uint32
...@@ -491,14 +533,18 @@ void Dbtup::releaseTabDescr(Tablerec* const regTabPtr) ...@@ -491,14 +533,18 @@ void Dbtup::releaseTabDescr(Tablerec* const regTabPtr)
Uint32 descriptor = regTabPtr->readKeyArray; Uint32 descriptor = regTabPtr->readKeyArray;
if (descriptor != RNIL) { if (descriptor != RNIL) {
ljam(); ljam();
Uint32 offset[10];
getTabDescrOffsets(regTabPtr, offset);
regTabPtr->tabDescriptor = RNIL; regTabPtr->tabDescriptor = RNIL;
regTabPtr->readKeyArray = RNIL; regTabPtr->readKeyArray = RNIL;
regTabPtr->readFunctionArray = NULL; regTabPtr->readFunctionArray = NULL;
regTabPtr->updateFunctionArray = NULL; regTabPtr->updateFunctionArray = NULL;
regTabPtr->charsetArray = NULL;
regTabPtr->attributeGroupDescriptor= RNIL; regTabPtr->attributeGroupDescriptor= RNIL;
Uint32 sizeFunctionArrays = 2 * (regTabPtr->noOfAttr * sizeOfReadFunction()); // move to start of descriptor
descriptor -= (sizeFunctionArrays + ZTD_SIZE); descriptor -= offset[3];
Uint32 retNo = getTabDescrWord(descriptor + ZTD_DATASIZE); Uint32 retNo = getTabDescrWord(descriptor + ZTD_DATASIZE);
ndbrequire(getTabDescrWord(descriptor + ZTD_HEADER) == ZTD_TYPE_NORMAL); ndbrequire(getTabDescrWord(descriptor + ZTD_HEADER) == ZTD_TYPE_NORMAL);
ndbrequire(retNo == getTabDescrWord((descriptor + retNo) - ZTD_TR_SIZE)); ndbrequire(retNo == getTabDescrWord((descriptor + retNo) - ZTD_TR_SIZE));
......
...@@ -31,12 +31,33 @@ ...@@ -31,12 +31,33 @@
/* memory attached to fragments (could be allocated per table */ /* memory attached to fragments (could be allocated per table */
/* instead. Performs its task by a buddy algorithm. */ /* instead. Performs its task by a buddy algorithm. */
/* **************************************************************** */ /* **************************************************************** */
Uint32 Dbtup::allocTabDescr(Uint32 noOfAttributes, Uint32 noOfKeyAttr, Uint32 noOfAttributeGroups)
Uint32
Dbtup::getTabDescrOffsets(const Tablerec* regTabPtr, Uint32* offset)
{
// belongs to configure.in
unsigned sizeOfPointer = sizeof(CHARSET_INFO*);
ndbrequire((sizeOfPointer & 0x3) == 0);
sizeOfPointer = (sizeOfPointer >> 2);
// do in layout order and return offsets (see DbtupMeta.cpp)
Uint32 allocSize = 0;
// magically aligned to 8 bytes
offset[0] = allocSize += ZTD_SIZE;
offset[1] = allocSize += regTabPtr->noOfAttr * sizeOfReadFunction();
offset[2] = allocSize += regTabPtr->noOfAttr * sizeOfReadFunction();
offset[3] = allocSize += regTabPtr->noOfCharsets * sizeOfPointer;
offset[4] = allocSize += regTabPtr->noOfKeyAttr;
offset[5] = allocSize += regTabPtr->noOfAttributeGroups;
allocSize += regTabPtr->noOfAttr * ZAD_SIZE;
allocSize += ZTD_TRAILER_SIZE;
// return number of words
return allocSize;
}
Uint32 Dbtup::allocTabDescr(const Tablerec* regTabPtr, Uint32* offset)
{ {
Uint32 reference = RNIL; Uint32 reference = RNIL;
Uint32 allocSize = (ZTD_SIZE + ZTD_TRAILER_SIZE) + (noOfAttributes * ZAD_SIZE); Uint32 allocSize = getTabDescrOffsets(regTabPtr, offset);
allocSize += noOfAttributeGroups;
allocSize += ((2 * noOfAttributes * sizeOfReadFunction()) + noOfKeyAttr);
/* ---------------------------------------------------------------- */ /* ---------------------------------------------------------------- */
/* ALWAYS ALLOCATE A MULTIPLE OF 16 BYTES */ /* ALWAYS ALLOCATE A MULTIPLE OF 16 BYTES */
/* ---------------------------------------------------------------- */ /* ---------------------------------------------------------------- */
......
...@@ -330,11 +330,15 @@ private: ...@@ -330,11 +330,15 @@ private:
/* /*
* Attribute metadata. Size must be multiple of word size. * Attribute metadata. Size must be multiple of word size.
*
* Prefix comparison of char data must use strxfrm and binary
* comparison. The charset is currently unused.
*/ */
struct DescAttr { struct DescAttr {
Uint32 m_attrDesc; // standard AttributeDescriptor Uint32 m_attrDesc; // standard AttributeDescriptor
Uint16 m_primaryAttrId; Uint16 m_primaryAttrId;
Uint16 m_typeId; unsigned m_typeId : 6;
unsigned m_charset : 10;
}; };
static const unsigned DescAttrSize = sizeof(DescAttr) >> 2; static const unsigned DescAttrSize = sizeof(DescAttr) >> 2;
......
...@@ -178,19 +178,31 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal) ...@@ -178,19 +178,31 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal)
descAttr.m_attrDesc = req->attrDescriptor; descAttr.m_attrDesc = req->attrDescriptor;
descAttr.m_primaryAttrId = req->primaryAttrId; descAttr.m_primaryAttrId = req->primaryAttrId;
descAttr.m_typeId = req->extTypeInfo & 0xFF; descAttr.m_typeId = req->extTypeInfo & 0xFF;
descAttr.m_charset = (req->extTypeInfo >> 16);
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugMeta) { if (debugFlags & DebugMeta) {
debugOut << "Add frag " << fragPtr.i << " attr " << attrId << " " << descAttr << endl; debugOut << "Add frag " << fragPtr.i << " attr " << attrId << " " << descAttr << endl;
} }
#endif #endif
// check if type is valid and has a comparison method // check that type is valid and has a binary comparison method
const NdbSqlUtil::Type& type = NdbSqlUtil::getType(descAttr.m_typeId); const NdbSqlUtil::Type& type = NdbSqlUtil::getTypeBinary(descAttr.m_typeId);
if (type.m_typeId == NdbSqlUtil::Type::Undefined || if (type.m_typeId == NdbSqlUtil::Type::Undefined ||
type.m_cmp == 0) { type.m_cmp == 0) {
jam(); jam();
errorCode = TuxAddAttrRef::InvalidAttributeType; errorCode = TuxAddAttrRef::InvalidAttributeType;
break; break;
} }
#ifdef dbtux_uses_charset
if (descAttr.m_charset != 0) {
CHARSET_INFO *cs = get_charset(descAttr.m_charset, MYF(0));
// here use the non-binary type
if (! NdbSqlUtil::usable_in_ordered_index(descAttr.m_typeId, cs)) {
jam();
errorCode = TuxAddAttrRef::InvalidCharset;
break;
}
}
#endif
if (indexPtr.p->m_numAttrs == fragOpPtr.p->m_numAttrsRecvd) { if (indexPtr.p->m_numAttrs == fragOpPtr.p->m_numAttrsRecvd) {
jam(); jam();
// initialize tree header // initialize tree header
......
...@@ -107,6 +107,9 @@ public: ...@@ -107,6 +107,9 @@ public:
/* Number of primary key attributes (should be computed) */ /* Number of primary key attributes (should be computed) */
Uint16 noOfPrimkey; Uint16 noOfPrimkey;
/* Number of distinct character sets (computed) */
Uint16 noOfCharsets;
/* Length of primary key in words (should be computed) */ /* Length of primary key in words (should be computed) */
/* For ordered index this is tree node size in words */ /* For ordered index this is tree node size in words */
Uint16 tupKeyLength; Uint16 tupKeyLength;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment