Commit 817c0d87 authored by unknown's avatar unknown

Merge blobs

parent 6a4f612f
......@@ -187,6 +187,7 @@ private:
friend class NdbOperation;
friend class NdbScanOperation;
friend class NdbDictionaryImpl;
friend class NdbResultSet; // atNextResult
// state
State theState;
void setState(State newState);
......
......@@ -763,10 +763,8 @@ protected:
* These are support methods only used locally in this class.
******************************************************************************/
virtual int equal_impl(const NdbColumnImpl* anAttrObject,
const char* aValue,
Uint32 len);
NdbRecAttr* getValue_impl(const NdbColumnImpl* anAttrObject, char* aValue = 0);
virtual int equal_impl(const NdbColumnImpl*,const char* aValue, Uint32 len);
virtual NdbRecAttr* getValue_impl(const NdbColumnImpl*, char* aValue = 0);
int setValue(const NdbColumnImpl* anAttrObject, const char* aValue, Uint32 len);
NdbBlob* getBlobHandle(NdbConnection* aCon, const NdbColumnImpl* anAttrObject);
int incValue(const NdbColumnImpl* anAttrObject, Uint32 aValue);
......@@ -809,7 +807,7 @@ protected:
// get table or index key from prepared signals
int getKeyFromTCREQ(Uint32* data, unsigned size);
int getKeyFromKEYINFO20(Uint32* data, unsigned size);
/******************************************************************************
* These are the private variables that are defined in the operation objects.
*****************************************************************************/
......
......@@ -42,7 +42,7 @@ class NdbScanOperation : public NdbOperation {
friend class NdbConnection;
friend class NdbResultSet;
friend class NdbOperation;
friend class NdbBlob;
public:
/**
* Type of cursor
......@@ -92,13 +92,12 @@ public:
return readTuples(LM_Exclusive, 0, parallell);
}
protected:
CursorType m_cursor_type;
NdbBlob* getBlobHandle(const char* anAttrName);
NdbBlob* getBlobHandle(Uint32 anAttrId);
private:
protected:
CursorType m_cursor_type;
NdbScanOperation(Ndb* aNdb);
~NdbScanOperation();
......@@ -154,6 +153,7 @@ private:
void receiver_completed(NdbReceiver*);
void execCLOSE_SCAN_REP();
int getKeyFromKEYINFO20(Uint32* data, unsigned size);
NdbOperation* takeOverScanOp(OperationType opType, NdbConnection*);
Uint32 m_ordered;
......
......@@ -14,13 +14,14 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "Ndb.hpp"
#include "NdbDictionaryImpl.hpp"
#include "NdbConnection.hpp"
#include "NdbOperation.hpp"
#include "NdbIndexOperation.hpp"
#include "NdbRecAttr.hpp"
#include "NdbBlob.hpp"
#include <Ndb.hpp>
#include <NdbDictionaryImpl.hpp>
#include <NdbConnection.hpp>
#include <NdbOperation.hpp>
#include <NdbIndexOperation.hpp>
#include <NdbRecAttr.hpp>
#include <NdbBlob.hpp>
#include <NdbScanOperation.hpp>
#ifdef NDB_BLOB_DEBUG
#define DBG(x) \
......@@ -301,7 +302,7 @@ NdbBlob::getTableKeyValue(NdbOperation* anOp)
assert(c != NULL);
if (c->m_pk) {
unsigned len = c->m_attrSize * c->m_arraySize;
if (anOp->getValue(c, (char*)&data[pos]) == NULL) {
if (anOp->getValue_impl(c, (char*)&data[pos]) == NULL) {
setErrorCode(anOp);
return -1;
}
......@@ -382,7 +383,7 @@ int
NdbBlob::getHeadInlineValue(NdbOperation* anOp)
{
DBG("getHeadInlineValue");
theHeadInlineRecAttr = anOp->getValue(theColumn, theHeadInlineBuf.data);
theHeadInlineRecAttr = anOp->getValue_impl(theColumn, theHeadInlineBuf.data);
if (theHeadInlineRecAttr == NULL) {
setErrorCode(anOp);
return -1;
......@@ -1250,7 +1251,7 @@ NdbBlob::atNextResult()
// get primary key
{ Uint32* data = (Uint32*)theKeyBuf.data;
unsigned size = theTable->m_sizeOfKeysInWords;
if (theNdbOp->getKeyFromKEYINFO20(data, size) == -1) {
if (((NdbScanOperation*)theNdbOp)->getKeyFromKEYINFO20(data, size) == -1) {
setErrorCode(ErrUsage);
return -1;
}
......
......@@ -100,9 +100,11 @@ NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){
key.m_attrSize = 4;
key.m_nullable = true; // So that receive works w.r.t KEYINFO20
}
m_key_info = key_size;
for(Uint32 i = 0; i<rows; i++){
NdbRecAttr * prev = theCurrentRecAttr;
assert(prev == 0 || i > 0);
// Put key-recAttr fir on each row
if(key_size && !getValue(&key, (char*)0)){
......@@ -112,7 +114,7 @@ NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){
NdbRecAttr* tRecAttr = org->theFirstRecAttr;
while(tRecAttr != 0){
if(getValue(&NdbColumnImpl::getImpl(*tRecAttr->m_column), (char*)0))
if(getValue(&NdbColumnImpl::getImpl(*tRecAttr->m_column), (char*)0) != 0)
tRecAttr = tRecAttr->next();
else
break;
......
......@@ -29,6 +29,7 @@
#include <Ndb.hpp>
#include <NdbConnection.hpp>
#include <NdbResultSet.hpp>
#include <NdbBlob.hpp>
NdbResultSet::NdbResultSet(NdbScanOperation *owner)
: m_operation(owner)
......@@ -45,7 +46,18 @@ void NdbResultSet::init()
int NdbResultSet::nextResult(bool fetchAllowed)
{
return m_operation->nextResult(fetchAllowed);
int res;
if ((res = m_operation->nextResult(fetchAllowed)) == 0) {
// handle blobs
NdbBlob* tBlob = m_operation->theBlobList;
while (tBlob != 0) {
if (tBlob->atNextResult() == -1)
return -1;
tBlob = tBlob->theNext;
}
return 0;
}
return res;
}
void NdbResultSet::close()
......
......@@ -644,7 +644,7 @@ void NdbScanOperation::closeScan()
{
int self = pthread_self() ;
do {
if(m_transConnection) do {
TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr);
......@@ -874,6 +874,26 @@ NdbScanOperation::doSendScan(int aProcessorId)
* in separate threads and thus increasing the parallelism during
* the scan process.
*****************************************************************************/
int
NdbScanOperation::getKeyFromKEYINFO20(Uint32* data, unsigned size)
{
Uint32 idx = m_current_api_receiver;
Uint32 last = m_api_receivers_count;
Uint32 row;
NdbReceiver * tRec;
NdbRecAttr * tRecAttr;
if(idx < last && (tRec = m_api_receivers[idx])
&& ((row = tRec->m_current_row) <= tRec->m_defined_rows)
&& (tRecAttr = tRec->m_rows[row-1])){
const Uint32 * src = (Uint32*)tRecAttr->aRef();
memcpy(data, src, 4*size);
return 0;
}
return -1;
}
NdbOperation*
NdbScanOperation::takeOverScanOp(OperationType opType, NdbConnection* pTrans){
......@@ -942,11 +962,37 @@ NdbScanOperation::takeOverScanOp(OperationType opType, NdbConnection* pTrans){
memcpy(keyInfo->keyData, src, 4 * left);
}
}
// create blob handles automatically
if (opType == DeleteRequest && m_currentTable->m_noOfBlobs != 0) {
for (unsigned i = 0; i < m_currentTable->m_columns.size(); i++) {
NdbColumnImpl* c = m_currentTable->m_columns[i];
assert(c != 0);
if (c->getBlobType()) {
if (newOp->getBlobHandle(pTrans, c) == NULL)
return NULL;
}
}
}
return newOp;
}
return 0;
}
NdbBlob*
NdbScanOperation::getBlobHandle(const char* anAttrName)
{
return NdbOperation::getBlobHandle(m_transConnection,
m_currentTable->getColumn(anAttrName));
}
NdbBlob*
NdbScanOperation::getBlobHandle(Uint32 anAttrId)
{
return NdbOperation::getBlobHandle(m_transConnection,
m_currentTable->getColumn(anAttrId));
}
NdbIndexScanOperation::NdbIndexScanOperation(Ndb* aNdb)
: NdbScanOperation(aNdb)
{
......
......@@ -584,13 +584,14 @@ verifyBlobTable(const Bcol& b, const Bval& v, Uint32 pk1, bool exists)
NdbRecAttr* ra_pk;
NdbRecAttr* ra_part;
NdbRecAttr* ra_data;
NdbResultSet* rs;
CHK((g_con = g_ndb->startTransaction()) != 0);
CHK((g_opr = g_con->getNdbOperation(b.m_btname)) != 0);
CHK(g_opr->openScanRead() == 0);
CHK((ra_pk = g_opr->getValue("PK")) != 0);
CHK((ra_part = g_opr->getValue("PART")) != 0);
CHK((ra_data = g_opr->getValue("DATA")) != 0);
CHK(g_con->executeScan() == 0);
CHK((g_ops = g_con->getNdbScanOperation(b.m_btname)) != 0);
CHK((rs = g_ops->readTuples()) != 0);
CHK((ra_pk = g_ops->getValue("PK")) != 0);
CHK((ra_part = g_ops->getValue("PART")) != 0);
CHK((ra_data = g_ops->getValue("DATA")) != 0);
CHK(g_con->execute(NoCommit) == 0);
unsigned partcount;
if (! exists || v.m_len <= b.m_inline)
partcount = 0;
......@@ -600,7 +601,7 @@ verifyBlobTable(const Bcol& b, const Bval& v, Uint32 pk1, bool exists)
memset(seen, 0, partcount);
while (1) {
int ret;
CHK((ret = g_con->nextScanResult()) == 0 || ret == 1);
CHK((ret = rs->nextResult()) == 0 || ret == 1);
if (ret == 1)
break;
if (pk1 != ra_pk->u_32_value())
......@@ -620,7 +621,7 @@ verifyBlobTable(const Bcol& b, const Bval& v, Uint32 pk1, bool exists)
for (unsigned i = 0; i < partcount; i++)
CHK(seen[i] == 1);
g_ndb->closeTransaction(g_con);
g_opr = 0;
g_ops = 0;
g_con = 0;
return 0;
}
......@@ -829,9 +830,9 @@ readScan(bool rw, bool idx)
if (! idx) {
CHK((g_ops = g_con->getNdbScanOperation(g_opt.m_tname)) != 0);
} else {
CHK((g_ops = g_con->getNdbScanOperation(g_opt.m_x2name, g_opt.m_tname)) != 0);
CHK((g_ops = g_con->getNdbIndexScanOperation(g_opt.m_x2name, g_opt.m_tname)) != 0);
}
CHK((rs = g_ops->readTuples(240, NdbScanOperation::LM_Exclusive)) != 0);
CHK((rs = g_ops->readTuples(NdbScanOperation::LM_Exclusive)) != 0);
CHK(g_ops->getValue("PK1", (char*)&tup.m_pk1) != 0);
if (g_opt.m_pk2len != 0)
CHK(g_ops->getValue("PK2", tup.m_pk2) != 0);
......@@ -921,9 +922,9 @@ deleteScan(bool idx)
if (! idx) {
CHK((g_ops = g_con->getNdbScanOperation(g_opt.m_tname)) != 0);
} else {
CHK((g_ops = g_con->getNdbScanOperation(g_opt.m_x2name, g_opt.m_tname)) != 0);
CHK((g_ops = g_con->getNdbIndexScanOperation(g_opt.m_x2name, g_opt.m_tname)) != 0);
}
CHK((rs = g_ops->readTuples(240, NdbScanOperation::LM_Exclusive)) != 0);
CHK((rs = g_ops->readTuples(NdbScanOperation::LM_Exclusive)) != 0);
CHK(g_ops->getValue("PK1", (char*)&tup.m_pk1) != 0);
if (g_opt.m_pk2len != 0)
CHK(g_ops->getValue("PK2", tup.m_pk2) != 0);
......@@ -1131,7 +1132,7 @@ NDB_COMMAND(testOdbcDriver, "testBlobs", "testBlobs", "testBlobs", 65535)
if (strcmp(arg, "-dbgall") == 0) {
g_opt.m_dbg = true;
g_opt.m_dbgall = true;
putenv("NDB_BLOB_DEBUG=1");
putenv(strdup("NDB_BLOB_DEBUG=1"));
continue;
}
if (strcmp(arg, "-full") == 0) {
......
......@@ -440,7 +440,7 @@ testcase(int flag)
int newkey = 0;
if ((con = ndb->startTransaction()) == 0)
return ndberror("startTransaction key=%d", key);
if ((sop = con->getNdbScanOperation(tab)) == 0)
if ((op = sop = con->getNdbScanOperation(tab)) == 0)
return ndberror("getNdbOperation key=%d", key);
if ((rs = sop->readTuples(1)) == 0)
return ndberror("openScanRead key=%d", key);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment