Commit 6331fd76 authored by unknown's avatar unknown

Merge blobs

parent df6a7758
...@@ -187,6 +187,7 @@ private: ...@@ -187,6 +187,7 @@ private:
friend class NdbOperation; friend class NdbOperation;
friend class NdbScanOperation; friend class NdbScanOperation;
friend class NdbDictionaryImpl; friend class NdbDictionaryImpl;
friend class NdbResultSet; // atNextResult
// state // state
State theState; State theState;
void setState(State newState); void setState(State newState);
......
...@@ -763,10 +763,8 @@ protected: ...@@ -763,10 +763,8 @@ protected:
* These are support methods only used locally in this class. * These are support methods only used locally in this class.
******************************************************************************/ ******************************************************************************/
virtual int equal_impl(const NdbColumnImpl* anAttrObject, virtual int equal_impl(const NdbColumnImpl*,const char* aValue, Uint32 len);
const char* aValue, virtual NdbRecAttr* getValue_impl(const NdbColumnImpl*, char* aValue = 0);
Uint32 len);
NdbRecAttr* getValue_impl(const NdbColumnImpl* anAttrObject, char* aValue = 0);
int setValue(const NdbColumnImpl* anAttrObject, const char* aValue, Uint32 len); int setValue(const NdbColumnImpl* anAttrObject, const char* aValue, Uint32 len);
NdbBlob* getBlobHandle(NdbConnection* aCon, const NdbColumnImpl* anAttrObject); NdbBlob* getBlobHandle(NdbConnection* aCon, const NdbColumnImpl* anAttrObject);
int incValue(const NdbColumnImpl* anAttrObject, Uint32 aValue); int incValue(const NdbColumnImpl* anAttrObject, Uint32 aValue);
...@@ -809,7 +807,7 @@ protected: ...@@ -809,7 +807,7 @@ protected:
// get table or index key from prepared signals // get table or index key from prepared signals
int getKeyFromTCREQ(Uint32* data, unsigned size); int getKeyFromTCREQ(Uint32* data, unsigned size);
int getKeyFromKEYINFO20(Uint32* data, unsigned size);
/****************************************************************************** /******************************************************************************
* These are the private variables that are defined in the operation objects. * These are the private variables that are defined in the operation objects.
*****************************************************************************/ *****************************************************************************/
......
...@@ -42,7 +42,7 @@ class NdbScanOperation : public NdbOperation { ...@@ -42,7 +42,7 @@ class NdbScanOperation : public NdbOperation {
friend class NdbConnection; friend class NdbConnection;
friend class NdbResultSet; friend class NdbResultSet;
friend class NdbOperation; friend class NdbOperation;
friend class NdbBlob;
public: public:
/** /**
* Type of cursor * Type of cursor
...@@ -92,13 +92,12 @@ public: ...@@ -92,13 +92,12 @@ public:
return readTuples(LM_Exclusive, 0, parallell); return readTuples(LM_Exclusive, 0, parallell);
} }
protected:
CursorType m_cursor_type;
NdbBlob* getBlobHandle(const char* anAttrName); NdbBlob* getBlobHandle(const char* anAttrName);
NdbBlob* getBlobHandle(Uint32 anAttrId); NdbBlob* getBlobHandle(Uint32 anAttrId);
private: protected:
CursorType m_cursor_type;
NdbScanOperation(Ndb* aNdb); NdbScanOperation(Ndb* aNdb);
~NdbScanOperation(); ~NdbScanOperation();
...@@ -154,6 +153,7 @@ private: ...@@ -154,6 +153,7 @@ private:
void receiver_completed(NdbReceiver*); void receiver_completed(NdbReceiver*);
void execCLOSE_SCAN_REP(); void execCLOSE_SCAN_REP();
int getKeyFromKEYINFO20(Uint32* data, unsigned size);
NdbOperation* takeOverScanOp(OperationType opType, NdbConnection*); NdbOperation* takeOverScanOp(OperationType opType, NdbConnection*);
Uint32 m_ordered; Uint32 m_ordered;
......
...@@ -14,13 +14,14 @@ ...@@ -14,13 +14,14 @@
along with this program; if not, write to the Free Software along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "Ndb.hpp" #include <Ndb.hpp>
#include "NdbDictionaryImpl.hpp" #include <NdbDictionaryImpl.hpp>
#include "NdbConnection.hpp" #include <NdbConnection.hpp>
#include "NdbOperation.hpp" #include <NdbOperation.hpp>
#include "NdbIndexOperation.hpp" #include <NdbIndexOperation.hpp>
#include "NdbRecAttr.hpp" #include <NdbRecAttr.hpp>
#include "NdbBlob.hpp" #include <NdbBlob.hpp>
#include <NdbScanOperation.hpp>
#ifdef NDB_BLOB_DEBUG #ifdef NDB_BLOB_DEBUG
#define DBG(x) \ #define DBG(x) \
...@@ -301,7 +302,7 @@ NdbBlob::getTableKeyValue(NdbOperation* anOp) ...@@ -301,7 +302,7 @@ NdbBlob::getTableKeyValue(NdbOperation* anOp)
assert(c != NULL); assert(c != NULL);
if (c->m_pk) { if (c->m_pk) {
unsigned len = c->m_attrSize * c->m_arraySize; unsigned len = c->m_attrSize * c->m_arraySize;
if (anOp->getValue(c, (char*)&data[pos]) == NULL) { if (anOp->getValue_impl(c, (char*)&data[pos]) == NULL) {
setErrorCode(anOp); setErrorCode(anOp);
return -1; return -1;
} }
...@@ -382,7 +383,7 @@ int ...@@ -382,7 +383,7 @@ int
NdbBlob::getHeadInlineValue(NdbOperation* anOp) NdbBlob::getHeadInlineValue(NdbOperation* anOp)
{ {
DBG("getHeadInlineValue"); DBG("getHeadInlineValue");
theHeadInlineRecAttr = anOp->getValue(theColumn, theHeadInlineBuf.data); theHeadInlineRecAttr = anOp->getValue_impl(theColumn, theHeadInlineBuf.data);
if (theHeadInlineRecAttr == NULL) { if (theHeadInlineRecAttr == NULL) {
setErrorCode(anOp); setErrorCode(anOp);
return -1; return -1;
...@@ -1250,7 +1251,7 @@ NdbBlob::atNextResult() ...@@ -1250,7 +1251,7 @@ NdbBlob::atNextResult()
// get primary key // get primary key
{ Uint32* data = (Uint32*)theKeyBuf.data; { Uint32* data = (Uint32*)theKeyBuf.data;
unsigned size = theTable->m_sizeOfKeysInWords; unsigned size = theTable->m_sizeOfKeysInWords;
if (theNdbOp->getKeyFromKEYINFO20(data, size) == -1) { if (((NdbScanOperation*)theNdbOp)->getKeyFromKEYINFO20(data, size) == -1) {
setErrorCode(ErrUsage); setErrorCode(ErrUsage);
return -1; return -1;
} }
......
...@@ -100,9 +100,11 @@ NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){ ...@@ -100,9 +100,11 @@ NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){
key.m_attrSize = 4; key.m_attrSize = 4;
key.m_nullable = true; // So that receive works w.r.t KEYINFO20 key.m_nullable = true; // So that receive works w.r.t KEYINFO20
} }
m_key_info = key_size;
for(Uint32 i = 0; i<rows; i++){ for(Uint32 i = 0; i<rows; i++){
NdbRecAttr * prev = theCurrentRecAttr; NdbRecAttr * prev = theCurrentRecAttr;
assert(prev == 0 || i > 0);
// Put key-recAttr fir on each row // Put key-recAttr fir on each row
if(key_size && !getValue(&key, (char*)0)){ if(key_size && !getValue(&key, (char*)0)){
...@@ -112,7 +114,7 @@ NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){ ...@@ -112,7 +114,7 @@ NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){
NdbRecAttr* tRecAttr = org->theFirstRecAttr; NdbRecAttr* tRecAttr = org->theFirstRecAttr;
while(tRecAttr != 0){ while(tRecAttr != 0){
if(getValue(&NdbColumnImpl::getImpl(*tRecAttr->m_column), (char*)0)) if(getValue(&NdbColumnImpl::getImpl(*tRecAttr->m_column), (char*)0) != 0)
tRecAttr = tRecAttr->next(); tRecAttr = tRecAttr->next();
else else
break; break;
......
...@@ -29,6 +29,7 @@ ...@@ -29,6 +29,7 @@
#include <Ndb.hpp> #include <Ndb.hpp>
#include <NdbConnection.hpp> #include <NdbConnection.hpp>
#include <NdbResultSet.hpp> #include <NdbResultSet.hpp>
#include <NdbBlob.hpp>
NdbResultSet::NdbResultSet(NdbScanOperation *owner) NdbResultSet::NdbResultSet(NdbScanOperation *owner)
: m_operation(owner) : m_operation(owner)
...@@ -45,7 +46,18 @@ void NdbResultSet::init() ...@@ -45,7 +46,18 @@ void NdbResultSet::init()
int NdbResultSet::nextResult(bool fetchAllowed) int NdbResultSet::nextResult(bool fetchAllowed)
{ {
return m_operation->nextResult(fetchAllowed); int res;
if ((res = m_operation->nextResult(fetchAllowed)) == 0) {
// handle blobs
NdbBlob* tBlob = m_operation->theBlobList;
while (tBlob != 0) {
if (tBlob->atNextResult() == -1)
return -1;
tBlob = tBlob->theNext;
}
return 0;
}
return res;
} }
void NdbResultSet::close() void NdbResultSet::close()
......
...@@ -644,7 +644,7 @@ void NdbScanOperation::closeScan() ...@@ -644,7 +644,7 @@ void NdbScanOperation::closeScan()
{ {
int self = pthread_self() ; int self = pthread_self() ;
do { if(m_transConnection) do {
TransporterFacade* tp = TransporterFacade::instance(); TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr); Guard guard(tp->theMutexPtr);
...@@ -874,6 +874,26 @@ NdbScanOperation::doSendScan(int aProcessorId) ...@@ -874,6 +874,26 @@ NdbScanOperation::doSendScan(int aProcessorId)
* in separate threads and thus increasing the parallelism during * in separate threads and thus increasing the parallelism during
* the scan process. * the scan process.
*****************************************************************************/ *****************************************************************************/
int
NdbScanOperation::getKeyFromKEYINFO20(Uint32* data, unsigned size)
{
Uint32 idx = m_current_api_receiver;
Uint32 last = m_api_receivers_count;
Uint32 row;
NdbReceiver * tRec;
NdbRecAttr * tRecAttr;
if(idx < last && (tRec = m_api_receivers[idx])
&& ((row = tRec->m_current_row) <= tRec->m_defined_rows)
&& (tRecAttr = tRec->m_rows[row-1])){
const Uint32 * src = (Uint32*)tRecAttr->aRef();
memcpy(data, src, 4*size);
return 0;
}
return -1;
}
NdbOperation* NdbOperation*
NdbScanOperation::takeOverScanOp(OperationType opType, NdbConnection* pTrans){ NdbScanOperation::takeOverScanOp(OperationType opType, NdbConnection* pTrans){
...@@ -942,11 +962,37 @@ NdbScanOperation::takeOverScanOp(OperationType opType, NdbConnection* pTrans){ ...@@ -942,11 +962,37 @@ NdbScanOperation::takeOverScanOp(OperationType opType, NdbConnection* pTrans){
memcpy(keyInfo->keyData, src, 4 * left); memcpy(keyInfo->keyData, src, 4 * left);
} }
} }
// create blob handles automatically
if (opType == DeleteRequest && m_currentTable->m_noOfBlobs != 0) {
for (unsigned i = 0; i < m_currentTable->m_columns.size(); i++) {
NdbColumnImpl* c = m_currentTable->m_columns[i];
assert(c != 0);
if (c->getBlobType()) {
if (newOp->getBlobHandle(pTrans, c) == NULL)
return NULL;
}
}
}
return newOp; return newOp;
} }
return 0; return 0;
} }
NdbBlob*
NdbScanOperation::getBlobHandle(const char* anAttrName)
{
return NdbOperation::getBlobHandle(m_transConnection,
m_currentTable->getColumn(anAttrName));
}
NdbBlob*
NdbScanOperation::getBlobHandle(Uint32 anAttrId)
{
return NdbOperation::getBlobHandle(m_transConnection,
m_currentTable->getColumn(anAttrId));
}
NdbIndexScanOperation::NdbIndexScanOperation(Ndb* aNdb) NdbIndexScanOperation::NdbIndexScanOperation(Ndb* aNdb)
: NdbScanOperation(aNdb) : NdbScanOperation(aNdb)
{ {
......
...@@ -584,13 +584,14 @@ verifyBlobTable(const Bcol& b, const Bval& v, Uint32 pk1, bool exists) ...@@ -584,13 +584,14 @@ verifyBlobTable(const Bcol& b, const Bval& v, Uint32 pk1, bool exists)
NdbRecAttr* ra_pk; NdbRecAttr* ra_pk;
NdbRecAttr* ra_part; NdbRecAttr* ra_part;
NdbRecAttr* ra_data; NdbRecAttr* ra_data;
NdbResultSet* rs;
CHK((g_con = g_ndb->startTransaction()) != 0); CHK((g_con = g_ndb->startTransaction()) != 0);
CHK((g_opr = g_con->getNdbOperation(b.m_btname)) != 0); CHK((g_ops = g_con->getNdbScanOperation(b.m_btname)) != 0);
CHK(g_opr->openScanRead() == 0); CHK((rs = g_ops->readTuples()) != 0);
CHK((ra_pk = g_opr->getValue("PK")) != 0); CHK((ra_pk = g_ops->getValue("PK")) != 0);
CHK((ra_part = g_opr->getValue("PART")) != 0); CHK((ra_part = g_ops->getValue("PART")) != 0);
CHK((ra_data = g_opr->getValue("DATA")) != 0); CHK((ra_data = g_ops->getValue("DATA")) != 0);
CHK(g_con->executeScan() == 0); CHK(g_con->execute(NoCommit) == 0);
unsigned partcount; unsigned partcount;
if (! exists || v.m_len <= b.m_inline) if (! exists || v.m_len <= b.m_inline)
partcount = 0; partcount = 0;
...@@ -600,7 +601,7 @@ verifyBlobTable(const Bcol& b, const Bval& v, Uint32 pk1, bool exists) ...@@ -600,7 +601,7 @@ verifyBlobTable(const Bcol& b, const Bval& v, Uint32 pk1, bool exists)
memset(seen, 0, partcount); memset(seen, 0, partcount);
while (1) { while (1) {
int ret; int ret;
CHK((ret = g_con->nextScanResult()) == 0 || ret == 1); CHK((ret = rs->nextResult()) == 0 || ret == 1);
if (ret == 1) if (ret == 1)
break; break;
if (pk1 != ra_pk->u_32_value()) if (pk1 != ra_pk->u_32_value())
...@@ -620,7 +621,7 @@ verifyBlobTable(const Bcol& b, const Bval& v, Uint32 pk1, bool exists) ...@@ -620,7 +621,7 @@ verifyBlobTable(const Bcol& b, const Bval& v, Uint32 pk1, bool exists)
for (unsigned i = 0; i < partcount; i++) for (unsigned i = 0; i < partcount; i++)
CHK(seen[i] == 1); CHK(seen[i] == 1);
g_ndb->closeTransaction(g_con); g_ndb->closeTransaction(g_con);
g_opr = 0; g_ops = 0;
g_con = 0; g_con = 0;
return 0; return 0;
} }
...@@ -829,9 +830,9 @@ readScan(bool rw, bool idx) ...@@ -829,9 +830,9 @@ readScan(bool rw, bool idx)
if (! idx) { if (! idx) {
CHK((g_ops = g_con->getNdbScanOperation(g_opt.m_tname)) != 0); CHK((g_ops = g_con->getNdbScanOperation(g_opt.m_tname)) != 0);
} else { } else {
CHK((g_ops = g_con->getNdbScanOperation(g_opt.m_x2name, g_opt.m_tname)) != 0); CHK((g_ops = g_con->getNdbIndexScanOperation(g_opt.m_x2name, g_opt.m_tname)) != 0);
} }
CHK((rs = g_ops->readTuples(240, NdbScanOperation::LM_Exclusive)) != 0); CHK((rs = g_ops->readTuples(NdbScanOperation::LM_Exclusive)) != 0);
CHK(g_ops->getValue("PK1", (char*)&tup.m_pk1) != 0); CHK(g_ops->getValue("PK1", (char*)&tup.m_pk1) != 0);
if (g_opt.m_pk2len != 0) if (g_opt.m_pk2len != 0)
CHK(g_ops->getValue("PK2", tup.m_pk2) != 0); CHK(g_ops->getValue("PK2", tup.m_pk2) != 0);
...@@ -921,9 +922,9 @@ deleteScan(bool idx) ...@@ -921,9 +922,9 @@ deleteScan(bool idx)
if (! idx) { if (! idx) {
CHK((g_ops = g_con->getNdbScanOperation(g_opt.m_tname)) != 0); CHK((g_ops = g_con->getNdbScanOperation(g_opt.m_tname)) != 0);
} else { } else {
CHK((g_ops = g_con->getNdbScanOperation(g_opt.m_x2name, g_opt.m_tname)) != 0); CHK((g_ops = g_con->getNdbIndexScanOperation(g_opt.m_x2name, g_opt.m_tname)) != 0);
} }
CHK((rs = g_ops->readTuples(240, NdbScanOperation::LM_Exclusive)) != 0); CHK((rs = g_ops->readTuples(NdbScanOperation::LM_Exclusive)) != 0);
CHK(g_ops->getValue("PK1", (char*)&tup.m_pk1) != 0); CHK(g_ops->getValue("PK1", (char*)&tup.m_pk1) != 0);
if (g_opt.m_pk2len != 0) if (g_opt.m_pk2len != 0)
CHK(g_ops->getValue("PK2", tup.m_pk2) != 0); CHK(g_ops->getValue("PK2", tup.m_pk2) != 0);
...@@ -1131,7 +1132,7 @@ NDB_COMMAND(testOdbcDriver, "testBlobs", "testBlobs", "testBlobs", 65535) ...@@ -1131,7 +1132,7 @@ NDB_COMMAND(testOdbcDriver, "testBlobs", "testBlobs", "testBlobs", 65535)
if (strcmp(arg, "-dbgall") == 0) { if (strcmp(arg, "-dbgall") == 0) {
g_opt.m_dbg = true; g_opt.m_dbg = true;
g_opt.m_dbgall = true; g_opt.m_dbgall = true;
putenv("NDB_BLOB_DEBUG=1"); putenv(strdup("NDB_BLOB_DEBUG=1"));
continue; continue;
} }
if (strcmp(arg, "-full") == 0) { if (strcmp(arg, "-full") == 0) {
......
...@@ -440,7 +440,7 @@ testcase(int flag) ...@@ -440,7 +440,7 @@ testcase(int flag)
int newkey = 0; int newkey = 0;
if ((con = ndb->startTransaction()) == 0) if ((con = ndb->startTransaction()) == 0)
return ndberror("startTransaction key=%d", key); return ndberror("startTransaction key=%d", key);
if ((sop = con->getNdbScanOperation(tab)) == 0) if ((op = sop = con->getNdbScanOperation(tab)) == 0)
return ndberror("getNdbOperation key=%d", key); return ndberror("getNdbOperation key=%d", key);
if ((rs = sop->readTuples(1)) == 0) if ((rs = sop->readTuples(1)) == 0)
return ndberror("openScanRead key=%d", key); return ndberror("openScanRead key=%d", key);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment