added test compare function for testing event

fixed one bug in event, pk comes first in events...
parent f73f5b50
...@@ -252,6 +252,7 @@ private: ...@@ -252,6 +252,7 @@ private:
Uint32 attrId() const; /* Get attribute id */ Uint32 attrId() const; /* Get attribute id */
bool setNULL(); /* Set NULL indicator */ bool setNULL(); /* Set NULL indicator */
void setUNDEFINED(); /* Set UNDEFINED indicator */
bool receive_data(const Uint32*, Uint32); bool receive_data(const Uint32*, Uint32);
void release(); /* Release memory if allocated */ void release(); /* Release memory if allocated */
...@@ -422,6 +423,13 @@ NdbRecAttr::setNULL() ...@@ -422,6 +423,13 @@ NdbRecAttr::setNULL()
return m_nullable; return m_nullable;
} }
inline
void
NdbRecAttr::setUNDEFINED()
{
theNULLind = -1;
}
inline inline
int int
NdbRecAttr::isNULL() const NdbRecAttr::isNULL() const
......
...@@ -58,10 +58,14 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N, ...@@ -58,10 +58,14 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N,
m_state(EO_ERROR), m_bufferL(bufferLength) m_state(EO_ERROR), m_bufferL(bufferLength)
{ {
m_eventId = 0; m_eventId = 0;
theFirstRecAttrs[0] = NULL; theFirstPkAttrs[0] = NULL;
theCurrentRecAttrs[0] = NULL; theCurrentPkAttrs[0] = NULL;
theFirstRecAttrs[1] = NULL; theFirstPkAttrs[1] = NULL;
theCurrentRecAttrs[1] = NULL; theCurrentPkAttrs[1] = NULL;
theFirstDataAttrs[0] = NULL;
theCurrentDataAttrs[0] = NULL;
theFirstDataAttrs[1] = NULL;
theCurrentDataAttrs[1] = NULL;
sdata = NULL; sdata = NULL;
ptr[0].p = NULL; ptr[0].p = NULL;
ptr[1].p = NULL; ptr[1].p = NULL;
...@@ -94,7 +98,15 @@ NdbEventOperationImpl::~NdbEventOperationImpl() ...@@ -94,7 +98,15 @@ NdbEventOperationImpl::~NdbEventOperationImpl()
int i; int i;
if (sdata) NdbMem_Free((char*)sdata); if (sdata) NdbMem_Free((char*)sdata);
for (i=0 ; i<2; i++) { for (i=0 ; i<2; i++) {
NdbRecAttr *p = theFirstRecAttrs[i]; NdbRecAttr *p = theFirstPkAttrs[i];
while (p) {
NdbRecAttr *p_next = p->next();
m_ndb->releaseRecAttr(p);
p = p_next;
}
}
for (i=0 ; i<2; i++) {
NdbRecAttr *p = theFirstDataAttrs[i];
while (p) { while (p) {
NdbRecAttr *p_next = p->next(); NdbRecAttr *p_next = p->next();
m_ndb->releaseRecAttr(p); m_ndb->releaseRecAttr(p);
...@@ -138,14 +150,26 @@ NdbEventOperationImpl::getValue(const NdbColumnImpl *tAttrInfo, char *aValue, in ...@@ -138,14 +150,26 @@ NdbEventOperationImpl::getValue(const NdbColumnImpl *tAttrInfo, char *aValue, in
{ {
DBUG_ENTER("NdbEventOperationImpl::getValue"); DBUG_ENTER("NdbEventOperationImpl::getValue");
// Insert Attribute Id into ATTRINFO part. // Insert Attribute Id into ATTRINFO part.
NdbRecAttr *&theFirstRecAttr = theFirstRecAttrs[n];
NdbRecAttr *&theCurrentRecAttr = theCurrentRecAttrs[n]; NdbRecAttr **theFirstAttr;
NdbRecAttr **theCurrentAttr;
if (tAttrInfo->getPrimaryKey())
{
theFirstAttr = &theFirstPkAttrs[n];
theCurrentAttr = &theCurrentPkAttrs[n];
}
else
{
theFirstAttr = &theFirstDataAttrs[n];
theCurrentAttr = &theCurrentDataAttrs[n];
}
/************************************************************************ /************************************************************************
* Get a Receive Attribute object and link it into the operation object. * Get a Receive Attribute object and link it into the operation object.
************************************************************************/ ************************************************************************/
NdbRecAttr *tRecAttr = m_ndb->getRecAttr(); NdbRecAttr *tAttr = m_ndb->getRecAttr();
if (tRecAttr == NULL) { if (tAttr == NULL) {
exit(-1); exit(-1);
//setErrorCodeAbort(4000); //setErrorCodeAbort(4000);
DBUG_RETURN(NULL); DBUG_RETURN(NULL);
...@@ -156,51 +180,51 @@ NdbEventOperationImpl::getValue(const NdbColumnImpl *tAttrInfo, char *aValue, in ...@@ -156,51 +180,51 @@ NdbEventOperationImpl::getValue(const NdbColumnImpl *tAttrInfo, char *aValue, in
* the RecAttr object * the RecAttr object
* Also set attribute size, array size and attribute type * Also set attribute size, array size and attribute type
********************************************************************/ ********************************************************************/
if (tRecAttr->setup(tAttrInfo, aValue)) { if (tAttr->setup(tAttrInfo, aValue)) {
//setErrorCodeAbort(4000); //setErrorCodeAbort(4000);
m_ndb->releaseRecAttr(tRecAttr); m_ndb->releaseRecAttr(tAttr);
exit(-1); exit(-1);
DBUG_RETURN(NULL); DBUG_RETURN(NULL);
} }
//theErrorLine++; //theErrorLine++;
tRecAttr->setNULL(); tAttr->setUNDEFINED();
// We want to keep the list sorted to make data insertion easier later // We want to keep the list sorted to make data insertion easier later
if (theFirstRecAttr == NULL) {
theFirstRecAttr = tRecAttr; if (*theFirstAttr == NULL) {
theCurrentRecAttr = tRecAttr; *theFirstAttr = tAttr;
tRecAttr->next(NULL); *theCurrentAttr = tAttr;
tAttr->next(NULL);
} else { } else {
Uint32 tAttrId = tAttrInfo->m_attrId; Uint32 tAttrId = tAttrInfo->m_attrId;
if (tAttrId > theCurrentRecAttr->attrId()) { // right order if (tAttrId > (*theCurrentAttr)->attrId()) { // right order
theCurrentRecAttr->next(tRecAttr); (*theCurrentAttr)->next(tAttr);
tRecAttr->next(NULL); tAttr->next(NULL);
theCurrentRecAttr = tRecAttr; *theCurrentAttr = tAttr;
} else if (theFirstRecAttr->next() == NULL || // only one in list } else if ((*theFirstAttr)->next() == NULL || // only one in list
theFirstRecAttr->attrId() > tAttrId) {// or first (*theFirstAttr)->attrId() > tAttrId) {// or first
tRecAttr->next(theFirstRecAttr); tAttr->next(*theFirstAttr);
theFirstRecAttr = tRecAttr; *theFirstAttr = tAttr;
} else { // at least 2 in list and not first and not last } else { // at least 2 in list and not first and not last
NdbRecAttr *p = theFirstRecAttr; NdbRecAttr *p = *theFirstAttr;
NdbRecAttr *p_next = p->next(); NdbRecAttr *p_next = p->next();
while (tAttrId > p_next->attrId()) { while (tAttrId > p_next->attrId()) {
p = p_next; p = p_next;
p_next = p->next(); p_next = p->next();
} }
if (tAttrId == p_next->attrId()) { // Using same attribute twice if (tAttrId == p_next->attrId()) { // Using same attribute twice
tRecAttr->release(); // do I need to do this? tAttr->release(); // do I need to do this?
m_ndb->releaseRecAttr(tRecAttr); m_ndb->releaseRecAttr(tAttr);
exit(-1); exit(-1);
DBUG_RETURN(NULL); DBUG_RETURN(NULL);
} }
// this is it, between p and p_next // this is it, between p and p_next
p->next(tRecAttr); p->next(tAttr);
tRecAttr->next(p_next); tAttr->next(p_next);
} }
} }
DBUG_RETURN(tAttr);
DBUG_RETURN(tRecAttr);
} }
int int
...@@ -213,7 +237,8 @@ NdbEventOperationImpl::execute() ...@@ -213,7 +237,8 @@ NdbEventOperationImpl::execute()
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
if (theFirstRecAttrs[0] == NULL) { // defaults to get all if (theFirstPkAttrs[0] == NULL &&
theFirstDataAttrs[0] == NULL) { // defaults to get all
} }
...@@ -362,8 +387,10 @@ NdbEventOperationImpl::next(int *pOverrun) ...@@ -362,8 +387,10 @@ NdbEventOperationImpl::next(int *pOverrun)
#endif #endif
// now move the data into the RecAttrs // now move the data into the RecAttrs
if ((theFirstRecAttrs[0] == NULL) && if ((theFirstPkAttrs[0] == NULL) &&
(theFirstRecAttrs[1] == NULL)) (theFirstPkAttrs[1] == NULL) &&
(theFirstDataAttrs[0] == NULL) &&
(theFirstDataAttrs[1] == NULL))
{ {
DBUG_RETURN(r); DBUG_RETURN(r);
} }
...@@ -385,11 +412,28 @@ NdbEventOperationImpl::next(int *pOverrun) ...@@ -385,11 +412,28 @@ NdbEventOperationImpl::next(int *pOverrun)
printf("\n"); printf("\n");
#endif #endif
NdbRecAttr *tWorkingRecAttr = theFirstRecAttrs[0];
// copy data into the RecAttr's // copy data into the RecAttr's
// we assume that the respective attribute lists are sorted // we assume that the respective attribute lists are sorted
// first the pk's
{
NdbRecAttr *tAttr= theFirstPkAttrs[0];
while(tAttr)
{
assert(aAttrPtr < aAttrEndPtr);
unsigned tDataSz= AttributeHeader(*aAttrPtr).getDataSize();
assert(tAttr->attrId() ==
AttributeHeader(*aAttrPtr).getAttributeId());
assert(tAttr->receive_data(aDataPtr, tDataSz));
// next
aAttrPtr++;
aDataPtr+= tDataSz;
tAttr= tAttr->next();
}
}
NdbRecAttr *tWorkingRecAttr = theFirstDataAttrs[0];
Uint32 tRecAttrId; Uint32 tRecAttrId;
Uint32 tAttrId; Uint32 tAttrId;
Uint32 tDataSz; Uint32 tDataSz;
...@@ -401,7 +445,7 @@ NdbEventOperationImpl::next(int *pOverrun) ...@@ -401,7 +445,7 @@ NdbEventOperationImpl::next(int *pOverrun)
while (tAttrId > tRecAttrId) { while (tAttrId > tRecAttrId) {
//printf("[%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId); //printf("[%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId);
tWorkingRecAttr->setNULL(); tWorkingRecAttr->setUNDEFINED();
tWorkingRecAttr = tWorkingRecAttr->next(); tWorkingRecAttr = tWorkingRecAttr->next();
if (tWorkingRecAttr == NULL) if (tWorkingRecAttr == NULL)
break; break;
...@@ -413,32 +457,25 @@ NdbEventOperationImpl::next(int *pOverrun) ...@@ -413,32 +457,25 @@ NdbEventOperationImpl::next(int *pOverrun)
//printf("[%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId); //printf("[%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId);
if (tAttrId == tRecAttrId) { if (tAttrId == tRecAttrId) {
if (!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey())
hasSomeData++; hasSomeData++;
//printf("set!\n"); //printf("set!\n");
tWorkingRecAttr->receive_data(aDataPtr, tDataSz); assert(tWorkingRecAttr->receive_data(aDataPtr, tDataSz));
// move forward, data has already moved forward
aAttrPtr++;
aDataPtr += tDataSz;
tWorkingRecAttr = tWorkingRecAttr->next(); tWorkingRecAttr = tWorkingRecAttr->next();
} else { }
// move only attr forward
aAttrPtr++; aAttrPtr++;
aDataPtr += tDataSz; aDataPtr += tDataSz;
} }
}
while (tWorkingRecAttr != NULL) { while (tWorkingRecAttr != NULL) {
tRecAttrId = tWorkingRecAttr->attrId(); tRecAttrId = tWorkingRecAttr->attrId();
//printf("set undefined [%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId); //printf("set undefined [%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId);
tWorkingRecAttr->setNULL(); tWorkingRecAttr->setUNDEFINED();
tWorkingRecAttr = tWorkingRecAttr->next(); tWorkingRecAttr = tWorkingRecAttr->next();
} }
tWorkingRecAttr = theFirstRecAttrs[1]; tWorkingRecAttr = theFirstDataAttrs[1];
aDataPtr = ptr[2].p; aDataPtr = ptr[2].p;
Uint32 *aDataEndPtr = aDataPtr + ptr[2].sz; Uint32 *aDataEndPtr = aDataPtr + ptr[2].sz;
while ((aDataPtr < aDataEndPtr) && (tWorkingRecAttr != NULL)) { while ((aDataPtr < aDataEndPtr) && (tWorkingRecAttr != NULL)) {
...@@ -447,7 +484,7 @@ NdbEventOperationImpl::next(int *pOverrun) ...@@ -447,7 +484,7 @@ NdbEventOperationImpl::next(int *pOverrun)
tDataSz = AttributeHeader(*aDataPtr).getDataSize(); tDataSz = AttributeHeader(*aDataPtr).getDataSize();
aDataPtr++; aDataPtr++;
while (tAttrId > tRecAttrId) { while (tAttrId > tRecAttrId) {
tWorkingRecAttr->setNULL(); tWorkingRecAttr->setUNDEFINED();
tWorkingRecAttr = tWorkingRecAttr->next(); tWorkingRecAttr = tWorkingRecAttr->next();
if (tWorkingRecAttr == NULL) if (tWorkingRecAttr == NULL)
break; break;
...@@ -456,20 +493,16 @@ NdbEventOperationImpl::next(int *pOverrun) ...@@ -456,20 +493,16 @@ NdbEventOperationImpl::next(int *pOverrun)
if (tWorkingRecAttr == NULL) if (tWorkingRecAttr == NULL)
break; break;
if (tAttrId == tRecAttrId) { if (tAttrId == tRecAttrId) {
if (!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey()) assert(!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey());
hasSomeData++; hasSomeData++;
tWorkingRecAttr->receive_data(aDataPtr, tDataSz); assert(tWorkingRecAttr->receive_data(aDataPtr, tDataSz));
aDataPtr += tDataSz;
// move forward, data+attr has already moved forward
tWorkingRecAttr = tWorkingRecAttr->next(); tWorkingRecAttr = tWorkingRecAttr->next();
} else {
// move only data+attr forward
aDataPtr += tDataSz;
} }
aDataPtr += tDataSz;
} }
while (tWorkingRecAttr != NULL) { while (tWorkingRecAttr != NULL) {
tWorkingRecAttr->setNULL(); tWorkingRecAttr->setUNDEFINED();
tWorkingRecAttr = tWorkingRecAttr->next(); tWorkingRecAttr = tWorkingRecAttr->next();
} }
...@@ -504,7 +537,16 @@ NdbEventOperationImpl::print() ...@@ -504,7 +537,16 @@ NdbEventOperationImpl::print()
ndbout << "EventId " << m_eventId << "\n"; ndbout << "EventId " << m_eventId << "\n";
for (int i = 0; i < 2; i++) { for (int i = 0; i < 2; i++) {
NdbRecAttr *p = theFirstRecAttrs[i]; NdbRecAttr *p = theFirstPkAttrs[i];
ndbout << " %u " << i;
while (p) {
ndbout << " : " << p->attrId() << " = " << *p;
p = p->next();
}
ndbout << "\n";
}
for (int i = 0; i < 2; i++) {
NdbRecAttr *p = theFirstDataAttrs[i];
ndbout << " %u " << i; ndbout << " %u " << i;
while (p) { while (p) {
ndbout << " : " << p->attrId() << " = " << *p; ndbout << " : " << p->attrId() << " = " << *p;
......
...@@ -60,8 +60,10 @@ public: ...@@ -60,8 +60,10 @@ public:
NdbEventImpl *m_eventImpl; NdbEventImpl *m_eventImpl;
NdbGlobalEventBufferHandle *m_bufferHandle; NdbGlobalEventBufferHandle *m_bufferHandle;
NdbRecAttr *theFirstRecAttrs[2]; NdbRecAttr *theFirstPkAttrs[2];
NdbRecAttr *theCurrentRecAttrs[2]; NdbRecAttr *theCurrentPkAttrs[2];
NdbRecAttr *theFirstDataAttrs[2];
NdbRecAttr *theCurrentDataAttrs[2];
NdbEventOperation::State m_state; NdbEventOperation::State m_state;
Uint32 m_eventId; Uint32 m_eventId;
......
...@@ -95,6 +95,8 @@ public: ...@@ -95,6 +95,8 @@ public:
int numRecords = 1); int numRecords = 1);
NdbIndexScanOperation* pIndexScanOp; NdbIndexScanOperation* pIndexScanOp;
NDBT_ResultRow& get_row(Uint32 idx) { return *rows[idx];}
protected: protected:
void allocRows(int rows); void allocRows(int rows);
void deallocRows(); void deallocRows();
......
...@@ -91,7 +91,11 @@ private: ...@@ -91,7 +91,11 @@ private:
; \ ; \
} }
#define ERR(error) ERR_OUT(g_err, error) #define ERR(error) \
{ \
const NdbError &_error= (error); \
ERR_OUT(g_err, _error); \
}
#define ERR_INFO(error) ERR_OUT(g_info, error) #define ERR_INFO(error) ERR_OUT(g_info, error)
#endif #endif
...@@ -27,7 +27,7 @@ public: ...@@ -27,7 +27,7 @@ public:
const NdbRecAttr * attributeStore(int i) const ; const NdbRecAttr * attributeStore(int i) const ;
const NdbRecAttr * attributeStore(const char* name) const ; const NdbRecAttr * attributeStore(const char* name) const ;
BaseString c_str(); BaseString c_str() const ;
NdbOut & header (NdbOut &) const; NdbOut & header (NdbOut &) const;
friend NdbOut & operator << (NdbOut&, const NDBT_ResultRow &); friend NdbOut & operator << (NdbOut&, const NDBT_ResultRow &);
...@@ -37,6 +37,11 @@ public: ...@@ -37,6 +37,11 @@ public:
*/ */
NDBT_ResultRow * clone() const; NDBT_ResultRow * clone() const;
bool operator==(const NDBT_ResultRow&) const ;
bool operator!=(const NDBT_ResultRow& other) const {
return ! (*this == other);
}
private: private:
int cols; int cols;
char **names; char **names;
......
...@@ -66,6 +66,14 @@ public: ...@@ -66,6 +66,14 @@ public:
int copyTableData(Ndb*, int copyTableData(Ndb*,
const char* destName); const char* destName);
/**
* Compare this table with other_table
*
* return 0 - on equality
* -1 - on error
* >0 - otherwise
*/
int compare(Ndb*, const char * other_table, int flags);
private: private:
static int takeOverAndDeleteRecord(Ndb*, static int takeOverAndDeleteRecord(Ndb*,
......
...@@ -206,15 +206,10 @@ int runVerify(NDBT_Context* ctx, NDBT_Step* step) ...@@ -206,15 +206,10 @@ int runVerify(NDBT_Context* ctx, NDBT_Step* step)
char buf[1024]; char buf[1024];
sprintf(buf, "%s_SHADOW", table->getName()); sprintf(buf, "%s_SHADOW", table->getName());
const NdbDictionary::Table * table_shadow;
if ((table_shadow = GETNDB(step)->getDictionary()->getTable(buf)) == 0)
{
g_err << "Unable to get table " << buf << endl;
return NDBT_FAILED;
}
HugoTransactions hugoTrans(*table_shadow); HugoTransactions hugoTrans(*table);
if (hugoTrans.pkReadRecords(GETNDB(step), records) != 0){ if (hugoTrans.compare(GETNDB(step), buf, 0))
{
return NDBT_FAILED; return NDBT_FAILED;
} }
...@@ -337,12 +332,36 @@ int runEventApplier(NDBT_Context* ctx, NDBT_Step* step) ...@@ -337,12 +332,36 @@ int runEventApplier(NDBT_Context* ctx, NDBT_Step* step)
DBUG_RETURN(NDBT_FAILED); DBUG_RETURN(NDBT_FAILED);
} }
break; break;
case NdbDictionary::Event::TE_ALL: default:
abort(); abort();
} }
for (i= 0; i < n_columns; i++) for (i= 0; i < n_columns; i++)
{ {
if (recAttr[i]->isNULL())
{
if (table->getColumn(i)->getPrimaryKey())
{
g_err << "internal error: primary key isNull()="
<< recAttr[i]->isNULL() << endl;
DBUG_RETURN(NDBT_FAILED);
}
switch (pOp->getEventType()) {
case NdbDictionary::Event::TE_INSERT:
if (recAttr[i]->isNULL() < 0)
{
g_err << "internal error: missing value for insert\n";
DBUG_RETURN(NDBT_FAILED);
}
break;
case NdbDictionary::Event::TE_DELETE:
break;
case NdbDictionary::Event::TE_UPDATE:
break;
default:
abort();
}
}
if (table->getColumn(i)->getPrimaryKey() && if (table->getColumn(i)->getPrimaryKey() &&
op->equal(i,recAttr[i]->aRef())) op->equal(i,recAttr[i]->aRef()))
{ {
...@@ -358,7 +377,7 @@ int runEventApplier(NDBT_Context* ctx, NDBT_Step* step) ...@@ -358,7 +377,7 @@ int runEventApplier(NDBT_Context* ctx, NDBT_Step* step)
for (i= 0; i < n_columns; i++) for (i= 0; i < n_columns; i++)
{ {
if (!table->getColumn(i)->getPrimaryKey() && if (!table->getColumn(i)->getPrimaryKey() &&
op->setValue(i,recAttr[i]->aRef())) op->setValue(i,recAttr[i]->isNULL() ? 0:recAttr[i]->aRef()))
{ {
g_err << "setValue(insert) " << i << " " g_err << "setValue(insert) " << i << " "
<< op->getNdbError().code << " " << op->getNdbError().code << " "
...@@ -374,7 +393,7 @@ int runEventApplier(NDBT_Context* ctx, NDBT_Step* step) ...@@ -374,7 +393,7 @@ int runEventApplier(NDBT_Context* ctx, NDBT_Step* step)
{ {
if (!table->getColumn(i)->getPrimaryKey() && if (!table->getColumn(i)->getPrimaryKey() &&
recAttr[i]->isNULL() >= 0 && recAttr[i]->isNULL() >= 0 &&
op->setValue(i,recAttr[i]->aRef())) op->setValue(i,recAttr[i]->isNULL() ? 0:recAttr[i]->aRef()))
{ {
g_err << "setValue(update) " << i << " " g_err << "setValue(update) " << i << " "
<< op->getNdbError().code << " " << op->getNdbError().code << " "
......
...@@ -206,5 +206,5 @@ args: -l 1 -n SR9 T1 ...@@ -206,5 +206,5 @@ args: -l 1 -n SR9 T1
# #
max-time: 2500 max-time: 2500
cmd: test_event cmd: test_event
args: -n BasicEventOperation T1 T6 args: -n EventOperationApplier
...@@ -84,7 +84,7 @@ NDBT_ResultRow::header (NdbOut & out) const { ...@@ -84,7 +84,7 @@ NDBT_ResultRow::header (NdbOut & out) const {
return out; return out;
} }
BaseString NDBT_ResultRow::c_str() { BaseString NDBT_ResultRow::c_str() const {
BaseString str; BaseString str;
...@@ -133,3 +133,10 @@ NDBT_ResultRow::clone () const { ...@@ -133,3 +133,10 @@ NDBT_ResultRow::clone () const {
return row; return row;
} }
bool
NDBT_ResultRow::operator==(const NDBT_ResultRow& other) const
{
// quick and dirty
return c_str() == other.c_str();
}
...@@ -1300,3 +1300,152 @@ UtilTransactions::getOperation(NdbConnection* pTrans, ...@@ -1300,3 +1300,152 @@ UtilTransactions::getOperation(NdbConnection* pTrans,
return 0; return 0;
} }
} }
#include <HugoOperations.hpp>
int
UtilTransactions::compare(Ndb* pNdb, const char* tab_name2, int flags){
NdbError err;
int return_code= -1, row_count= 0;
int retryAttempt = 0, retryMax = 10;
HugoCalculator calc(tab);
NDBT_ResultRow row(tab);
NdbTransaction* pTrans= 0;
const NdbDictionary::Table* tmp= pNdb->getDictionary()->getTable(tab_name2);
if(tmp == 0)
{
g_err << "Unable to lookup table: " << tab_name2
<< endl << pNdb->getDictionary()->getNdbError() << endl;
return -1;
}
const NdbDictionary::Table& tab2= *tmp;
HugoOperations cmp(tab2);
UtilTransactions count(tab2);
while (true){
if (retryAttempt++ >= retryMax){
g_info << "ERROR: has retried this operation " << retryAttempt
<< " times, failing!" << endl;
return -1;
}
NdbScanOperation *pOp= 0;
pTrans = pNdb->startTransaction();
if (pTrans == NULL) {
err = pNdb->getNdbError();
goto error;
}
pOp= pTrans->getNdbScanOperation(tab.getName());
if (pOp == NULL) {
ERR(err= pTrans->getNdbError());
goto error;
}
if( pOp->readTuples(NdbScanOperation::LM_Read) ) {
ERR(err= pTrans->getNdbError());
goto error;
}
if( pOp->interpret_exit_ok() == -1 ) {
ERR(err= pTrans->getNdbError());
goto error;
}
// Read all attributes
{
for (int a = 0; a < tab.getNoOfColumns(); a++){
if ((row.attributeStore(a) =
pOp->getValue(tab.getColumn(a)->getName())) == 0) {
ERR(err= pTrans->getNdbError());
goto error;
}
}
}
if( pTrans->execute(NoCommit) == -1 ) {
ERR(err= pTrans->getNdbError());
goto error;
}
{
int eof;
while((eof = pOp->nextResult(true)) == 0)
{
do {
row_count++;
if(cmp.startTransaction(pNdb) != NDBT_OK)
{
ERR(err= pNdb->getNdbError());
goto error;
}
int rowNo= calc.getIdValue(&row);
if(cmp.pkReadRecord(pNdb, rowNo, 1) != NDBT_OK)
{
ERR(err= cmp.getTransaction()->getNdbError());
goto error;
}
if(cmp.execute_Commit(pNdb) != NDBT_OK)
{
ERR(err= cmp.getTransaction()->getNdbError());
goto error;
}
if(row != cmp.get_row(0))
{
g_err << "COMPARE FAILED" << endl;
g_err << row << endl;
g_err << cmp.get_row(0) << endl;
return_code= 1;
goto close;
}
retryAttempt= 0;
cmp.closeTransaction(pNdb);
} while((eof = pOp->nextResult(false)) == 0);
}
if (eof == -1)
{
err = pTrans->getNdbError();
goto error;
}
}
pTrans->close(); pTrans= 0;
g_info << row_count << " rows compared" << endl;
{
int row_count2;
if(count.selectCount(pNdb, 0, &row_count2) != NDBT_OK)
{
g_err << "Failed to count rows in tab_name2" << endl;
return -1;
}
g_info << row_count2 << " rows in tab_name2" << endl;
return (row_count == row_count2 ? 0 : 1);
}
error:
if(err.status == NdbError::TemporaryError)
{
NdbSleep_MilliSleep(50);
if(pTrans != 0)
{
pTrans->close();
pTrans= 0;
}
if(cmp.getTransaction())
cmp.closeTransaction(pNdb);
continue;
}
break;
}
close:
if(pTrans != 0) pTrans->close();
return return_code;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment