Commit b2987e0a authored by joreland@mysql.com's avatar joreland@mysql.com

Add general n-bitfields to ndb storage engine

parent b67dba95
...@@ -230,6 +230,7 @@ public: ...@@ -230,6 +230,7 @@ public:
STATIC_CONST( StringType = 3 ); STATIC_CONST( StringType = 3 );
// AttributeSize constants // AttributeSize constants
STATIC_CONST( aBit = 0 );
STATIC_CONST( an8Bit = 3 ); STATIC_CONST( an8Bit = 3 );
STATIC_CONST( a16Bit = 4 ); STATIC_CONST( a16Bit = 4 );
STATIC_CONST( a32Bit = 5 ); STATIC_CONST( a32Bit = 5 );
...@@ -310,7 +311,8 @@ public: ...@@ -310,7 +311,8 @@ public:
ExtDatetime = NdbSqlUtil::Type::Datetime, ExtDatetime = NdbSqlUtil::Type::Datetime,
ExtTimespec = NdbSqlUtil::Type::Timespec, ExtTimespec = NdbSqlUtil::Type::Timespec,
ExtBlob = NdbSqlUtil::Type::Blob, ExtBlob = NdbSqlUtil::Type::Blob,
ExtText = NdbSqlUtil::Type::Text ExtText = NdbSqlUtil::Type::Text,
ExtBit = NdbSqlUtil::Type::Bit
}; };
// Attribute data interpretation // Attribute data interpretation
...@@ -440,7 +442,13 @@ public: ...@@ -440,7 +442,13 @@ public:
// head + inline part [ attr precision lower half ] // head + inline part [ attr precision lower half ]
AttributeArraySize = (NDB_BLOB_HEAD_SIZE << 2) + (AttributeExtPrecision & 0xFFFF); AttributeArraySize = (NDB_BLOB_HEAD_SIZE << 2) + (AttributeExtPrecision & 0xFFFF);
return true; return true;
case DictTabInfo::ExtBit:
AttributeType = DictTabInfo::UnSignedType;
AttributeSize = DictTabInfo::aBit;
AttributeArraySize = AttributeExtLength;
return true;
}; };
return false; return false;
} }
......
...@@ -187,7 +187,8 @@ public: ...@@ -187,7 +187,8 @@ public:
Datetime, ///< Precision down to 1 sec (sizeof(Datetime) == 8 bytes ) Datetime, ///< Precision down to 1 sec (sizeof(Datetime) == 8 bytes )
Timespec, ///< Precision down to 1 nsec(sizeof(Datetime) == 12 bytes ) Timespec, ///< Precision down to 1 nsec(sizeof(Datetime) == 12 bytes )
Blob, ///< Binary large object (see NdbBlob) Blob, ///< Binary large object (see NdbBlob)
Text ///< Text blob Text, ///< Text blob,
Bit ///< Bit, length specifies no of bits
}; };
/** /**
......
...@@ -83,7 +83,8 @@ public: ...@@ -83,7 +83,8 @@ public:
Datetime, // Precision down to 1 sec (size 8 bytes) Datetime, // Precision down to 1 sec (size 8 bytes)
Timespec, // Precision down to 1 nsec (size 12 bytes) Timespec, // Precision down to 1 nsec (size 12 bytes)
Blob, // Blob Blob, // Blob
Text // Text blob Text, // Text blob,
Bit // A bit
}; };
Enum m_typeId; Enum m_typeId;
Cmp* m_cmp; // comparison method Cmp* m_cmp; // comparison method
......
...@@ -4843,8 +4843,17 @@ void Dbdict::handleTabInfo(SimpleProperties::Reader & it, ...@@ -4843,8 +4843,17 @@ void Dbdict::handleTabInfo(SimpleProperties::Reader & it,
nullCount += attrDesc.AttributeNullableFlag; nullCount += attrDesc.AttributeNullableFlag;
const Uint32 aSz = (1 << attrDesc.AttributeSize); const Uint32 aSz = (1 << attrDesc.AttributeSize);
const Uint32 sz = ((aSz * attrDesc.AttributeArraySize) + 31) >> 5; Uint32 sz;
if(aSz != 1)
{
sz = ((aSz * attrDesc.AttributeArraySize) + 31) >> 5;
}
else
{
sz = 0;
nullCount += attrDesc.AttributeArraySize;
}
recordLength += sz; recordLength += sz;
if(attrDesc.AttributeKeyFlag){ if(attrDesc.AttributeKeyFlag){
keyLength += sz; keyLength += sz;
......
...@@ -500,7 +500,8 @@ struct Fragoperrec { ...@@ -500,7 +500,8 @@ struct Fragoperrec {
Uint32 tableidFrag; Uint32 tableidFrag;
Uint32 fragPointer; Uint32 fragPointer;
Uint32 attributeCount; Uint32 attributeCount;
Uint32 freeNullBit; Uint32 currNullBit;
Uint32 noOfNullBits;
Uint32 noOfNewAttrCount; Uint32 noOfNewAttrCount;
Uint32 charsetIndex; Uint32 charsetIndex;
BlockReference lqhBlockrefFrag; BlockReference lqhBlockrefFrag;
...@@ -1630,6 +1631,12 @@ private: ...@@ -1630,6 +1631,12 @@ private:
Uint32 attrDescriptor, Uint32 attrDescriptor,
Uint32 attrDes2); Uint32 attrDes2);
bool readBitsNULLable(Uint32* outBuffer, AttributeHeader*, Uint32, Uint32);
bool updateBitsNULLable(Uint32* inBuffer, Uint32, Uint32);
bool readBitsNotNULL(Uint32* outBuffer, AttributeHeader*, Uint32, Uint32);
bool updateBitsNotNULL(Uint32* inBuffer, Uint32, Uint32);
// ***************************************************************** // *****************************************************************
// Read char routines optionally (tXfrmFlag) apply strxfrm // Read char routines optionally (tXfrmFlag) apply strxfrm
// ***************************************************************** // *****************************************************************
......
...@@ -96,9 +96,10 @@ void Dbtup::execTUPFRAGREQ(Signal* signal) ...@@ -96,9 +96,10 @@ void Dbtup::execTUPFRAGREQ(Signal* signal)
fragOperPtr.p->fragidFrag = fragId; fragOperPtr.p->fragidFrag = fragId;
fragOperPtr.p->tableidFrag = regTabPtr.i; fragOperPtr.p->tableidFrag = regTabPtr.i;
fragOperPtr.p->attributeCount = noOfAttributes; fragOperPtr.p->attributeCount = noOfAttributes;
fragOperPtr.p->freeNullBit = noOfNullAttr; fragOperPtr.p->noOfNullBits = noOfNullAttr;
fragOperPtr.p->noOfNewAttrCount = noOfNewAttr; fragOperPtr.p->noOfNewAttrCount = noOfNewAttr;
fragOperPtr.p->charsetIndex = 0; fragOperPtr.p->charsetIndex = 0;
fragOperPtr.p->currNullBit = 0;
ndbrequire(reqinfo == ZADDFRAG); ndbrequire(reqinfo == ZADDFRAG);
...@@ -309,13 +310,12 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal) ...@@ -309,13 +310,12 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal)
Uint32 firstTabDesIndex = regTabPtr.p->tabDescriptor + (attrId * ZAD_SIZE); Uint32 firstTabDesIndex = regTabPtr.p->tabDescriptor + (attrId * ZAD_SIZE);
setTabDescrWord(firstTabDesIndex, attrDescriptor); setTabDescrWord(firstTabDesIndex, attrDescriptor);
Uint32 attrLen = AttributeDescriptor::getSize(attrDescriptor); Uint32 attrLen = AttributeDescriptor::getSize(attrDescriptor);
Uint32 nullBitPos = 0; /* Default pos for NOT NULL attributes */ Uint32 nullBitPos = fragOperPtr.p->currNullBit;
if (AttributeDescriptor::getNullable(attrDescriptor)) { if (AttributeDescriptor::getNullable(attrDescriptor)) {
if (!AttributeDescriptor::getDynamic(attrDescriptor)) { if (!AttributeDescriptor::getDynamic(attrDescriptor)) {
ljam(); /* NULL ATTR */ ljam(); /* NULL ATTR */
fragOperPtr.p->freeNullBit--; /* STORE NULL BIT POSTITION */ fragOperPtr.p->currNullBit++;
nullBitPos = fragOperPtr.p->freeNullBit;
ndbrequire(fragOperPtr.p->freeNullBit < ZNIL); /* Check not below zero */
}//if }//if
} else { } else {
ljam(); ljam();
...@@ -331,9 +331,20 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal) ...@@ -331,9 +331,20 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal)
case 2: case 2:
{ {
ljam(); ljam();
Uint32 bitsUsed = AttributeDescriptor::getArraySize(attrDescriptor) * (1 << attrLen); if(attrLen != 0)
regTabPtr.p->tupheadsize += ((bitsUsed + 31) >> 5); {
break; ljam();
Uint32 bitsUsed =
AttributeDescriptor::getArraySize(attrDescriptor) * (1 << attrLen);
regTabPtr.p->tupheadsize += ((bitsUsed + 31) >> 5);
break;
}
else
{
ljam();
Uint32 bitCount = AttributeDescriptor::getArraySize(attrDescriptor);
fragOperPtr.p->currNullBit += bitCount;
}
} }
default: default:
ndbrequire(false); ndbrequire(false);
...@@ -375,7 +386,9 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal) ...@@ -375,7 +386,9 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal)
addattrrefuseLab(signal, regFragPtr, fragOperPtr, regTabPtr.p, fragId); addattrrefuseLab(signal, regFragPtr, fragOperPtr, regTabPtr.p, fragId);
return; return;
}//if }//if
if (lastAttr && (fragOperPtr.p->freeNullBit != 0)) { if (lastAttr &&
(fragOperPtr.p->currNullBit != fragOperPtr.p->noOfNullBits))
{
ljam(); ljam();
terrorCode = ZINCONSISTENT_NULL_ATTRIBUTE_COUNT; terrorCode = ZINCONSISTENT_NULL_ATTRIBUTE_COUNT;
addattrrefuseLab(signal, regFragPtr, fragOperPtr, regTabPtr.p, fragId); addattrrefuseLab(signal, regFragPtr, fragOperPtr, regTabPtr.p, fragId);
......
...@@ -40,7 +40,11 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr) ...@@ -40,7 +40,11 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr)
if ((AttributeDescriptor::getArrayType(attrDescriptor) == ZNON_ARRAY) || if ((AttributeDescriptor::getArrayType(attrDescriptor) == ZNON_ARRAY) ||
(AttributeDescriptor::getArrayType(attrDescriptor) == ZFIXED_ARRAY)) { (AttributeDescriptor::getArrayType(attrDescriptor) == ZFIXED_ARRAY)) {
if (!AttributeDescriptor::getNullable(attrDescriptor)) { if (!AttributeDescriptor::getNullable(attrDescriptor)) {
if (AttributeDescriptor::getSizeInWords(attrDescriptor) == 1) { if (AttributeDescriptor::getSize(attrDescriptor) == 0){
ljam();
regTabPtr->readFunctionArray[i] = &Dbtup::readBitsNotNULL;
regTabPtr->updateFunctionArray[i] = &Dbtup::updateBitsNotNULL;
} else if (AttributeDescriptor::getSizeInWords(attrDescriptor) == 1){
ljam(); ljam();
regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHOneWordNotNULL; regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHOneWordNotNULL;
regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHOneWordNotNULL; regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHOneWordNotNULL;
...@@ -61,7 +65,11 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr) ...@@ -61,7 +65,11 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr)
regTabPtr->readFunctionArray[i] = &Dbtup::readCharNotNULL; regTabPtr->readFunctionArray[i] = &Dbtup::readCharNotNULL;
} }
} else { } else {
if (AttributeDescriptor::getSizeInWords(attrDescriptor) == 1) { if (AttributeDescriptor::getSize(attrDescriptor) == 0){
ljam();
regTabPtr->readFunctionArray[i] = &Dbtup::readBitsNULLable;
regTabPtr->updateFunctionArray[i] = &Dbtup::updateBitsNULLable;
} else if (AttributeDescriptor::getSizeInWords(attrDescriptor) == 1){
ljam(); ljam();
regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHOneWordNULLable; regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHOneWordNULLable;
regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHManyWordNULLable; regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHManyWordNULLable;
...@@ -1005,3 +1013,205 @@ Dbtup::read_psuedo(Uint32 attrId, Uint32* outBuffer){ ...@@ -1005,3 +1013,205 @@ Dbtup::read_psuedo(Uint32 attrId, Uint32* outBuffer){
return 0; return 0;
} }
} }
bool
Dbtup::readBitsNotNULL(Uint32* outBuffer,
AttributeHeader* ahOut,
Uint32 attrDescriptor,
Uint32 attrDes2)
{
Tablerec* const regTabPtr = tabptr.p;
Uint32 bitCount = AttributeDescriptor::getArraySize(attrDescriptor);
Uint32 offsetInTuple = AttributeOffset::getNullFlagOffset(attrDes2);
Uint32 offsetInWord = AttributeOffset::getNullFlagBitOffset(attrDes2);
ndbrequire(offsetInTuple < regTabPtr->tupNullWords);
offsetInTuple += regTabPtr->tupNullIndex;
ndbrequire(offsetInTuple < tCheckOffset);
Uint32 indexBuf = tOutBufIndex;
Uint32 newIndexBuf = indexBuf + ((bitCount + 31) >> 5);
Uint32 maxRead = tMaxRead;
outBuffer[indexBuf] =
(tTupleHeader[offsetInTuple] >> offsetInWord) & ((1 << bitCount) - 1);
if(offsetInWord + bitCount > 32)
{
ndbrequire(false);
}
if (newIndexBuf <= maxRead) {
ljam();
ahOut->setDataSize((bitCount + 31) >> 5);
tOutBufIndex = newIndexBuf;
return true;
} else {
ljam();
terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
return false;
}//if
}
bool
Dbtup::readBitsNULLable(Uint32* outBuffer,
AttributeHeader* ahOut,
Uint32 attrDescriptor,
Uint32 attrDes2)
{
Tablerec* const regTabPtr = tabptr.p;
Uint32 bitCount = AttributeDescriptor::getArraySize(attrDescriptor);
Uint32 offsetInTuple = AttributeOffset::getNullFlagOffset(attrDes2);
Uint32 offsetInWord = AttributeOffset::getNullFlagBitOffset(attrDes2);
ndbrequire(offsetInTuple < regTabPtr->tupNullWords);
offsetInTuple += regTabPtr->tupNullIndex;
ndbrequire(offsetInTuple < tCheckOffset);
Uint32 indexBuf = tOutBufIndex;
Uint32 newIndexBuf = indexBuf + (bitCount + 31) >> 5;
Uint32 maxRead = tMaxRead;
if((tTupleHeader[offsetInTuple] >> offsetInWord) & 1)
{
ljam();
ahOut->setNULL();
return true;
}
offsetInWord ++;
if(offsetInWord == 32)
{
ljam();
offsetInWord = 0;
offsetInTuple++;
}
outBuffer[indexBuf] =
(tTupleHeader[offsetInTuple] >> offsetInWord) & ((1 << bitCount) - 1);
if(offsetInWord + bitCount > 32)
{
ndbrequire(false);
}
if (newIndexBuf <= maxRead) {
ljam();
ahOut->setDataSize((bitCount + 31) >> 5);
tOutBufIndex = newIndexBuf;
return true;
} else {
ljam();
terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
return false;
}//if
}
bool
Dbtup::updateBitsNotNULL(Uint32* inBuffer,
Uint32 attrDescriptor,
Uint32 attrDes2)
{
Tablerec* const regTabPtr = tabptr.p;
Uint32 indexBuf = tInBufIndex;
Uint32 inBufLen = tInBufLen;
AttributeHeader ahIn(inBuffer[indexBuf]);
Uint32 nullIndicator = ahIn.isNULL();
Uint32 bitCount = AttributeDescriptor::getArraySize(attrDescriptor);
Uint32 newIndex = indexBuf + 1 + ((bitCount + 31) >> 5);
Uint32 nullFlagOffset = AttributeOffset::getNullFlagOffset(attrDes2);
Uint32 nullFlagBitOffset = AttributeOffset::getNullFlagBitOffset(attrDes2);
Uint32 nullWordOffset = nullFlagOffset + regTabPtr->tupNullIndex;
ndbrequire((nullFlagOffset < regTabPtr->tupNullWords) &&
(nullWordOffset < tCheckOffset));
Uint32 nullBits = tTupleHeader[nullWordOffset];
if (newIndex <= inBufLen) {
Uint32 updateWord = inBuffer[indexBuf + 1];
if (!nullIndicator) {
tInBufIndex = newIndex;
Uint32 mask = ((1 << bitCount) - 1) << nullFlagBitOffset;
if(nullFlagBitOffset + bitCount <= 32)
{
ljam();
tTupleHeader[nullWordOffset] =
(nullBits & ~mask) |
((updateWord << nullFlagBitOffset) & mask);
return true;
}
else
{
abort();
}
return true;
} else {
ljam();
terrorCode = ZNOT_NULL_ATTR;
return false;
}//if
} else {
ljam();
terrorCode = ZAI_INCONSISTENCY_ERROR;
return false;
}//if
return true;
}
bool
Dbtup::updateBitsNULLable(Uint32* inBuffer,
Uint32 attrDescriptor,
Uint32 attrDes2)
{
Tablerec* const regTabPtr = tabptr.p;
AttributeHeader ahIn(inBuffer[tInBufIndex]);
Uint32 indexBuf = tInBufIndex;
Uint32 nullIndicator = ahIn.isNULL();
Uint32 nullFlagOffset = AttributeOffset::getNullFlagOffset(attrDes2);
Uint32 nullFlagBitOffset = AttributeOffset::getNullFlagBitOffset(attrDes2);
Uint32 nullWordOffset = nullFlagOffset + regTabPtr->tupNullIndex;
ndbrequire((nullFlagOffset < regTabPtr->tupNullWords) &&
(nullWordOffset < tCheckOffset));
Uint32 nullBits = tTupleHeader[nullWordOffset];
Uint32 bitCount = AttributeDescriptor::getArraySize(attrDescriptor);
if (!nullIndicator) {
Uint32 newIndex = indexBuf + 1 + ((bitCount + 31) >> 5);
nullBits &= (~(1 << nullFlagBitOffset));
ljam();
tTupleHeader[nullWordOffset] = nullBits;
nullFlagBitOffset++;
if(nullFlagBitOffset == 32)
{
ljam();
nullFlagBitOffset = 0;
nullWordOffset++;
nullBits = tTupleHeader[nullWordOffset];
}
Uint32 updateWord = inBuffer[indexBuf + 1];
Uint32 mask = ((1 << bitCount) - 1) << nullFlagBitOffset;
tInBufLen = newIndex;
if(nullFlagBitOffset + bitCount <= 32)
{
ljam();
tTupleHeader[nullWordOffset] =
(nullBits & ~mask) |
((updateWord << nullFlagBitOffset) & mask);
return true;
}
abort();
} else {
Uint32 newIndex = tInBufIndex + 1;
if (newIndex <= tInBufLen) {
nullBits |= (1 << nullFlagBitOffset);
ljam();
tTupleHeader[nullWordOffset] = nullBits;
tInBufIndex = newIndex;
return true;
} else {
ljam();
terrorCode = ZAI_INCONSISTENCY_ERROR;
return false;
}//if
}//if
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment