Commit dcd8383b authored by unknown's avatar unknown

added test compare function for testing event

fixed one bug in event, pk comes first in events...

parent 01e9d190
......@@ -252,6 +252,7 @@ private:
Uint32 attrId() const; /* Get attribute id */
bool setNULL(); /* Set NULL indicator */
void setUNDEFINED(); /* Set UNDEFINED indicator */
bool receive_data(const Uint32*, Uint32);
void release(); /* Release memory if allocated */
......@@ -422,6 +423,13 @@ NdbRecAttr::setNULL()
return m_nullable;
}
inline
void
NdbRecAttr::setUNDEFINED()
{
theNULLind = -1;
}
inline
int
NdbRecAttr::isNULL() const
......
......@@ -58,10 +58,14 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N,
m_state(EO_ERROR), m_bufferL(bufferLength)
{
m_eventId = 0;
theFirstRecAttrs[0] = NULL;
theCurrentRecAttrs[0] = NULL;
theFirstRecAttrs[1] = NULL;
theCurrentRecAttrs[1] = NULL;
theFirstPkAttrs[0] = NULL;
theCurrentPkAttrs[0] = NULL;
theFirstPkAttrs[1] = NULL;
theCurrentPkAttrs[1] = NULL;
theFirstDataAttrs[0] = NULL;
theCurrentDataAttrs[0] = NULL;
theFirstDataAttrs[1] = NULL;
theCurrentDataAttrs[1] = NULL;
sdata = NULL;
ptr[0].p = NULL;
ptr[1].p = NULL;
......@@ -94,7 +98,15 @@ NdbEventOperationImpl::~NdbEventOperationImpl()
int i;
if (sdata) NdbMem_Free((char*)sdata);
for (i=0 ; i<2; i++) {
NdbRecAttr *p = theFirstRecAttrs[i];
NdbRecAttr *p = theFirstPkAttrs[i];
while (p) {
NdbRecAttr *p_next = p->next();
m_ndb->releaseRecAttr(p);
p = p_next;
}
}
for (i=0 ; i<2; i++) {
NdbRecAttr *p = theFirstDataAttrs[i];
while (p) {
NdbRecAttr *p_next = p->next();
m_ndb->releaseRecAttr(p);
......@@ -138,14 +150,26 @@ NdbEventOperationImpl::getValue(const NdbColumnImpl *tAttrInfo, char *aValue, in
{
DBUG_ENTER("NdbEventOperationImpl::getValue");
// Insert Attribute Id into ATTRINFO part.
NdbRecAttr *&theFirstRecAttr = theFirstRecAttrs[n];
NdbRecAttr *&theCurrentRecAttr = theCurrentRecAttrs[n];
NdbRecAttr **theFirstAttr;
NdbRecAttr **theCurrentAttr;
if (tAttrInfo->getPrimaryKey())
{
theFirstAttr = &theFirstPkAttrs[n];
theCurrentAttr = &theCurrentPkAttrs[n];
}
else
{
theFirstAttr = &theFirstDataAttrs[n];
theCurrentAttr = &theCurrentDataAttrs[n];
}
/************************************************************************
* Get a Receive Attribute object and link it into the operation object.
************************************************************************/
NdbRecAttr *tRecAttr = m_ndb->getRecAttr();
if (tRecAttr == NULL) {
NdbRecAttr *tAttr = m_ndb->getRecAttr();
if (tAttr == NULL) {
exit(-1);
//setErrorCodeAbort(4000);
DBUG_RETURN(NULL);
......@@ -156,51 +180,51 @@ NdbEventOperationImpl::getValue(const NdbColumnImpl *tAttrInfo, char *aValue, in
* the RecAttr object
* Also set attribute size, array size and attribute type
********************************************************************/
if (tRecAttr->setup(tAttrInfo, aValue)) {
if (tAttr->setup(tAttrInfo, aValue)) {
//setErrorCodeAbort(4000);
m_ndb->releaseRecAttr(tRecAttr);
m_ndb->releaseRecAttr(tAttr);
exit(-1);
DBUG_RETURN(NULL);
}
//theErrorLine++;
tRecAttr->setNULL();
tAttr->setUNDEFINED();
// We want to keep the list sorted to make data insertion easier later
if (theFirstRecAttr == NULL) {
theFirstRecAttr = tRecAttr;
theCurrentRecAttr = tRecAttr;
tRecAttr->next(NULL);
if (*theFirstAttr == NULL) {
*theFirstAttr = tAttr;
*theCurrentAttr = tAttr;
tAttr->next(NULL);
} else {
Uint32 tAttrId = tAttrInfo->m_attrId;
if (tAttrId > theCurrentRecAttr->attrId()) { // right order
theCurrentRecAttr->next(tRecAttr);
tRecAttr->next(NULL);
theCurrentRecAttr = tRecAttr;
} else if (theFirstRecAttr->next() == NULL || // only one in list
theFirstRecAttr->attrId() > tAttrId) {// or first
tRecAttr->next(theFirstRecAttr);
theFirstRecAttr = tRecAttr;
if (tAttrId > (*theCurrentAttr)->attrId()) { // right order
(*theCurrentAttr)->next(tAttr);
tAttr->next(NULL);
*theCurrentAttr = tAttr;
} else if ((*theFirstAttr)->next() == NULL || // only one in list
(*theFirstAttr)->attrId() > tAttrId) {// or first
tAttr->next(*theFirstAttr);
*theFirstAttr = tAttr;
} else { // at least 2 in list and not first and not last
NdbRecAttr *p = theFirstRecAttr;
NdbRecAttr *p = *theFirstAttr;
NdbRecAttr *p_next = p->next();
while (tAttrId > p_next->attrId()) {
p = p_next;
p_next = p->next();
}
if (tAttrId == p_next->attrId()) { // Using same attribute twice
tRecAttr->release(); // do I need to do this?
m_ndb->releaseRecAttr(tRecAttr);
tAttr->release(); // do I need to do this?
m_ndb->releaseRecAttr(tAttr);
exit(-1);
DBUG_RETURN(NULL);
}
// this is it, between p and p_next
p->next(tRecAttr);
tRecAttr->next(p_next);
p->next(tAttr);
tAttr->next(p_next);
}
}
DBUG_RETURN(tRecAttr);
DBUG_RETURN(tAttr);
}
int
......@@ -213,7 +237,8 @@ NdbEventOperationImpl::execute()
DBUG_RETURN(-1);
}
if (theFirstRecAttrs[0] == NULL) { // defaults to get all
if (theFirstPkAttrs[0] == NULL &&
theFirstDataAttrs[0] == NULL) { // defaults to get all
}
......@@ -362,8 +387,10 @@ NdbEventOperationImpl::next(int *pOverrun)
#endif
// now move the data into the RecAttrs
if ((theFirstRecAttrs[0] == NULL) &&
(theFirstRecAttrs[1] == NULL))
if ((theFirstPkAttrs[0] == NULL) &&
(theFirstPkAttrs[1] == NULL) &&
(theFirstDataAttrs[0] == NULL) &&
(theFirstDataAttrs[1] == NULL))
{
DBUG_RETURN(r);
}
......@@ -385,11 +412,28 @@ NdbEventOperationImpl::next(int *pOverrun)
printf("\n");
#endif
NdbRecAttr *tWorkingRecAttr = theFirstRecAttrs[0];
// copy data into the RecAttr's
// we assume that the respective attribute lists are sorted
// first the pk's
{
NdbRecAttr *tAttr= theFirstPkAttrs[0];
while(tAttr)
{
assert(aAttrPtr < aAttrEndPtr);
unsigned tDataSz= AttributeHeader(*aAttrPtr).getDataSize();
assert(tAttr->attrId() ==
AttributeHeader(*aAttrPtr).getAttributeId());
assert(tAttr->receive_data(aDataPtr, tDataSz));
// next
aAttrPtr++;
aDataPtr+= tDataSz;
tAttr= tAttr->next();
}
}
NdbRecAttr *tWorkingRecAttr = theFirstDataAttrs[0];
Uint32 tRecAttrId;
Uint32 tAttrId;
Uint32 tDataSz;
......@@ -401,7 +445,7 @@ NdbEventOperationImpl::next(int *pOverrun)
while (tAttrId > tRecAttrId) {
//printf("[%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId);
tWorkingRecAttr->setNULL();
tWorkingRecAttr->setUNDEFINED();
tWorkingRecAttr = tWorkingRecAttr->next();
if (tWorkingRecAttr == NULL)
break;
......@@ -413,32 +457,25 @@ NdbEventOperationImpl::next(int *pOverrun)
//printf("[%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId);
if (tAttrId == tRecAttrId) {
if (!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey())
hasSomeData++;
hasSomeData++;
//printf("set!\n");
tWorkingRecAttr->receive_data(aDataPtr, tDataSz);
// move forward, data has already moved forward
aAttrPtr++;
aDataPtr += tDataSz;
assert(tWorkingRecAttr->receive_data(aDataPtr, tDataSz));
tWorkingRecAttr = tWorkingRecAttr->next();
} else {
// move only attr forward
aAttrPtr++;
aDataPtr += tDataSz;
}
aAttrPtr++;
aDataPtr += tDataSz;
}
while (tWorkingRecAttr != NULL) {
tRecAttrId = tWorkingRecAttr->attrId();
//printf("set undefined [%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId);
tWorkingRecAttr->setNULL();
tWorkingRecAttr->setUNDEFINED();
tWorkingRecAttr = tWorkingRecAttr->next();
}
tWorkingRecAttr = theFirstRecAttrs[1];
tWorkingRecAttr = theFirstDataAttrs[1];
aDataPtr = ptr[2].p;
Uint32 *aDataEndPtr = aDataPtr + ptr[2].sz;
while ((aDataPtr < aDataEndPtr) && (tWorkingRecAttr != NULL)) {
......@@ -447,7 +484,7 @@ NdbEventOperationImpl::next(int *pOverrun)
tDataSz = AttributeHeader(*aDataPtr).getDataSize();
aDataPtr++;
while (tAttrId > tRecAttrId) {
tWorkingRecAttr->setNULL();
tWorkingRecAttr->setUNDEFINED();
tWorkingRecAttr = tWorkingRecAttr->next();
if (tWorkingRecAttr == NULL)
break;
......@@ -456,20 +493,16 @@ NdbEventOperationImpl::next(int *pOverrun)
if (tWorkingRecAttr == NULL)
break;
if (tAttrId == tRecAttrId) {
if (!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey())
hasSomeData++;
assert(!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey());
hasSomeData++;
tWorkingRecAttr->receive_data(aDataPtr, tDataSz);
aDataPtr += tDataSz;
// move forward, data+attr has already moved forward
assert(tWorkingRecAttr->receive_data(aDataPtr, tDataSz));
tWorkingRecAttr = tWorkingRecAttr->next();
} else {
// move only data+attr forward
aDataPtr += tDataSz;
}
aDataPtr += tDataSz;
}
while (tWorkingRecAttr != NULL) {
tWorkingRecAttr->setNULL();
tWorkingRecAttr->setUNDEFINED();
tWorkingRecAttr = tWorkingRecAttr->next();
}
......@@ -504,7 +537,16 @@ NdbEventOperationImpl::print()
ndbout << "EventId " << m_eventId << "\n";
for (int i = 0; i < 2; i++) {
NdbRecAttr *p = theFirstRecAttrs[i];
NdbRecAttr *p = theFirstPkAttrs[i];
ndbout << " %u " << i;
while (p) {
ndbout << " : " << p->attrId() << " = " << *p;
p = p->next();
}
ndbout << "\n";
}
for (int i = 0; i < 2; i++) {
NdbRecAttr *p = theFirstDataAttrs[i];
ndbout << " %u " << i;
while (p) {
ndbout << " : " << p->attrId() << " = " << *p;
......
......@@ -60,8 +60,10 @@ public:
NdbEventImpl *m_eventImpl;
NdbGlobalEventBufferHandle *m_bufferHandle;
NdbRecAttr *theFirstRecAttrs[2];
NdbRecAttr *theCurrentRecAttrs[2];
NdbRecAttr *theFirstPkAttrs[2];
NdbRecAttr *theCurrentPkAttrs[2];
NdbRecAttr *theFirstDataAttrs[2];
NdbRecAttr *theCurrentDataAttrs[2];
NdbEventOperation::State m_state;
Uint32 m_eventId;
......
......@@ -95,6 +95,8 @@ public:
int numRecords = 1);
NdbIndexScanOperation* pIndexScanOp;
NDBT_ResultRow& get_row(Uint32 idx) { return *rows[idx];}
protected:
void allocRows(int rows);
void deallocRows();
......
......@@ -91,7 +91,11 @@ private:
; \
}
#define ERR(error) ERR_OUT(g_err, error)
#define ERR(error) \
{ \
const NdbError &_error= (error); \
ERR_OUT(g_err, _error); \
}
#define ERR_INFO(error) ERR_OUT(g_info, error)
#endif
......@@ -27,7 +27,7 @@ public:
const NdbRecAttr * attributeStore(int i) const ;
const NdbRecAttr * attributeStore(const char* name) const ;
BaseString c_str();
BaseString c_str() const ;
NdbOut & header (NdbOut &) const;
friend NdbOut & operator << (NdbOut&, const NDBT_ResultRow &);
......@@ -36,6 +36,11 @@ public:
* Make copy of NDBT_ResultRow
*/
NDBT_ResultRow * clone() const;
bool operator==(const NDBT_ResultRow&) const ;
bool operator!=(const NDBT_ResultRow& other) const {
return ! (*this == other);
}
private:
int cols;
......
......@@ -66,6 +66,14 @@ public:
int copyTableData(Ndb*,
const char* destName);
/**
* Compare this table with other_table
*
* return 0 - on equality
* -1 - on error
* >0 - otherwise
*/
int compare(Ndb*, const char * other_table, int flags);
private:
static int takeOverAndDeleteRecord(Ndb*,
......
......@@ -206,15 +206,10 @@ int runVerify(NDBT_Context* ctx, NDBT_Step* step)
char buf[1024];
sprintf(buf, "%s_SHADOW", table->getName());
const NdbDictionary::Table * table_shadow;
if ((table_shadow = GETNDB(step)->getDictionary()->getTable(buf)) == 0)
{
g_err << "Unable to get table " << buf << endl;
return NDBT_FAILED;
}
HugoTransactions hugoTrans(*table_shadow);
if (hugoTrans.pkReadRecords(GETNDB(step), records) != 0){
HugoTransactions hugoTrans(*table);
if (hugoTrans.compare(GETNDB(step), buf, 0))
{
return NDBT_FAILED;
}
......@@ -337,12 +332,36 @@ int runEventApplier(NDBT_Context* ctx, NDBT_Step* step)
DBUG_RETURN(NDBT_FAILED);
}
break;
case NdbDictionary::Event::TE_ALL:
default:
abort();
}
for (i= 0; i < n_columns; i++)
{
if (recAttr[i]->isNULL())
{
if (table->getColumn(i)->getPrimaryKey())
{
g_err << "internal error: primary key isNull()="
<< recAttr[i]->isNULL() << endl;
DBUG_RETURN(NDBT_FAILED);
}
switch (pOp->getEventType()) {
case NdbDictionary::Event::TE_INSERT:
if (recAttr[i]->isNULL() < 0)
{
g_err << "internal error: missing value for insert\n";
DBUG_RETURN(NDBT_FAILED);
}
break;
case NdbDictionary::Event::TE_DELETE:
break;
case NdbDictionary::Event::TE_UPDATE:
break;
default:
abort();
}
}
if (table->getColumn(i)->getPrimaryKey() &&
op->equal(i,recAttr[i]->aRef()))
{
......@@ -358,7 +377,7 @@ int runEventApplier(NDBT_Context* ctx, NDBT_Step* step)
for (i= 0; i < n_columns; i++)
{
if (!table->getColumn(i)->getPrimaryKey() &&
op->setValue(i,recAttr[i]->aRef()))
op->setValue(i,recAttr[i]->isNULL() ? 0:recAttr[i]->aRef()))
{
g_err << "setValue(insert) " << i << " "
<< op->getNdbError().code << " "
......@@ -374,7 +393,7 @@ int runEventApplier(NDBT_Context* ctx, NDBT_Step* step)
{
if (!table->getColumn(i)->getPrimaryKey() &&
recAttr[i]->isNULL() >= 0 &&
op->setValue(i,recAttr[i]->aRef()))
op->setValue(i,recAttr[i]->isNULL() ? 0:recAttr[i]->aRef()))
{
g_err << "setValue(update) " << i << " "
<< op->getNdbError().code << " "
......
......@@ -206,5 +206,5 @@ args: -l 1 -n SR9 T1
#
max-time: 2500
cmd: test_event
args: -n BasicEventOperation T1 T6
args: -n EventOperationApplier
......@@ -84,7 +84,7 @@ NDBT_ResultRow::header (NdbOut & out) const {
return out;
}
BaseString NDBT_ResultRow::c_str() {
BaseString NDBT_ResultRow::c_str() const {
BaseString str;
......@@ -133,3 +133,10 @@ NDBT_ResultRow::clone () const {
return row;
}
bool
NDBT_ResultRow::operator==(const NDBT_ResultRow& other) const
{
// quick and dirty
return c_str() == other.c_str();
}
......@@ -1300,3 +1300,152 @@ UtilTransactions::getOperation(NdbConnection* pTrans,
return 0;
}
}
#include <HugoOperations.hpp>
int
UtilTransactions::compare(Ndb* pNdb, const char* tab_name2, int flags){
NdbError err;
int return_code= -1, row_count= 0;
int retryAttempt = 0, retryMax = 10;
HugoCalculator calc(tab);
NDBT_ResultRow row(tab);
NdbTransaction* pTrans= 0;
const NdbDictionary::Table* tmp= pNdb->getDictionary()->getTable(tab_name2);
if(tmp == 0)
{
g_err << "Unable to lookup table: " << tab_name2
<< endl << pNdb->getDictionary()->getNdbError() << endl;
return -1;
}
const NdbDictionary::Table& tab2= *tmp;
HugoOperations cmp(tab2);
UtilTransactions count(tab2);
while (true){
if (retryAttempt++ >= retryMax){
g_info << "ERROR: has retried this operation " << retryAttempt
<< " times, failing!" << endl;
return -1;
}
NdbScanOperation *pOp= 0;
pTrans = pNdb->startTransaction();
if (pTrans == NULL) {
err = pNdb->getNdbError();
goto error;
}
pOp= pTrans->getNdbScanOperation(tab.getName());
if (pOp == NULL) {
ERR(err= pTrans->getNdbError());
goto error;
}
if( pOp->readTuples(NdbScanOperation::LM_Read) ) {
ERR(err= pTrans->getNdbError());
goto error;
}
if( pOp->interpret_exit_ok() == -1 ) {
ERR(err= pTrans->getNdbError());
goto error;
}
// Read all attributes
{
for (int a = 0; a < tab.getNoOfColumns(); a++){
if ((row.attributeStore(a) =
pOp->getValue(tab.getColumn(a)->getName())) == 0) {
ERR(err= pTrans->getNdbError());
goto error;
}
}
}
if( pTrans->execute(NoCommit) == -1 ) {
ERR(err= pTrans->getNdbError());
goto error;
}
{
int eof;
while((eof = pOp->nextResult(true)) == 0)
{
do {
row_count++;
if(cmp.startTransaction(pNdb) != NDBT_OK)
{
ERR(err= pNdb->getNdbError());
goto error;
}
int rowNo= calc.getIdValue(&row);
if(cmp.pkReadRecord(pNdb, rowNo, 1) != NDBT_OK)
{
ERR(err= cmp.getTransaction()->getNdbError());
goto error;
}
if(cmp.execute_Commit(pNdb) != NDBT_OK)
{
ERR(err= cmp.getTransaction()->getNdbError());
goto error;
}
if(row != cmp.get_row(0))
{
g_err << "COMPARE FAILED" << endl;
g_err << row << endl;
g_err << cmp.get_row(0) << endl;
return_code= 1;
goto close;
}
retryAttempt= 0;
cmp.closeTransaction(pNdb);
} while((eof = pOp->nextResult(false)) == 0);
}
if (eof == -1)
{
err = pTrans->getNdbError();
goto error;
}
}
pTrans->close(); pTrans= 0;
g_info << row_count << " rows compared" << endl;
{
int row_count2;
if(count.selectCount(pNdb, 0, &row_count2) != NDBT_OK)
{
g_err << "Failed to count rows in tab_name2" << endl;
return -1;
}
g_info << row_count2 << " rows in tab_name2" << endl;
return (row_count == row_count2 ? 0 : 1);
}
error:
if(err.status == NdbError::TemporaryError)
{
NdbSleep_MilliSleep(50);
if(pTrans != 0)
{
pTrans->close();
pTrans= 0;
}
if(cmp.getTransaction())
cmp.closeTransaction(pNdb);
continue;
}
break;
}
close:
if(pTrans != 0) pTrans->close();
return return_code;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment