Commit 25c8705d authored by joreland@mysql.com's avatar joreland@mysql.com

ndb - major update HugoOperations/Transactions

     +more code reuse
     +easier testing of indexes
parent b439d7f1
......@@ -754,8 +754,6 @@ public:
Undefined = 0, ///< Undefined object type (initial value)
UniqueHashIndex = 3, ///< Unique un-ordered hash index
///< (only one currently supported)
HashIndex = 4, ///< Non-unique un-ordered hash index
UniqueOrderedIndex = 5, ///< Unique ordered index
OrderedIndex = 6 ///< Non-unique ordered index
};
......
......@@ -24,7 +24,9 @@
class HugoOperations : public UtilTransactions {
public:
HugoOperations(const NdbDictionary::Table&);
HugoOperations(const NdbDictionary::Table&,
const NdbDictionary::Index* idx = 0);
~HugoOperations();
int startTransaction(Ndb*);
int closeTransaction(Ndb*);
......@@ -98,8 +100,6 @@ protected:
struct RsPair { NdbResultSet* m_result_set; int records; };
Vector<RsPair> m_result_sets;
Vector<RsPair> m_executed_result_sets;
private:
NdbConnection* pTrans;
};
#endif
......@@ -25,7 +25,8 @@
class HugoTransactions : public HugoOperations {
public:
HugoTransactions(const NdbDictionary::Table&);
HugoTransactions(const NdbDictionary::Table&,
const NdbDictionary::Index* idx = 0);
~HugoTransactions();
int createEvent(Ndb*);
int eventOperation(Ndb*, void* stats,
......
......@@ -23,14 +23,10 @@ typedef int (ReadCallBackFn)(NDBT_ResultRow*);
class UtilTransactions {
public:
enum ScanLock {
SL_Read = 0,
SL_ReadHold = 1,
SL_Exclusive = 2
};
UtilTransactions(const NdbDictionary::Table&);
UtilTransactions(Ndb* ndb, const char * tableName);
UtilTransactions(const NdbDictionary::Table&,
const NdbDictionary::Index* idx = 0);
UtilTransactions(Ndb* ndb,
const char * tableName, const char * indexName = 0);
int clearTable(Ndb*,
int records = 0,
......@@ -114,6 +110,12 @@ private:
protected:
int m_defaultClearMethod;
const NdbDictionary::Table& tab;
const NdbDictionary::Index* idx;
NdbConnection* pTrans;
NdbOperation* getOperation(NdbConnection*,
NdbOperation::OperationType);
NdbScanOperation* getScanOperation(NdbConnection*);
};
#endif
......@@ -17,11 +17,12 @@
#include <NdbSleep.h>
#include <HugoAsynchTransactions.hpp>
HugoAsynchTransactions::HugoAsynchTransactions(const NdbDictionary::Table& _tab):
HugoTransactions(_tab),
HugoAsynchTransactions::HugoAsynchTransactions(const NdbDictionary::Table& _t)
: HugoTransactions(_t),
transactionsCompleted(0),
numTransactions(0),
transactions(NULL){
transactions(NULL)
{
}
HugoAsynchTransactions::~HugoAsynchTransactions(){
......
......@@ -57,8 +57,10 @@ int HugoOperations::pkReadRecord(Ndb* pNdb,
int a;
allocRows(numRecords);
int check;
for(int r=0; r < numRecords; r++){
NdbOperation* pOp = pTrans->getNdbOperation(tab.getName());
NdbOperation* pOp = getOperation(pTrans, NdbOperation::ReadRequest);
if (pOp == NULL) {
ERR(pTrans->getNdbError());
return NDBT_FAILED;
......@@ -115,7 +117,7 @@ int HugoOperations::pkUpdateRecord(Ndb* pNdb,
allocRows(numRecords);
int check;
for(int r=0; r < numRecords; r++){
NdbOperation* pOp = pTrans->getNdbOperation(tab.getName());
NdbOperation* pOp = getOperation(pTrans, NdbOperation::UpdateRequest);
if (pOp == NULL) {
ERR(pTrans->getNdbError());
return NDBT_FAILED;
......@@ -157,7 +159,7 @@ int HugoOperations::pkInsertRecord(Ndb* pNdb,
int a, check;
for(int r=0; r < numRecords; r++){
NdbOperation* pOp = pTrans->getNdbOperation(tab.getName());
NdbOperation* pOp = getOperation(pTrans, NdbOperation::InsertRequest);
if (pOp == NULL) {
ERR(pTrans->getNdbError());
return NDBT_FAILED;
......@@ -198,7 +200,7 @@ int HugoOperations::pkDeleteRecord(Ndb* pNdb,
int a, check;
for(int r=0; r < numRecords; r++){
NdbOperation* pOp = pTrans->getNdbOperation(tab.getName());
NdbOperation* pOp = getOperation(pTrans, NdbOperation::DeleteRequest);
if (pOp == NULL) {
ERR(pTrans->getNdbError());
return NDBT_FAILED;
......@@ -222,65 +224,6 @@ int HugoOperations::pkDeleteRecord(Ndb* pNdb,
}
return NDBT_OK;
}
#if 0
NdbResultSet*
HugoOperations::scanReadRecords(Ndb* pNdb, ScanLock lock){
NDBT_ResultRow * m_tmpRow = new NDBT_ResultRow(tab);
NdbScanOperation* pOp = pTrans->getNdbScanOperation(tab.getName());
if (pOp == NULL) {
ERR(pTrans->getNdbError());
return 0;
}
int check = 0;
NdbResultSet * rs = 0;
switch(lock){
case SL_ReadHold:
rs = pOp->readTuples(NdbScanOperation::LM_Read, 1, 1);
break;
case SL_Exclusive:
rs = pOp->readTuples(NdbScanOperation::LM_Exclusive, 1, 1);
break;
case SL_Read:
default:
rs = pOp->readTuples(NdbScanOperation::LM_Dirty, 1, 1);
}
if( rs == 0) {
ERR(pTrans->getNdbError());
return 0;
}
check = pOp->interpret_exit_ok();
if( check == -1 ) {
ERR(pTrans->getNdbError());
return 0;
}
// Define attributes to read
for(int a = 0; a<tab.getNoOfColumns(); a++){
if((m_tmpRow->attributeStore(a) =
pOp->getValue(tab.getColumn(a)->getName())) == 0) {
ERR(pTrans->getNdbError());
return 0;
}
}
return rs;
}
int
HugoOperations::readTuples(NdbResultSet* rs){
int res = 0;
while((res = rs->nextResult()) == 0){
}
if(res != 1)
return NDBT_FAILED;
return NDBT_OK;
}
#endif
int HugoOperations::execute_Commit(Ndb* pNdb,
AbortOption eao){
......@@ -392,11 +335,11 @@ int HugoOperations::execute_Rollback(Ndb* pNdb){
return NDBT_OK;
}
HugoOperations::HugoOperations(const NdbDictionary::Table& _tab):
UtilTransactions(_tab),
calc(_tab),
pTrans(NULL){
HugoOperations::HugoOperations(const NdbDictionary::Table& _tab,
const NdbDictionary::Index* idx):
UtilTransactions(_tab, idx),
calc(_tab)
{
}
HugoOperations::~HugoOperations(){
......@@ -531,14 +474,12 @@ HugoOperations::verifyUpdatesValue(int updatesValue, int _numRows){
}
void HugoOperations::allocRows(int _numRows){
deallocRows();
if(_numRows <= 0){
g_info << "Illegal value for num rows : " << _numRows << endl;
abort();
}
for(int b=0; b<_numRows; b++){
for(int b=rows.size(); b<_numRows; b++){
rows.push_back(new NDBT_ResultRow(tab));
}
}
......
......@@ -18,8 +18,9 @@
#include <NdbSleep.h>
HugoTransactions::HugoTransactions(const NdbDictionary::Table& _tab):
HugoOperations(_tab),
HugoTransactions::HugoTransactions(const NdbDictionary::Table& _tab,
const NdbDictionary::Index* idx):
HugoOperations(_tab, idx),
row(_tab){
m_defaultScanUpdateMethod = 3;
......@@ -40,7 +41,6 @@ HugoTransactions::scanReadRecords(Ndb* pNdb,
int retryAttempt = 0;
const int retryMax = 100;
int check, a;
NdbConnection *pTrans;
NdbScanOperation *pOp;
while (true){
......@@ -65,7 +65,7 @@ HugoTransactions::scanReadRecords(Ndb* pNdb,
return NDBT_FAILED;
}
pOp = pTrans->getNdbScanOperation(tab.getName());
pOp = getScanOperation(pTrans);
if (pOp == NULL) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
......@@ -196,7 +196,6 @@ HugoTransactions::scanReadRecords(Ndb* pNdb,
int retryAttempt = 0;
const int retryMax = 100;
int check, a;
NdbConnection *pTrans;
NdbIndexScanOperation *pOp;
while (true){
......@@ -363,162 +362,7 @@ HugoTransactions::scanUpdateRecords1(Ndb* pNdb,
int records,
int abortPercent,
int parallelism){
#if 1
return scanUpdateRecords3(pNdb, records, abortPercent, 1);
#else
int retryAttempt = 0;
const int retryMax = 100;
int check, a;
NdbConnection *pTrans;
NdbOperation *pOp;
while (true){
if (retryAttempt >= retryMax){
g_info << "ERROR: has retried this operation " << retryAttempt
<< " times, failing!" << endl;
return NDBT_FAILED;
}
pTrans = pNdb->startTransaction();
if (pTrans == NULL) {
const NdbError err = pNdb->getNdbError();
if (err.status == NdbError::TemporaryError){
ERR(err);
NdbSleep_MilliSleep(50);
retryAttempt++;
continue;
}
ERR(err);
return NDBT_FAILED;
}
pOp = pTrans->getNdbOperation(tab.getName());
if (pOp == NULL) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
check = pOp->openScanExclusive(parallelism);
if( check == -1 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
check = pOp->interpret_exit_ok();
if( check == -1 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
// Read all attributes from this table
for(a=0; a<tab.getNoOfColumns(); a++){
if((row.attributeStore(a) = pOp->getValue(tab.getColumn(a)->getName())) == NULL){
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
}
check = pTrans->executeScan();
if( check == -1 ) {
const NdbError err = pTrans->getNdbError();
if (err.status == NdbError::TemporaryError){
ERR(err);
pNdb->closeTransaction(pTrans);
NdbSleep_MilliSleep(50);
retryAttempt++;
continue;
}
ERR(err);
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
// Abort after 1-100 or 1-records rows
int ranVal = rand();
int abortCount = ranVal % (records == 0 ? 100 : records);
bool abortTrans = false;
if (abort > 0){
// Abort if abortCount is less then abortPercent
if (abortCount < abortPercent)
abortTrans = true;
}
int eof;
int rows = 0;
eof = pTrans->nextScanResult();
while(eof == 0){
rows++;
if (abortCount == rows && abortTrans == true){
g_info << "Scan is aborted" << endl;
// This scan should be aborted
check = pTrans->stopScan();
if( check == -1 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
pNdb->closeTransaction(pTrans);
return NDBT_OK;
}
int res = takeOverAndUpdateRecord(pNdb, pOp);
if(res == RESTART_SCAN){
eof = -2;
continue;
}
if (res != 0){
pNdb->closeTransaction(pTrans);
return res;
}
eof = pTrans->nextScanResult();
}
if (eof == -1) {
const NdbError err = pTrans->getNdbError();
if (err.status == NdbError::TemporaryError){
ERR(err);
NdbSleep_MilliSleep(50);
switch (err.code){
case 488:
case 245:
case 490:
// Too many active scans, no limit on number of retry attempts
break;
default:
retryAttempt++;
}
continue;
}
ERR(err);
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
if(eof == -2){
pNdb->closeTransaction(pTrans);
NdbSleep_MilliSleep(50);
retryAttempt++;
continue;
}
pNdb->closeTransaction(pTrans);
g_info << rows << " rows have been updated" << endl;
return NDBT_OK;
}
return NDBT_FAILED;
#endif
}
// Scan all records exclusive and update
......@@ -530,168 +374,7 @@ HugoTransactions::scanUpdateRecords2(Ndb* pNdb,
int records,
int abortPercent,
int parallelism){
#if 1
return scanUpdateRecords3(pNdb, records, abortPercent, parallelism);
#else
int retryAttempt = 0;
const int retryMax = 100;
int check, a;
NdbConnection *pTrans;
NdbOperation *pOp;
while (true){
if (retryAttempt >= retryMax){
g_info << "ERROR: has retried this operation " << retryAttempt
<< " times, failing!" << endl;
return NDBT_FAILED;
}
pTrans = pNdb->startTransaction();
if (pTrans == NULL) {
const NdbError err = pNdb->getNdbError();
if (err.status == NdbError::TemporaryError){
ERR(err);
NdbSleep_MilliSleep(50);
retryAttempt++;
continue;
}
ERR(err);
return NDBT_FAILED;
}
pOp = pTrans->getNdbOperation(tab.getName());
if (pOp == NULL) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
check = pOp->openScanExclusive(parallelism);
if( check == -1 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
check = pOp->interpret_exit_ok();
if( check == -1 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
// Read all attributes from this table
for(a=0; a<tab.getNoOfColumns(); a++){
if((row.attributeStore(a) = pOp->getValue(tab.getColumn(a)->getName())) == NULL){
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
}
check = pTrans->executeScan();
if( check == -1 ) {
const NdbError err = pTrans->getNdbError();
if (err.status == NdbError::TemporaryError){
ERR(err);
pNdb->closeTransaction(pTrans);
NdbSleep_MilliSleep(50);
retryAttempt++;
continue;
}
ERR(err);
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
// Abort after 1-100 or 1-records rows
int ranVal = rand();
int abortCount = ranVal % (records == 0 ? 100 : records);
bool abortTrans = false;
if (abort > 0){
// Abort if abortCount is less then abortPercent
if (abortCount < abortPercent)
abortTrans = true;
}
int eof;
int rows = 0;
NdbConnection* pUpTrans;
while((eof = pTrans->nextScanResult(true)) == 0){
pUpTrans = pNdb->startTransaction();
if (pUpTrans == NULL) {
const NdbError err = pNdb->getNdbError();
if (err.status == NdbError::TemporaryError){
ERR(err);
NdbSleep_MilliSleep(50);
retryAttempt++;
continue;
}
ERR(err);
return NDBT_FAILED;
}
do {
rows++;
if (addRowToUpdate(pNdb, pUpTrans, pOp) != 0){
pNdb->closeTransaction(pUpTrans);
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
} while((eof = pTrans->nextScanResult(false)) == 0);
if (abortCount == rows && abortTrans == true){
g_info << "Scan is aborted" << endl;
// This scan should be aborted
check = pTrans->stopScan();
if( check == -1 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
pNdb->closeTransaction(pUpTrans);
return NDBT_FAILED;
}
pNdb->closeTransaction(pTrans);
pNdb->closeTransaction(pUpTrans);
return NDBT_OK;
}
check = pUpTrans->execute(Commit);
if( check == -1 ) {
const NdbError err = pUpTrans->getNdbError();
ERR(err);
pNdb->closeTransaction(pUpTrans);
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
pNdb->closeTransaction(pUpTrans);
}
if (eof == -1) {
const NdbError err = pTrans->getNdbError();
if (err.status == NdbError::TemporaryError){
ERR(err);
pNdb->closeTransaction(pTrans);
NdbSleep_MilliSleep(50);
retryAttempt++;
continue;
}
ERR(err);
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
pNdb->closeTransaction(pTrans);
g_info << rows << " rows have been updated" << endl;
return NDBT_OK;
}
return NDBT_FAILED;
#endif
}
int
......@@ -702,7 +385,6 @@ HugoTransactions::scanUpdateRecords3(Ndb* pNdb,
int retryAttempt = 0;
const int retryMax = 100;
int check, a;
NdbConnection *pTrans;
NdbScanOperation *pOp;
......@@ -725,7 +407,7 @@ HugoTransactions::scanUpdateRecords3(Ndb* pNdb,
return NDBT_FAILED;
}
pOp = pTrans->getNdbScanOperation(tab.getName());
pOp = getScanOperation(pTrans);
if (pOp == NULL) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
......@@ -846,7 +528,6 @@ HugoTransactions::loadTable(Ndb* pNdb,
int check, a;
int retryAttempt = 0;
int retryMax = 5;
NdbConnection *pTrans;
NdbOperation *pOp;
bool first_batch = true;
......@@ -866,6 +547,10 @@ HugoTransactions::loadTable(Ndb* pNdb,
g_info << "|- Inserting records..." << endl;
for (int c=0 ; c<records ; ){
bool closeTrans = true;
if(c + batch > records)
batch = records - c;
if (retryAttempt >= retryMax){
g_info << "Record " << c << " could not be inserted, has retried "
<< retryAttempt << " times " << endl;
......@@ -894,31 +579,12 @@ HugoTransactions::loadTable(Ndb* pNdb,
}
}
for(int b = 0; b < batch && c+b<records; b++){
pOp = pTrans->getNdbOperation(tab.getName());
if (pOp == NULL) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
check = pOp->insertTuple();
if( check == -1 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
// Set a calculated value for each attribute in this table
for (a = 0; a<tab.getNoOfColumns(); a++){
if(setValueForAttr(pOp, a, c+b, 0 ) != 0){
if(pkInsertRecord(pNdb, c, batch) != NDBT_OK)
{
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
}
}
// Execute the transaction and insert the record
if (!oneTrans || (c + batch) >= records) {
......@@ -995,10 +661,21 @@ HugoTransactions::fillTable(Ndb* pNdb,
int check, a, b;
int retryAttempt = 0;
int retryMax = 5;
NdbConnection *pTrans;
NdbOperation *pOp;
g_info << "|- Inserting records..." << endl;
const int org = batch;
const int cols = tab.getNoOfColumns();
const int brow = tab.getRowSizeInBytes();
const int bytes = 12 + brow + 4 * cols;
batch = (batch * 256); // -> 512 -> 65536k per commit
batch = batch/bytes; //
batch = batch == 0 ? 1 : batch;
if(batch != org){
g_info << "batch = " << org << " rowsize = " << bytes
<< " -> rows/commit = " << batch << endl;
}
for (int c=0 ; ; ){
if (retryAttempt >= retryMax){
......@@ -1023,32 +700,13 @@ HugoTransactions::fillTable(Ndb* pNdb,
return NDBT_FAILED;
}
for(b = 0; b < batch; b++){
pOp = pTrans->getNdbOperation(tab.getName());
if (pOp == NULL) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
check = pOp->insertTuple();
if( check == -1 ) {
if(pkInsertRecord(pNdb, c, batch) != NDBT_OK)
{
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
// Set a calculated value for each attribute in this table
for (a = 0; a<tab.getNoOfColumns(); a++){
if(setValueForAttr(pOp, a, c+b, 0 ) != 0){
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
}
}
// Execute the transaction and insert the record
check = pTrans->execute( Commit, CommitAsMuchAsPossible );
if(check == -1 ) {
......@@ -1387,7 +1045,6 @@ HugoTransactions::pkReadRecords(Ndb* pNdb,
int retryAttempt = 0;
const int retryMax = 100;
int check, a;
NdbConnection *pTrans;
NdbOperation *pOp;
if (batchsize == 0) {
......@@ -1395,9 +1052,10 @@ HugoTransactions::pkReadRecords(Ndb* pNdb,
return NDBT_FAILED;
}
allocRows(batchsize);
while (r < records){
if(r + batchsize > records)
batchsize = records - r;
if (retryAttempt >= retryMax){
g_info << "ERROR: has retried this operation " << retryAttempt
<< " times, failing!" << endl;
......@@ -1418,58 +1076,12 @@ HugoTransactions::pkReadRecords(Ndb* pNdb,
return NDBT_FAILED;
}
for(int b=0; (b<batchsize) && (r+b < records); b++){
pOp = pTrans->getNdbOperation(tab.getName());
if (pOp == NULL) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
rand_lock_mode:
switch(lm){
case NdbOperation::LM_Read:
check = pOp->readTuple();
break;
case NdbOperation::LM_Exclusive:
check = pOp->readTupleExclusive();
break;
case NdbOperation::LM_CommittedRead:
check = pOp->dirtyRead();
break;
default:
lm = (NdbOperation::LockMode)((rand() >> 16) & 3);
goto rand_lock_mode;
}
if( check == -1 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
// Define primary keys
for(a = 0; a<tab.getNoOfColumns(); a++){
if (tab.getColumn(a)->getPrimaryKey() == true){
if(equalForAttr(pOp, a, r+b) != 0){
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
}
}
// Define attributes to read
for(a = 0; a<tab.getNoOfColumns(); a++){
if((rows[b]->attributeStore(a) =
pOp->getValue(tab.getColumn(a)->getName())) == 0) {
if(pkReadRecord(pNdb, r, batchsize, lm) != NDBT_OK)
{
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
}
}
check = pTrans->execute(Commit);
if( check == -1 ) {
......@@ -1524,13 +1136,14 @@ HugoTransactions::pkUpdateRecords(Ndb* pNdb,
int retryAttempt = 0;
const int retryMax = 100;
int check, a, b;
NdbConnection *pTrans;
NdbOperation *pOp;
allocRows(batch);
g_info << "|- Updating records (batch=" << batch << ")..." << endl;
while (r < records){
if(r + batch > records)
batch = records - r;
if (retryAttempt >= retryMax){
g_info << "ERROR: has retried this operation " << retryAttempt
......@@ -1555,42 +1168,12 @@ HugoTransactions::pkUpdateRecords(Ndb* pNdb,
return NDBT_FAILED;
}
for(b = 0; b<batch && (r+b) < records; b++){
pOp = pTrans->getNdbOperation(tab.getName());
if (pOp == NULL) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
check = pOp->readTupleExclusive();
if( check == -1 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
// Define primary keys
for(a = 0; a<tab.getNoOfColumns(); a++){
if (tab.getColumn(a)->getPrimaryKey() == true){
if(equalForAttr(pOp, a, r+b) != 0){
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
}
}
// Define attributes to read
for(a = 0; a<tab.getNoOfColumns(); a++){
if((rows[b]->attributeStore(a) =
pOp->getValue(tab.getColumn(a)->getName())) == 0) {
if(pkReadRecord(pNdb, r, batch, NdbOperation::LM_Exclusive) != NDBT_OK)
{
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
}
}
check = pTrans->execute(NoCommit);
if( check == -1 ) {
......@@ -1608,49 +1191,23 @@ HugoTransactions::pkUpdateRecords(Ndb* pNdb,
return NDBT_FAILED;
}
for(b = 0; b<batch && (b+r)<records; b++){
if (calc.verifyRowValues(rows[b]) != 0){
for(b = 0; b<batch && (b+r)<records; b++)
{
if (calc.verifyRowValues(rows[b]) != 0)
{
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
int updates = calc.getUpdatesValue(rows[b]) + 1;
NdbOperation* pUpdOp;
pUpdOp = pTrans->getNdbOperation(tab.getName());
if (pUpdOp == NULL) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
check = pUpdOp->updateTuple();
if( check == -1 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
for(a = 0; a<tab.getNoOfColumns(); a++){
if (tab.getColumn(a)->getPrimaryKey() == true){
if(equalForAttr(pUpdOp, a, r+b) != 0){
if(pkUpdateRecord(pNdb, r+b, 1, updates) != NDBT_OK)
{
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
}
}
for(a = 0; a<tab.getNoOfColumns(); a++){
if (tab.getColumn(a)->getPrimaryKey() == false){
if(setValueForAttr(pUpdOp, a, r+b, updates ) != 0){
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
}
}
}
check = pTrans->execute(Commit);
if( check == -1 ) {
......@@ -1692,7 +1249,6 @@ HugoTransactions::pkInterpretedUpdateRecords(Ndb* pNdb,
int retryAttempt = 0;
const int retryMax = 100;
int check, a;
NdbConnection *pTrans;
while (r < records){
......@@ -1871,11 +1427,12 @@ HugoTransactions::pkDelRecords(Ndb* pNdb,
int retryAttempt = 0;
const int retryMax = 100;
int check, a;
NdbConnection *pTrans;
NdbOperation *pOp;
g_info << "|- Deleting records..." << endl;
while (r < records){
if(r + batch > records)
batch = records - r;
if (retryAttempt >= retryMax){
g_info << "ERROR: has retried this operation " << retryAttempt
......@@ -1900,30 +1457,13 @@ HugoTransactions::pkDelRecords(Ndb* pNdb,
return NDBT_FAILED;
}
pOp = pTrans->getNdbOperation(tab.getName());
if (pOp == NULL) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
check = pOp->deleteTuple();
if( check == -1 ) {
if(pkDeleteRecord(pNdb, r, batch) != NDBT_OK)
{
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
// Define primary keys
for(a = 0; a<tab.getNoOfColumns(); a++){
if (tab.getColumn(a)->getPrimaryKey() == true){
if(equalForAttr(pOp, a, r) != 0){
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
}
}
check = pTrans->execute(Commit);
if( check == -1) {
const NdbError err = pTrans->getNdbError();
......@@ -1961,11 +1501,11 @@ HugoTransactions::pkDelRecords(Ndb* pNdb,
}
}
else {
deleted++;
deleted += batch;
}
pNdb->closeTransaction(pTrans);
r++; // Read next record
r += batch; // Read next record
}
......@@ -1986,8 +1526,8 @@ HugoTransactions::lockRecords(Ndb* pNdb,
int retryAttempt = 0;
const int retryMax = 100;
int check, a, b;
NdbConnection *pTrans;
NdbOperation *pOp;
NdbOperation::LockMode lm = NdbOperation::LM_Exclusive;
// Calculate how many records to lock in each batch
if (percentToLock <= 0)
......@@ -2000,6 +1540,9 @@ HugoTransactions::lockRecords(Ndb* pNdb,
allocRows(lockBatch);
while (r < records){
if(r + lockBatch > records)
lockBatch = records - r;
g_info << "|- Locking " << lockBatch << " records..." << endl;
if (retryAttempt >= retryMax){
......@@ -2022,42 +1565,13 @@ HugoTransactions::lockRecords(Ndb* pNdb,
return NDBT_FAILED;
}
for(b = 0; (b<lockBatch) && (r+b < records); b++){
pOp = pTrans->getNdbOperation(tab.getName());
if (pOp == NULL) {
if(pkReadRecord(pNdb, r, lockBatch, lm) != NDBT_OK)
{
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
check = pOp->readTupleExclusive();
if( check == -1 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
// Define primary keys
for(a = 0; a<tab.getNoOfColumns(); a++){
if (tab.getColumn(a)->getPrimaryKey() == true){
if(equalForAttr(pOp, a, r+b) != 0){
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
}
}
// Define attributes to read
for(a = 0; a<tab.getNoOfColumns(); a++){
if((rows[b]->attributeStore(a) =
pOp->getValue(tab.getColumn(a)->getName())) == 0) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
}
}
// NoCommit lockTime times with 100 millis interval
int sleepInterval = 50;
int lockCount = lockTime / sleepInterval;
......@@ -2116,7 +1630,6 @@ HugoTransactions::lockRecords(Ndb* pNdb,
pNdb->closeTransaction(pTrans);
}
deallocRows();
g_info << "|- Record locking completed" << endl;
......@@ -2133,7 +1646,6 @@ HugoTransactions::indexReadRecords(Ndb* pNdb,
int retryAttempt = 0;
const int retryMax = 100;
int check, a;
NdbConnection *pTrans;
NdbOperation *pOp;
NdbIndexScanOperation *sOp;
NdbResultSet * rs;
......@@ -2284,7 +1796,6 @@ HugoTransactions::indexUpdateRecords(Ndb* pNdb,
int retryAttempt = 0;
const int retryMax = 100;
int check, a, b;
NdbConnection *pTrans;
NdbOperation *pOp;
NdbScanOperation * sOp;
NdbResultSet * rs;
......
......@@ -20,13 +20,20 @@
#define VERBOSE 0
UtilTransactions::UtilTransactions(const NdbDictionary::Table& _tab):
tab(_tab){
UtilTransactions::UtilTransactions(const NdbDictionary::Table& _tab,
const NdbDictionary::Index* _idx):
tab(_tab), idx(_idx), pTrans(0)
{
m_defaultClearMethod = 3;
}
UtilTransactions::UtilTransactions(Ndb* ndb, const char * name) :
tab(* ndb->getDictionary()->getTable(name)){
UtilTransactions::UtilTransactions(Ndb* ndb,
const char * name,
const char * index) :
tab(* ndb->getDictionary()->getTable(name)),
idx(index ? ndb->getDictionary()->getIndex(index, name) : 0),
pTrans(0)
{
m_defaultClearMethod = 3;
}
......@@ -51,300 +58,17 @@ UtilTransactions::clearTable(Ndb* pNdb,
int
UtilTransactions::clearTable1(Ndb* pNdb,
int records,
int parallelism){
#if 1
int parallelism)
{
return clearTable3(pNdb, records, 1);
#else
// Scan all records exclusive and delete
// them one by one
int retryAttempt = 0;
const int retryMax = 100;
int check;
NdbConnection *pTrans;
NdbOperation *pOp;
while (true){
if (retryAttempt >= retryMax){
g_info << "ERROR: Has retried this operation " << retryAttempt
<< " times, failing!" << endl;
return NDBT_FAILED;
}
pTrans = pNdb->startTransaction();
if (pTrans == NULL) {
NdbError err = pNdb->getNdbError();
if (err.status == NdbError::TemporaryError){
ERR(err);
NdbSleep_MilliSleep(50);
retryAttempt++;
continue;
}
ERR(err);
RETURN_FAIL(err);
}
pOp = pTrans->getNdbOperation(tab.getName());
if (pOp == NULL) {
NdbError err = pNdb->getNdbError();
ERR(err);
pNdb->closeTransaction(pTrans);
RETURN_FAIL(err);
}
check = pOp->openScanExclusive(parallelism);
if( check == -1 ) {
NdbError err = pNdb->getNdbError();
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
RETURN_FAIL(err);
}
check = pOp->interpret_exit_ok();
if( check == -1 ) {
NdbError err = pNdb->getNdbError();
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
RETURN_FAIL(err);
}
#if 0
// It's not necessary to read and PK's
// Information about the PK's are sent in
// KEYINFO20 signals anyway and used by takeOverScan
// Read the primary keys from this table
for(int a=0; a<tab.getNoOfColumns(); a++){
if (tab.getColumn(a)->getPrimaryKey()){
if(pOp->getValue(tab.getColumn(a)->getName()) == NULL){
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
RETURN_FAIL(err);
}
}
}
#endif
check = pTrans->executeScan();
if( check == -1 ) {
NdbError err = pTrans->getNdbError();
if (err.status == NdbError::TemporaryError){
ERR(err);
pNdb->closeTransaction(pTrans);
NdbSleep_MilliSleep(50);
retryAttempt++;
continue;
}
ERR(err);
pNdb->closeTransaction(pTrans);
RETURN_FAIL(err);
}
int eof;
int rows = 0;
eof = pTrans->nextScanResult();
while(eof == 0){
rows++;
int res = takeOverAndDeleteRecord(pNdb, pOp);
if(res == RESTART_SCAN){
eof = -2;
continue;
}
if (res != 0){
NdbError err = pNdb->getNdbError(res);
pNdb->closeTransaction(pTrans);
RETURN_FAIL(err);
}
eof = pTrans->nextScanResult();
}
if (eof == -1) {
const NdbError err = pTrans->getNdbError();
if (err.status == NdbError::TemporaryError){
ERR(err);
pNdb->closeTransaction(pTrans);
NdbSleep_MilliSleep(50);
// If error = 488 there should be no limit on number of retry attempts
if (err.code != 488)
retryAttempt++;
continue;
}
ERR(err);
pNdb->closeTransaction(pTrans);
RETURN_FAIL(err);
}
if(eof == -2){
pNdb->closeTransaction(pTrans);
NdbSleep_MilliSleep(50);
retryAttempt++;
continue;
}
pNdb->closeTransaction(pTrans);
g_info << rows << " deleted" << endl;
return NDBT_OK;
}
return NDBT_FAILED;
#endif
}
int
UtilTransactions::clearTable2(Ndb* pNdb,
int records,
int parallelism){
#if 1
int parallelism)
{
return clearTable3(pNdb, records, parallelism);
#else
// Scan all records exclusive and delete
// them one by one
int retryAttempt = 0;
const int retryMax = 10;
int deletedRows = 0;
int check;
NdbConnection *pTrans;
NdbOperation *pOp;
while (true){
if (retryAttempt >= retryMax){
g_info << "ERROR: has retried this operation " << retryAttempt
<< " times, failing!" << endl;
return NDBT_FAILED;
}
pTrans = pNdb->startTransaction();
if (pTrans == NULL) {
const NdbError err = pNdb->getNdbError();
if (err.status == NdbError::TemporaryError){
ERR(err);
NdbSleep_MilliSleep(50);
retryAttempt++;
continue;
}
ERR(err);
return NDBT_FAILED;
}
pOp = pTrans->getNdbOperation(tab.getName());
if (pOp == NULL) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
check = pOp->openScanExclusive(parallelism);
if( check == -1 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
check = pOp->interpret_exit_ok();
if( check == -1 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
#if 0
// It's not necessary to read any PK's
// Information about the PK's are sent in
// KEYINFO20 signals anyway and used by takeOverScan
// Read the primary keys from this table
for(int a=0; a<tab.getNoOfColumns(); a++){
if (tab.getColumn(a)->getPrimaryKey()){
if(pOp->getValue(tab.getColumn(a)->getName()) == NULL){
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return -1;
}
}
}
#endif
check = pTrans->executeScan();
if( check == -1 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
int eof;
NdbConnection* pDelTrans;
while((eof = pTrans->nextScanResult(true)) == 0){
pDelTrans = pNdb->startTransaction();
if (pDelTrans == NULL) {
const NdbError err = pNdb->getNdbError();
#if 0
if (err.status == NdbError::TemporaryError){
ERR(err);
NdbSleep_MilliSleep(50);
retryAttempt++;
continue;
}
#endif
ERR(err);
pNdb->closeTransaction(pDelTrans);
return NDBT_FAILED;
}
do {
deletedRows++;
if (addRowToDelete(pNdb, pDelTrans, pOp) != 0){
pNdb->closeTransaction(pDelTrans);
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
} while((eof = pTrans->nextScanResult(false)) == 0);
check = pDelTrans->execute(Commit);
if( check == -1 ) {
const NdbError err = pDelTrans->getNdbError();
ERR(err);
pNdb->closeTransaction(pDelTrans);
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
pNdb->closeTransaction(pDelTrans);
}
if (eof == -1) {
const NdbError err = pTrans->getNdbError();
if (err.status == NdbError::TemporaryError){
ERR(err);
pNdb->closeTransaction(pTrans);
NdbSleep_MilliSleep(50);
// If error = 488 there should be no limit on number of retry attempts
if (err.code != 488)
retryAttempt++;
continue;
}
ERR(err);
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
pNdb->closeTransaction(pTrans);
g_info << deletedRows << " rows deleted" << endl;
return NDBT_OK;
}
return NDBT_FAILED;
#endif
}
int
......@@ -357,7 +81,6 @@ UtilTransactions::clearTable3(Ndb* pNdb,
const int retryMax = 10;
int deletedRows = 0;
int check;
NdbConnection *pTrans;
NdbScanOperation *pOp;
NdbError err;
......@@ -381,7 +104,7 @@ UtilTransactions::clearTable3(Ndb* pNdb,
goto failed;
}
pOp = pTrans->getNdbScanOperation(tab.getName());
pOp = getScanOperation(pTrans);
if (pOp == NULL) {
err = pTrans->getNdbError();
if(err.status == NdbError::TemporaryError){
......@@ -468,7 +191,6 @@ UtilTransactions::copyTableData(Ndb* pNdb,
int insertedRows = 0;
int parallelism = 240;
int check;
NdbConnection *pTrans;
NdbScanOperation *pOp;
NDBT_ResultRow row(tab);
......@@ -628,7 +350,6 @@ UtilTransactions::scanReadRecords(Ndb* pNdb,
int retryAttempt = 0;
const int retryMax = 100;
int check;
NdbConnection *pTrans;
NdbScanOperation *pOp;
NDBT_ResultRow row(tab);
......@@ -654,7 +375,7 @@ UtilTransactions::scanReadRecords(Ndb* pNdb,
return NDBT_FAILED;
}
pOp = pTrans->getNdbScanOperation(tab.getName());
pOp = getScanOperation(pTrans);
if (pOp == NULL) {
const NdbError err = pNdb->getNdbError();
pNdb->closeTransaction(pTrans);
......@@ -768,6 +489,7 @@ UtilTransactions::selectCount(Ndb* pNdb,
if(!pTrans)
pTrans = pNdb->startTransaction();
while (true){
if (retryAttempt >= retryMax){
......@@ -775,7 +497,7 @@ UtilTransactions::selectCount(Ndb* pNdb,
<< " times, failing!" << endl;
return NDBT_FAILED;
}
pOp = pTrans->getNdbScanOperation(tab.getName());
pOp = getScanOperation(pTrans);
if (pOp == NULL) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
......@@ -906,7 +628,6 @@ UtilTransactions::scanAndCompareUniqueIndex(Ndb* pNdb,
int retryAttempt = 0;
const int retryMax = 100;
int check;
NdbConnection *pTrans;
NdbScanOperation *pOp;
NDBT_ResultRow row(tab);
......@@ -1295,7 +1016,6 @@ UtilTransactions::verifyOrderedIndex(Ndb* pNdb,
int retryAttempt = 0;
const int retryMax = 100;
int check;
NdbConnection *pTrans;
NdbScanOperation *pOp;
NdbIndexScanOperation * iop = 0;
NdbResultSet* cursor= 0;
......@@ -1521,3 +1241,60 @@ UtilTransactions::equal(const NdbDictionary::Table* pTable,
}
return 0;
}
NdbScanOperation*
UtilTransactions::getScanOperation(NdbConnection* pTrans)
{
return (NdbScanOperation*)
getOperation(pTrans, NdbOperation::OpenScanRequest);
}
NdbOperation*
UtilTransactions::getOperation(NdbConnection* pTrans,
NdbOperation::OperationType type)
{
switch(type){
case NdbOperation::ReadRequest:
case NdbOperation::ReadExclusive:
if(idx)
{
switch(idx->getType()){
case NdbDictionary::Index::UniqueHashIndex:
return pTrans->getNdbIndexOperation(idx->getName(), tab.getName());
case NdbDictionary::Index::OrderedIndex:
return pTrans->getNdbIndexScanOperation(idx->getName(), tab.getName());
}
}
case NdbOperation::InsertRequest:
case NdbOperation::WriteRequest:
return pTrans->getNdbOperation(tab.getName());
case NdbOperation::UpdateRequest:
case NdbOperation::DeleteRequest:
if(idx)
{
switch(idx->getType()){
case NdbDictionary::Index::UniqueHashIndex:
return pTrans->getNdbIndexOperation(idx->getName(), tab.getName());
}
}
return pTrans->getNdbOperation(tab.getName());
case NdbOperation::OpenScanRequest:
if(idx)
{
switch(idx->getType()){
case NdbDictionary::Index::OrderedIndex:
return pTrans->getNdbIndexScanOperation(idx->getName(), tab.getName());
}
}
return pTrans->getNdbScanOperation(tab.getName());
case NdbOperation::OpenRangeScanRequest:
if(idx)
{
switch(idx->getType()){
case NdbDictionary::Index::OrderedIndex:
return pTrans->getNdbIndexScanOperation(idx->getName(), tab.getName());
}
}
return 0;
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment