Commit 95aa9c28 authored by pekka@clam.ndb.mysql.com's avatar pekka@clam.ndb.mysql.com

Merge clam.ndb.mysql.com:/space/pekka/ndb/version/my50-bug19285

into  clam.ndb.mysql.com:/space/pekka/ndb/version/my51-bug19285
parents 694c1db7 a66d94b7
...@@ -79,19 +79,41 @@ class NdbEventOperationImpl; ...@@ -79,19 +79,41 @@ class NdbEventOperationImpl;
* Non-void NdbBlob methods return -1 on error and 0 on success. Output * Non-void NdbBlob methods return -1 on error and 0 on success. Output
* parameters are used when necessary. * parameters are used when necessary.
* *
* Operation types: * Usage notes for different operation types:
* - insertTuple must use setValue if blob column is non-nullable *
* - readTuple with exclusive lock can also update existing value * - insertTuple must use setValue if blob attribute is non-nullable
* - updateTuple can overwrite with setValue or update existing value *
* - writeTuple always overwrites and must use setValue if non-nullable * - readTuple or scan readTuples with lock mode LM_CommittedRead is
* automatically upgraded to lock mode LM_Read if any blob attributes
* are accessed (to guarantee consistent view)
*
* - readTuple (with any lock mode) can only read blob value
*
* - updateTuple can either overwrite existing value with setValue or
* update it in active phase
*
* - writeTuple always overwrites blob value and must use setValue if
* blob attribute is non-nullable
*
* - deleteTuple creates implicit non-accessible blob handles * - deleteTuple creates implicit non-accessible blob handles
* - scan with exclusive lock can also update existing value *
* - scan "lock takeover" update op must do its own getBlobHandle * - scan readTuples (any lock mode) can use its blob handles only
* to read blob value
*
* - scan readTuples with lock mode LM_Exclusive can update row and blob
* value using updateCurrentTuple, where the operation returned must
* create its own blob handles explicitly
*
* - scan readTuples with lock mode LM_Exclusive can delete row (and
* therefore blob values) using deleteCurrentTuple, which creates
* implicit non-accessible blob handles
*
* - the operation returned by lockCurrentTuple cannot update blob value
* *
* Bugs / limitations: * Bugs / limitations:
* - lock mode upgrade should be handled automatically *
* - lock mode vs allowed operation is not checked
* - too many pending blob ops can blow up i/o buffers * - too many pending blob ops can blow up i/o buffers
*
* - table and its blob part tables are not created atomically * - table and its blob part tables are not created atomically
*/ */
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
...@@ -214,6 +236,9 @@ public: ...@@ -214,6 +236,9 @@ public:
/** /**
* Return error object. The error may be blob specific or may be * Return error object. The error may be blob specific or may be
* copied from a failed implicit operation. * copied from a failed implicit operation.
*
* The error code is copied back to the operation unless the operation
* already has a non-zero error code.
*/ */
const NdbError& getNdbError() const; const NdbError& getNdbError() const;
/** /**
...@@ -326,6 +351,7 @@ private: ...@@ -326,6 +351,7 @@ private:
bool isWriteOp(); bool isWriteOp();
bool isDeleteOp(); bool isDeleteOp();
bool isScanOp(); bool isScanOp();
bool isReadOnlyOp();
bool isTakeOverOp(); bool isTakeOverOp();
// computations // computations
Uint32 getPartNumber(Uint64 pos); Uint32 getPartNumber(Uint64 pos);
...@@ -367,10 +393,10 @@ private: ...@@ -367,10 +393,10 @@ private:
int atNextResult(); int atNextResult();
int atNextEvent(); int atNextEvent();
// errors // errors
void setErrorCode(int anErrorCode, bool invalidFlag = true); void setErrorCode(int anErrorCode, bool invalidFlag = false);
void setErrorCode(NdbOperation* anOp, bool invalidFlag = true); void setErrorCode(NdbOperation* anOp, bool invalidFlag = false);
void setErrorCode(NdbTransaction* aCon, bool invalidFlag = true); void setErrorCode(NdbTransaction* aCon, bool invalidFlag = false);
void setErrorCode(NdbEventOperationImpl* anOp, bool invalidFlag = true); void setErrorCode(NdbEventOperationImpl* anOp, bool invalidFlag = false);
#ifdef VM_TRACE #ifdef VM_TRACE
int getOperationType() const; int getOperationType() const;
friend class NdbOut& operator<<(NdbOut&, const NdbBlob&); friend class NdbOut& operator<<(NdbOut&, const NdbBlob&);
......
...@@ -42,7 +42,7 @@ public: ...@@ -42,7 +42,7 @@ public:
* readTuples. * readTuples.
*/ */
enum ScanFlag { enum ScanFlag {
SF_TupScan = (1 << 16), // scan TUP - only LM_CommittedRead SF_TupScan = (1 << 16), // scan TUP
SF_OrderBy = (1 << 24), // index scan in order SF_OrderBy = (1 << 24), // index scan in order
SF_Descending = (2 << 24), // index scan in descending order SF_Descending = (2 << 24), // index scan in descending order
SF_ReadRangeNo = (4 << 24), // enable @ref get_range_no SF_ReadRangeNo = (4 << 24), // enable @ref get_range_no
......
...@@ -393,6 +393,16 @@ NdbBlob::isScanOp() ...@@ -393,6 +393,16 @@ NdbBlob::isScanOp()
theNdbOp->theOperationType == NdbOperation::OpenRangeScanRequest; theNdbOp->theOperationType == NdbOperation::OpenRangeScanRequest;
} }
inline bool
NdbBlob::isReadOnlyOp()
{
return ! (
theNdbOp->theOperationType == NdbOperation::InsertRequest ||
theNdbOp->theOperationType == NdbOperation::UpdateRequest ||
theNdbOp->theOperationType == NdbOperation::WriteRequest
);
}
inline bool inline bool
NdbBlob::isTakeOverOp() NdbBlob::isTakeOverOp()
{ {
...@@ -638,12 +648,12 @@ NdbBlob::getValue(void* data, Uint32 bytes) ...@@ -638,12 +648,12 @@ NdbBlob::getValue(void* data, Uint32 bytes)
{ {
DBUG_ENTER("NdbBlob::getValue"); DBUG_ENTER("NdbBlob::getValue");
DBUG_PRINT("info", ("data=%p bytes=%u", data, bytes)); DBUG_PRINT("info", ("data=%p bytes=%u", data, bytes));
if (theGetFlag || theState != Prepared) { if (! isReadOp() && ! isScanOp()) {
setErrorCode(NdbBlobImpl::ErrState); setErrorCode(NdbBlobImpl::ErrCompat);
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
if (! isReadOp() && ! isScanOp()) { if (theGetFlag || theState != Prepared) {
setErrorCode(NdbBlobImpl::ErrUsage); setErrorCode(NdbBlobImpl::ErrState);
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
if (data == NULL && bytes != 0) { if (data == NULL && bytes != 0) {
...@@ -661,12 +671,12 @@ NdbBlob::setValue(const void* data, Uint32 bytes) ...@@ -661,12 +671,12 @@ NdbBlob::setValue(const void* data, Uint32 bytes)
{ {
DBUG_ENTER("NdbBlob::setValue"); DBUG_ENTER("NdbBlob::setValue");
DBUG_PRINT("info", ("data=%p bytes=%u", data, bytes)); DBUG_PRINT("info", ("data=%p bytes=%u", data, bytes));
if (theSetFlag || theState != Prepared) { if (isReadOnlyOp()) {
setErrorCode(NdbBlobImpl::ErrState); setErrorCode(NdbBlobImpl::ErrCompat);
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
if (! isInsertOp() && ! isUpdateOp() && ! isWriteOp()) { if (theSetFlag || theState != Prepared) {
setErrorCode(NdbBlobImpl::ErrUsage); setErrorCode(NdbBlobImpl::ErrState);
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
if (data == NULL && bytes != 0) { if (data == NULL && bytes != 0) {
...@@ -762,6 +772,10 @@ int ...@@ -762,6 +772,10 @@ int
NdbBlob::setNull() NdbBlob::setNull()
{ {
DBUG_ENTER("NdbBlob::setNull"); DBUG_ENTER("NdbBlob::setNull");
if (isReadOnlyOp()) {
setErrorCode(NdbBlobImpl::ErrCompat);
DBUG_RETURN(-1);
}
if (theNullFlag == -1) { if (theNullFlag == -1) {
if (theState == Prepared) { if (theState == Prepared) {
DBUG_RETURN(setValue(0, 0)); DBUG_RETURN(setValue(0, 0));
...@@ -800,6 +814,10 @@ NdbBlob::truncate(Uint64 length) ...@@ -800,6 +814,10 @@ NdbBlob::truncate(Uint64 length)
{ {
DBUG_ENTER("NdbBlob::truncate"); DBUG_ENTER("NdbBlob::truncate");
DBUG_PRINT("info", ("length=%llu", length)); DBUG_PRINT("info", ("length=%llu", length));
if (isReadOnlyOp()) {
setErrorCode(NdbBlobImpl::ErrCompat);
DBUG_RETURN(-1);
}
if (theNullFlag == -1) { if (theNullFlag == -1) {
setErrorCode(NdbBlobImpl::ErrState); setErrorCode(NdbBlobImpl::ErrState);
DBUG_RETURN(-1); DBUG_RETURN(-1);
...@@ -857,12 +875,14 @@ NdbBlob::setPos(Uint64 pos) ...@@ -857,12 +875,14 @@ NdbBlob::setPos(Uint64 pos)
int int
NdbBlob::readData(void* data, Uint32& bytes) NdbBlob::readData(void* data, Uint32& bytes)
{ {
DBUG_ENTER("NdbBlob::readData");
if (theState != Active) { if (theState != Active) {
setErrorCode(NdbBlobImpl::ErrState); setErrorCode(NdbBlobImpl::ErrState);
return -1; DBUG_RETURN(-1);
} }
char* buf = static_cast<char*>(data); char* buf = static_cast<char*>(data);
return readDataPrivate(buf, bytes); int ret = readDataPrivate(buf, bytes);
DBUG_RETURN(ret);
} }
int int
...@@ -951,12 +971,18 @@ NdbBlob::readDataPrivate(char* buf, Uint32& bytes) ...@@ -951,12 +971,18 @@ NdbBlob::readDataPrivate(char* buf, Uint32& bytes)
int int
NdbBlob::writeData(const void* data, Uint32 bytes) NdbBlob::writeData(const void* data, Uint32 bytes)
{ {
DBUG_ENTER("NdbBlob::writeData");
if (isReadOnlyOp()) {
setErrorCode(NdbBlobImpl::ErrCompat);
DBUG_RETURN(-1);
}
if (theState != Active) { if (theState != Active) {
setErrorCode(NdbBlobImpl::ErrState); setErrorCode(NdbBlobImpl::ErrState);
return -1; DBUG_RETURN(-1);
} }
const char* buf = static_cast<const char*>(data); const char* buf = static_cast<const char*>(data);
return writeDataPrivate(buf, bytes); int ret = writeDataPrivate(buf, bytes);
DBUG_RETURN(0);
} }
int int
...@@ -1355,6 +1381,9 @@ NdbBlob::atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl ...@@ -1355,6 +1381,9 @@ NdbBlob::atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
if (isReadOp()) { if (isReadOp()) {
// upgrade lock mode
if (theNdbOp->theLockMode == NdbOperation::LM_CommittedRead)
theNdbOp->theLockMode = NdbOperation::LM_Read;
// add read of head+inline in this op // add read of head+inline in this op
if (getHeadInlineValue(theNdbOp) == -1) if (getHeadInlineValue(theNdbOp) == -1)
DBUG_RETURN(-1); DBUG_RETURN(-1);
...@@ -1373,6 +1402,9 @@ NdbBlob::atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl ...@@ -1373,6 +1402,9 @@ NdbBlob::atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl
supportedOp = true; supportedOp = true;
} }
if (isScanOp()) { if (isScanOp()) {
// upgrade lock mode
if (theNdbOp->theLockMode == NdbOperation::LM_CommittedRead)
theNdbOp->theLockMode = NdbOperation::LM_Read;
// add read of head+inline in this op // add read of head+inline in this op
if (getHeadInlineValue(theNdbOp) == -1) if (getHeadInlineValue(theNdbOp) == -1)
DBUG_RETURN(-1); DBUG_RETURN(-1);
......
...@@ -24,7 +24,7 @@ public: ...@@ -24,7 +24,7 @@ public:
STATIC_CONST( ErrTable = 4263 ); STATIC_CONST( ErrTable = 4263 );
// "Invalid usage of blob attribute" // "Invalid usage of blob attribute"
STATIC_CONST( ErrUsage = 4264 ); STATIC_CONST( ErrUsage = 4264 );
// "Method is not valid in current blob state" // "The blob method is not valid in current blob state"
STATIC_CONST( ErrState = 4265 ); STATIC_CONST( ErrState = 4265 );
// "Invalid blob seek position" // "Invalid blob seek position"
STATIC_CONST( ErrSeek = 4266 ); STATIC_CONST( ErrSeek = 4266 );
...@@ -33,9 +33,11 @@ public: ...@@ -33,9 +33,11 @@ public:
// "Error in blob head update forced rollback of transaction" // "Error in blob head update forced rollback of transaction"
STATIC_CONST( ErrAbort = 4268 ); STATIC_CONST( ErrAbort = 4268 );
// "Unknown blob error" // "Unknown blob error"
STATIC_CONST( ErrUnknown = 4269 ); STATIC_CONST( ErrUnknown = 4270 );
// "Corrupted main table PK in blob operation" // "Corrupted main table PK in blob operation"
STATIC_CONST( ErrCorruptPK = 4274 ); STATIC_CONST( ErrCorruptPK = 4274 );
// "The blob method is incompatible with operation type or lock mode"
STATIC_CONST( ErrCompat = 4275 );
}; };
#endif #endif
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment