Commit 6da0dd79 authored by pekka@mysql.com's avatar pekka@mysql.com

NDB blob fixes. INCOMPATIBLE with existing blobs

parent 354739ac
...@@ -97,21 +97,22 @@ NdbBlob::getBlobTable(NdbTableImpl& bt, const NdbTableImpl* t, const NdbColumnIm ...@@ -97,21 +97,22 @@ NdbBlob::getBlobTable(NdbTableImpl& bt, const NdbTableImpl* t, const NdbColumnIm
bt.setName(btname); bt.setName(btname);
bt.setLogging(t->getLogging()); bt.setLogging(t->getLogging());
bt.setFragmentType(t->getFragmentType()); bt.setFragmentType(t->getFragmentType());
{ NdbDictionary::Column bc("DIST"); { NdbDictionary::Column bc("PK");
bc.setType(NdbDictionary::Column::Unsigned); bc.setType(NdbDictionary::Column::Unsigned);
assert(t->m_sizeOfKeysInWords != 0);
bc.setLength(t->m_sizeOfKeysInWords);
bc.setPrimaryKey(true); bc.setPrimaryKey(true);
bc.setDistributionKey(true); bc.setDistributionKey(true);
bt.addColumn(bc); bt.addColumn(bc);
} }
{ NdbDictionary::Column bc("PART"); { NdbDictionary::Column bc("DIST");
bc.setType(NdbDictionary::Column::Unsigned); bc.setType(NdbDictionary::Column::Unsigned);
bc.setPrimaryKey(true); bc.setPrimaryKey(true);
bc.setDistributionKey(true);
bt.addColumn(bc); bt.addColumn(bc);
} }
{ NdbDictionary::Column bc("PK"); { NdbDictionary::Column bc("PART");
bc.setType(NdbDictionary::Column::Unsigned); bc.setType(NdbDictionary::Column::Unsigned);
assert(t->m_sizeOfKeysInWords != 0);
bc.setLength(t->m_sizeOfKeysInWords);
bc.setPrimaryKey(true); bc.setPrimaryKey(true);
bt.addColumn(bc); bt.addColumn(bc);
} }
...@@ -392,9 +393,9 @@ NdbBlob::setPartKeyValue(NdbOperation* anOp, Uint32 part) ...@@ -392,9 +393,9 @@ NdbBlob::setPartKeyValue(NdbOperation* anOp, Uint32 part)
Uint32* data = (Uint32*)theKeyBuf.data; Uint32* data = (Uint32*)theKeyBuf.data;
unsigned size = theTable->m_sizeOfKeysInWords; unsigned size = theTable->m_sizeOfKeysInWords;
DBG("setPartKeyValue dist=" << getDistKey(part) << " part=" << part << " key=" << ndb_blob_debug(data, size)); DBG("setPartKeyValue dist=" << getDistKey(part) << " part=" << part << " key=" << ndb_blob_debug(data, size));
if (anOp->equal((Uint32)0, getDistKey(part)) == -1 || if (anOp->equal((Uint32)0, theKeyBuf.data) == -1 ||
anOp->equal((Uint32)1, part) == -1 || anOp->equal((Uint32)1, getDistKey(part)) == -1 ||
anOp->equal((Uint32)2, theKeyBuf.data) == -1) { anOp->equal((Uint32)2, part) == -1) {
setErrorCode(anOp); setErrorCode(anOp);
return -1; return -1;
} }
...@@ -676,7 +677,6 @@ NdbBlob::readDataPrivate(char* buf, Uint32& bytes) ...@@ -676,7 +677,6 @@ NdbBlob::readDataPrivate(char* buf, Uint32& bytes)
if (readParts(thePartBuf.data, part, 1) == -1) if (readParts(thePartBuf.data, part, 1) == -1)
return -1; return -1;
// need result now // need result now
DBG("execute pending part reads");
if (executePendingBlobReads() == -1) if (executePendingBlobReads() == -1)
return -1; return -1;
Uint32 n = thePartSize - off; Uint32 n = thePartSize - off;
...@@ -710,7 +710,6 @@ NdbBlob::readDataPrivate(char* buf, Uint32& bytes) ...@@ -710,7 +710,6 @@ NdbBlob::readDataPrivate(char* buf, Uint32& bytes)
if (readParts(thePartBuf.data, part, 1) == -1) if (readParts(thePartBuf.data, part, 1) == -1)
return -1; return -1;
// need result now // need result now
DBG("execute pending part reads");
if (executePendingBlobReads() == -1) if (executePendingBlobReads() == -1)
return -1; return -1;
memcpy(buf, thePartBuf.data, len); memcpy(buf, thePartBuf.data, len);
...@@ -773,14 +772,12 @@ NdbBlob::writeDataPrivate(const char* buf, Uint32 bytes) ...@@ -773,14 +772,12 @@ NdbBlob::writeDataPrivate(const char* buf, Uint32 bytes)
if (off != 0) { if (off != 0) {
DBG("partial first block pos=" << pos << " len=" << len); DBG("partial first block pos=" << pos << " len=" << len);
// flush writes to guarantee correct read // flush writes to guarantee correct read
DBG("execute pending part writes");
if (executePendingBlobWrites() == -1) if (executePendingBlobWrites() == -1)
return -1; return -1;
Uint32 part = (pos - theInlineSize) / thePartSize; Uint32 part = (pos - theInlineSize) / thePartSize;
if (readParts(thePartBuf.data, part, 1) == -1) if (readParts(thePartBuf.data, part, 1) == -1)
return -1; return -1;
// need result now // need result now
DBG("execute pending part reads");
if (executePendingBlobReads() == -1) if (executePendingBlobReads() == -1)
return -1; return -1;
Uint32 n = thePartSize - off; Uint32 n = thePartSize - off;
...@@ -824,13 +821,11 @@ NdbBlob::writeDataPrivate(const char* buf, Uint32 bytes) ...@@ -824,13 +821,11 @@ NdbBlob::writeDataPrivate(const char* buf, Uint32 bytes)
Uint32 part = (pos - theInlineSize) / thePartSize; Uint32 part = (pos - theInlineSize) / thePartSize;
if (theLength > pos + len) { if (theLength > pos + len) {
// flush writes to guarantee correct read // flush writes to guarantee correct read
DBG("execute pending part writes");
if (executePendingBlobWrites() == -1) if (executePendingBlobWrites() == -1)
return -1; return -1;
if (readParts(thePartBuf.data, part, 1) == -1) if (readParts(thePartBuf.data, part, 1) == -1)
return -1; return -1;
// need result now // need result now
DBG("execute pending part reads");
if (executePendingBlobReads() == -1) if (executePendingBlobReads() == -1)
return -1; return -1;
memcpy(thePartBuf.data, buf, len); memcpy(thePartBuf.data, buf, len);
...@@ -980,9 +975,11 @@ NdbBlob::deletePartsUnknown(Uint32 part) ...@@ -980,9 +975,11 @@ NdbBlob::deletePartsUnknown(Uint32 part)
} }
tOp->m_abortOption = IgnoreError; tOp->m_abortOption = IgnoreError;
n++; n++;
if (theNdbCon->executeNoBlobs(NoCommit) == -1)
return -1;
} }
DBG("deletePartsUnknown: executeNoBlobs [in] bat=" << bat);
if (theNdbCon->executeNoBlobs(NoCommit) == -1)
return -1;
DBG("deletePartsUnknown: executeNoBlobs [out]");
n = 0; n = 0;
while (n < bat) { while (n < bat) {
NdbOperation* tOp = tOpList[n]; NdbOperation* tOp = tOpList[n];
...@@ -1011,8 +1008,10 @@ NdbBlob::executePendingBlobReads() ...@@ -1011,8 +1008,10 @@ NdbBlob::executePendingBlobReads()
{ {
Uint8 flags = (1 << NdbOperation::ReadRequest); Uint8 flags = (1 << NdbOperation::ReadRequest);
if (thePendingBlobOps & flags) { if (thePendingBlobOps & flags) {
DBG("executePendingBlobReads: executeNoBlobs [in]");
if (theNdbCon->executeNoBlobs(NoCommit) == -1) if (theNdbCon->executeNoBlobs(NoCommit) == -1)
return -1; return -1;
DBG("executePendingBlobReads: executeNoBlobs [out]");
thePendingBlobOps = 0; thePendingBlobOps = 0;
theNdbCon->thePendingBlobOps = 0; theNdbCon->thePendingBlobOps = 0;
} }
...@@ -1024,8 +1023,10 @@ NdbBlob::executePendingBlobWrites() ...@@ -1024,8 +1023,10 @@ NdbBlob::executePendingBlobWrites()
{ {
Uint8 flags = 0xFF & ~(1 << NdbOperation::ReadRequest); Uint8 flags = 0xFF & ~(1 << NdbOperation::ReadRequest);
if (thePendingBlobOps & flags) { if (thePendingBlobOps & flags) {
DBG("executePendingBlobWrites: executeNoBlobs [in]");
if (theNdbCon->executeNoBlobs(NoCommit) == -1) if (theNdbCon->executeNoBlobs(NoCommit) == -1)
return -1; return -1;
DBG("executePendingBlobWrites: executeNoBlobs [out]");
thePendingBlobOps = 0; thePendingBlobOps = 0;
theNdbCon->thePendingBlobOps = 0; theNdbCon->thePendingBlobOps = 0;
} }
...@@ -1037,10 +1038,10 @@ NdbBlob::executePendingBlobWrites() ...@@ -1037,10 +1038,10 @@ NdbBlob::executePendingBlobWrites()
int int
NdbBlob::invokeActiveHook() NdbBlob::invokeActiveHook()
{ {
DBG("invokeActiveHook"); DBG("invokeActiveHook [in]");
assert(theState == Active && theActiveHook != NULL); assert(theState == Active && theActiveHook != NULL);
int ret = (*theActiveHook)(this, theActiveHookArg); int ret = (*theActiveHook)(this, theActiveHookArg);
DBG("invokeActiveHook ret=" << ret); DBG("invokeActiveHook [out] ret=" << ret);
if (ret != 0) { if (ret != 0) {
// no error is set on blob level // no error is set on blob level
return -1; return -1;
...@@ -1244,7 +1245,7 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch) ...@@ -1244,7 +1245,7 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch)
return -1; return -1;
} }
} else { } else {
NdbOperation* tOp = theNdbCon->getNdbIndexOperation(theAccessTable->m_index, theTable, theNdbOp); NdbIndexOperation* tOp = theNdbCon->getNdbIndexOperation(theAccessTable->m_index, theTable, theNdbOp);
if (tOp == NULL || if (tOp == NULL ||
tOp->readTuple() == -1 || tOp->readTuple() == -1 ||
setAccessKeyValue(tOp) == -1 || setAccessKeyValue(tOp) == -1 ||
......
...@@ -1232,6 +1232,13 @@ deleteScan(bool idx) ...@@ -1232,6 +1232,13 @@ deleteScan(bool idx)
// main // main
// from here on print always
#undef DBG
#define DBG(x) \
do { \
ndbout << "line " << __LINE__ << " " << x << endl; \
} while (0)
static int static int
testmain() testmain()
{ {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment