ndb - hugo - add new method for setting primary key(s)

parent 3ddd25f2
...@@ -84,6 +84,8 @@ public: ...@@ -84,6 +84,8 @@ public:
int attrId, int attrId,
int rowId); int rowId);
int equalForRow(NdbOperation*, int rowid);
int setValues(NdbOperation*, int rowId, int updateId); int setValues(NdbOperation*, int rowId, int updateId);
int verifyUpdatesValue(int updatesValue, int _numRows = 0); int verifyUpdatesValue(int updatesValue, int _numRows = 0);
......
...@@ -188,15 +188,12 @@ HugoAsynchTransactions::pkUpdateRecordsAsynch(Ndb* pNdb, ...@@ -188,15 +188,12 @@ HugoAsynchTransactions::pkUpdateRecordsAsynch(Ndb* pNdb,
// Read // Read
// Define primary keys // Define primary keys
check = pOp->readTupleExclusive(); check = pOp->readTupleExclusive();
for (a = 0; a < tab.getNoOfColumns(); a++) { if (equalForRow(pOp, cReadRecords) != 0)
if (tab.getColumn(a)->getPrimaryKey() == true) { {
if (equalForAttr(pOp, a, cReadRecords) != 0){
ERR(transactions[t]->getNdbError()); ERR(transactions[t]->getNdbError());
pNdb->closeTransaction(transactions[t]); pNdb->closeTransaction(transactions[t]);
return NDBT_FAILED; return NDBT_FAILED;
} }
}
}
// Define attributes to read // Define attributes to read
for (a = 0; a < tab.getNoOfColumns(); a++) { for (a = 0; a < tab.getNoOfColumns(); a++) {
if ((rows[cReadIndex]->attributeStore(a) = if ((rows[cReadIndex]->attributeStore(a) =
...@@ -259,15 +256,12 @@ HugoAsynchTransactions::pkUpdateRecordsAsynch(Ndb* pNdb, ...@@ -259,15 +256,12 @@ HugoAsynchTransactions::pkUpdateRecordsAsynch(Ndb* pNdb,
} }
// Set search condition for the record // Set search condition for the record
for (a = 0; a < tab.getNoOfColumns(); a++) { if (equalForRow(pOp, cReadRecords) != 0)
if (tab.getColumn(a)->getPrimaryKey() == true) { {
if (equalForAttr(pOp, a, cRecords) != 0) {
ERR(transactions[t]->getNdbError()); ERR(transactions[t]->getNdbError());
pNdb->closeTransaction(transactions[t]); pNdb->closeTransaction(transactions[t]);
return NDBT_FAILED; return NDBT_FAILED;
} }
}
}
// Update the record // Update the record
for (a = 0; a < tab.getNoOfColumns(); a++) { for (a = 0; a < tab.getNoOfColumns(); a++) {
...@@ -396,15 +390,12 @@ HugoAsynchTransactions::executeAsynchOperation(Ndb* pNdb, ...@@ -396,15 +390,12 @@ HugoAsynchTransactions::executeAsynchOperation(Ndb* pNdb,
case NO_READ: case NO_READ:
// Define primary keys // Define primary keys
check = pOp->readTuple(); check = pOp->readTuple();
for (a = 0; a < tab.getNoOfColumns(); a++) { if (equalForRow(pOp, cRecords) != 0)
if (tab.getColumn(a)->getPrimaryKey() == true) { {
if (equalForAttr(pOp, a, cRecords) != 0){
ERR(transactions[t]->getNdbError()); ERR(transactions[t]->getNdbError());
pNdb->closeTransaction(transactions[t]); pNdb->closeTransaction(transactions[t]);
return NDBT_FAILED; return NDBT_FAILED;
} }
}
}
// Define attributes to read // Define attributes to read
for (a = 0; a < tab.getNoOfColumns(); a++) { for (a = 0; a < tab.getNoOfColumns(); a++) {
if ((rows[cIndex]->attributeStore(a) = if ((rows[cIndex]->attributeStore(a) =
...@@ -425,15 +416,12 @@ HugoAsynchTransactions::executeAsynchOperation(Ndb* pNdb, ...@@ -425,15 +416,12 @@ HugoAsynchTransactions::executeAsynchOperation(Ndb* pNdb,
} }
// Define primary keys // Define primary keys
for (a = 0; a < tab.getNoOfColumns(); a++) { if (equalForRow(pOp, cRecords) != 0)
if (tab.getColumn(a)->getPrimaryKey() == true){ {
if (equalForAttr(pOp, a, cRecords) != 0) {
ERR(transactions[t]->getNdbError()); ERR(transactions[t]->getNdbError());
pNdb->closeTransaction(transactions[t]); pNdb->closeTransaction(transactions[t]);
return NDBT_FAILED; return NDBT_FAILED;
} }
}
}
break; break;
default: default:
// Should not happen... // Should not happen...
......
...@@ -111,14 +111,8 @@ rand_lock_mode: ...@@ -111,14 +111,8 @@ rand_lock_mode:
} }
// Define primary keys // Define primary keys
for(a = 0; a<tab.getNoOfColumns(); a++){ if (equalForRow(pOp, r+recordNo) != 0)
if (tab.getColumn(a)->getPrimaryKey() == true){
if(equalForAttr(pOp, a, r+recordNo) != 0){
ERR(pTrans->getNdbError());
return NDBT_FAILED; return NDBT_FAILED;
}
}
}
if(pIndexScanOp) if(pIndexScanOp)
pIndexScanOp->end_of_bound(r); pIndexScanOp->end_of_bound(r);
...@@ -143,7 +137,6 @@ int HugoOperations::pkUpdateRecord(Ndb* pNdb, ...@@ -143,7 +137,6 @@ int HugoOperations::pkUpdateRecord(Ndb* pNdb,
int recordNo, int recordNo,
int numRecords, int numRecords,
int updatesValue){ int updatesValue){
int a;
allocRows(numRecords); allocRows(numRecords);
int check; int check;
for(int r=0; r < numRecords; r++){ for(int r=0; r < numRecords; r++){
...@@ -172,14 +165,8 @@ HugoOperations::setValues(NdbOperation* pOp, int rowId, int updateId) ...@@ -172,14 +165,8 @@ HugoOperations::setValues(NdbOperation* pOp, int rowId, int updateId)
{ {
// Define primary keys // Define primary keys
int a; int a;
for(a = 0; a<tab.getNoOfColumns(); a++){ if (equalForRow(pOp, rowId) != 0)
if (tab.getColumn(a)->getPrimaryKey() == true){
if(equalForAttr(pOp, a, rowId) != 0){
ERR(pTrans->getNdbError());
return NDBT_FAILED; return NDBT_FAILED;
}
}
}
for(a = 0; a<tab.getNoOfColumns(); a++){ for(a = 0; a<tab.getNoOfColumns(); a++){
if (tab.getColumn(a)->getPrimaryKey() == false){ if (tab.getColumn(a)->getPrimaryKey() == false){
...@@ -198,7 +185,7 @@ int HugoOperations::pkInsertRecord(Ndb* pNdb, ...@@ -198,7 +185,7 @@ int HugoOperations::pkInsertRecord(Ndb* pNdb,
int numRecords, int numRecords,
int updatesValue){ int updatesValue){
int a, check; int check;
for(int r=0; r < numRecords; r++){ for(int r=0; r < numRecords; r++){
NdbOperation* pOp = getOperation(pTrans, NdbOperation::InsertRequest); NdbOperation* pOp = getOperation(pTrans, NdbOperation::InsertRequest);
if (pOp == NULL) { if (pOp == NULL) {
...@@ -240,14 +227,8 @@ int HugoOperations::pkWriteRecord(Ndb* pNdb, ...@@ -240,14 +227,8 @@ int HugoOperations::pkWriteRecord(Ndb* pNdb,
} }
// Define primary keys // Define primary keys
for(a = 0; a<tab.getNoOfColumns(); a++){ if (equalForRow(pOp, r+recordNo) != 0)
if (tab.getColumn(a)->getPrimaryKey() == true){
if(equalForAttr(pOp, a, r+recordNo) != 0){
ERR(pTrans->getNdbError());
return NDBT_FAILED; return NDBT_FAILED;
}
}
}
// Define attributes to update // Define attributes to update
for(a = 0; a<tab.getNoOfColumns(); a++){ for(a = 0; a<tab.getNoOfColumns(); a++){
...@@ -266,7 +247,7 @@ int HugoOperations::pkWritePartialRecord(Ndb* pNdb, ...@@ -266,7 +247,7 @@ int HugoOperations::pkWritePartialRecord(Ndb* pNdb,
int recordNo, int recordNo,
int numRecords){ int numRecords){
int a, check; int check;
for(int r=0; r < numRecords; r++){ for(int r=0; r < numRecords; r++){
NdbOperation* pOp = pTrans->getNdbOperation(tab.getName()); NdbOperation* pOp = pTrans->getNdbOperation(tab.getName());
if (pOp == NULL) { if (pOp == NULL) {
...@@ -281,15 +262,9 @@ int HugoOperations::pkWritePartialRecord(Ndb* pNdb, ...@@ -281,15 +262,9 @@ int HugoOperations::pkWritePartialRecord(Ndb* pNdb,
} }
// Define primary keys // Define primary keys
for(a = 0; a<tab.getNoOfColumns(); a++){ if (equalForRow(pOp, r+recordNo) != 0)
if (tab.getColumn(a)->getPrimaryKey() == true){
if(equalForAttr(pOp, a, r+recordNo) != 0){
ERR(pTrans->getNdbError());
return NDBT_FAILED; return NDBT_FAILED;
} }
}
}
}
return NDBT_OK; return NDBT_OK;
} }
...@@ -297,7 +272,7 @@ int HugoOperations::pkDeleteRecord(Ndb* pNdb, ...@@ -297,7 +272,7 @@ int HugoOperations::pkDeleteRecord(Ndb* pNdb,
int recordNo, int recordNo,
int numRecords){ int numRecords){
int a, check; int check;
for(int r=0; r < numRecords; r++){ for(int r=0; r < numRecords; r++){
NdbOperation* pOp = getOperation(pTrans, NdbOperation::DeleteRequest); NdbOperation* pOp = getOperation(pTrans, NdbOperation::DeleteRequest);
if (pOp == NULL) { if (pOp == NULL) {
...@@ -312,15 +287,9 @@ int HugoOperations::pkDeleteRecord(Ndb* pNdb, ...@@ -312,15 +287,9 @@ int HugoOperations::pkDeleteRecord(Ndb* pNdb,
} }
// Define primary keys // Define primary keys
for(a = 0; a<tab.getNoOfColumns(); a++){ if (equalForRow(pOp, r+recordNo) != 0)
if (tab.getColumn(a)->getPrimaryKey() == true){
if(equalForAttr(pOp, a, r+recordNo) != 0){
ERR(pTrans->getNdbError());
return NDBT_FAILED; return NDBT_FAILED;
} }
}
}
}
return NDBT_OK; return NDBT_OK;
} }
...@@ -518,6 +487,22 @@ HugoOperations::~HugoOperations(){ ...@@ -518,6 +487,22 @@ HugoOperations::~HugoOperations(){
} }
} }
int
HugoOperations::equalForRow(NdbOperation* pOp, int row)
{
for(int a = 0; a<tab.getNoOfColumns(); a++)
{
if (tab.getColumn(a)->getPrimaryKey() == true)
{
if(equalForAttr(pOp, a, row) != 0)
{
ERR(pOp->getNdbError());
return NDBT_FAILED;
}
}
}
return NDBT_OK;
}
int HugoOperations::equalForAttr(NdbOperation* pOp, int HugoOperations::equalForAttr(NdbOperation* pOp,
int attrId, int attrId,
...@@ -676,14 +661,8 @@ int HugoOperations::indexReadRecords(Ndb*, const char * idxName, int recordNo, ...@@ -676,14 +661,8 @@ int HugoOperations::indexReadRecords(Ndb*, const char * idxName, int recordNo,
} }
// Define primary keys // Define primary keys
for(a = 0; a<tab.getNoOfColumns(); a++){ if (equalForRow(pOp, r+recordNo) != 0)
if (tab.getColumn(a)->getPrimaryKey() == true){
if(equalForAttr(pOp, a, r+recordNo) != 0){
ERR(pTrans->getNdbError());
return NDBT_FAILED; return NDBT_FAILED;
}
}
}
// Define attributes to read // Define attributes to read
for(a = 0; a<tab.getNoOfColumns(); a++){ for(a = 0; a<tab.getNoOfColumns(); a++){
...@@ -720,14 +699,8 @@ HugoOperations::indexUpdateRecord(Ndb*, ...@@ -720,14 +699,8 @@ HugoOperations::indexUpdateRecord(Ndb*,
} }
// Define primary keys // Define primary keys
for(a = 0; a<tab.getNoOfColumns(); a++){ if (equalForRow(pOp, r+recordNo) != 0)
if (tab.getColumn(a)->getPrimaryKey() == true){
if(equalForAttr(pOp, a, r+recordNo) != 0){
ERR(pTrans->getNdbError());
return NDBT_FAILED; return NDBT_FAILED;
}
}
}
// Define attributes to update // Define attributes to update
for(a = 0; a<tab.getNoOfColumns(); a++){ for(a = 0; a<tab.getNoOfColumns(); a++){
......
...@@ -520,10 +520,9 @@ HugoTransactions::loadTable(Ndb* pNdb, ...@@ -520,10 +520,9 @@ HugoTransactions::loadTable(Ndb* pNdb,
bool oneTrans, bool oneTrans,
int value, int value,
bool abort){ bool abort){
int check, a; int check;
int retryAttempt = 0; int retryAttempt = 0;
int retryMax = 5; int retryMax = 5;
NdbOperation *pOp;
bool first_batch = true; bool first_batch = true;
const int org = batch; const int org = batch;
...@@ -667,10 +666,9 @@ HugoTransactions::loadTable(Ndb* pNdb, ...@@ -667,10 +666,9 @@ HugoTransactions::loadTable(Ndb* pNdb,
int int
HugoTransactions::fillTable(Ndb* pNdb, HugoTransactions::fillTable(Ndb* pNdb,
int batch){ int batch){
int check, a, b; int check;
int retryAttempt = 0; int retryAttempt = 0;
int retryMax = 5; int retryMax = 5;
NdbOperation *pOp;
const int org = batch; const int org = batch;
const int cols = tab.getNoOfColumns(); const int cols = tab.getNoOfColumns();
...@@ -791,7 +789,7 @@ HugoTransactions::pkReadRecords(Ndb* pNdb, ...@@ -791,7 +789,7 @@ HugoTransactions::pkReadRecords(Ndb* pNdb,
int reads = 0; int reads = 0;
int r = 0; int r = 0;
int retryAttempt = 0; int retryAttempt = 0;
int check, a; int check;
if (batch == 0) { if (batch == 0) {
g_info << "ERROR: Argument batch == 0 in pkReadRecords(). Not allowed." << endl; g_info << "ERROR: Argument batch == 0 in pkReadRecords(). Not allowed." << endl;
...@@ -910,8 +908,7 @@ HugoTransactions::pkUpdateRecords(Ndb* pNdb, ...@@ -910,8 +908,7 @@ HugoTransactions::pkUpdateRecords(Ndb* pNdb,
int updated = 0; int updated = 0;
int r = 0; int r = 0;
int retryAttempt = 0; int retryAttempt = 0;
int check, a, b; int check, b;
NdbOperation *pOp;
allocRows(batch); allocRows(batch);
...@@ -1097,15 +1094,11 @@ HugoTransactions::pkInterpretedUpdateRecords(Ndb* pNdb, ...@@ -1097,15 +1094,11 @@ HugoTransactions::pkInterpretedUpdateRecords(Ndb* pNdb,
} }
// Define primary keys // Define primary keys
for(a = 0; a<tab.getNoOfColumns(); a++){ if (equalForRow(pOp, r) != 0)
if (tab.getColumn(a)->getPrimaryKey() == true){ {
if(equalForAttr(pOp, a, r) != 0){
ERR(pTrans->getNdbError());
closeTransaction(pNdb); closeTransaction(pNdb);
return NDBT_FAILED; return NDBT_FAILED;
} }
}
}
// Read update value // Read update value
for(a = 0; a<tab.getNoOfColumns(); a++){ for(a = 0; a<tab.getNoOfColumns(); a++){
...@@ -1153,15 +1146,11 @@ HugoTransactions::pkInterpretedUpdateRecords(Ndb* pNdb, ...@@ -1153,15 +1146,11 @@ HugoTransactions::pkInterpretedUpdateRecords(Ndb* pNdb,
} }
// PKs // PKs
for(a = 0; a<tab.getNoOfColumns(); a++){ if (equalForRow(pOp, r) != 0)
if (tab.getColumn(a)->getPrimaryKey() == true){ {
if(equalForAttr(pUpdOp, a, r) != 0){
ERR(pTrans->getNdbError());
closeTransaction(pNdb); closeTransaction(pNdb);
return NDBT_FAILED; return NDBT_FAILED;
} }
}
}
// Update col // Update col
for(a = 0; a<tab.getNoOfColumns(); a++){ for(a = 0; a<tab.getNoOfColumns(); a++){
...@@ -1236,8 +1225,7 @@ HugoTransactions::pkDelRecords(Ndb* pNdb, ...@@ -1236,8 +1225,7 @@ HugoTransactions::pkDelRecords(Ndb* pNdb,
int deleted = 0; int deleted = 0;
int r = 0; int r = 0;
int retryAttempt = 0; int retryAttempt = 0;
int check, a; int check;
NdbOperation *pOp;
g_info << "|- Deleting records..." << endl; g_info << "|- Deleting records..." << endl;
while (r < records){ while (r < records){
...@@ -1335,8 +1323,7 @@ HugoTransactions::lockRecords(Ndb* pNdb, ...@@ -1335,8 +1323,7 @@ HugoTransactions::lockRecords(Ndb* pNdb,
// and lock som other records // and lock som other records
int r = 0; int r = 0;
int retryAttempt = 0; int retryAttempt = 0;
int check, a, b; int check;
NdbOperation *pOp;
NdbOperation::LockMode lm = NdbOperation::LM_Exclusive; NdbOperation::LockMode lm = NdbOperation::LM_Exclusive;
// Calculate how many records to lock in each batch // Calculate how many records to lock in each batch
...@@ -1522,15 +1509,11 @@ HugoTransactions::indexReadRecords(Ndb* pNdb, ...@@ -1522,15 +1509,11 @@ HugoTransactions::indexReadRecords(Ndb* pNdb,
} }
// Define primary keys // Define primary keys
for(a = 0; a<tab.getNoOfColumns(); a++){ if (equalForRow(pOp, r+b) != 0)
if (tab.getColumn(a)->getPrimaryKey() == true){ {
if(equalForAttr(pOp, a, r+b) != 0){
ERR(pTrans->getNdbError());
closeTransaction(pNdb); closeTransaction(pNdb);
return NDBT_FAILED; return NDBT_FAILED;
} }
}
}
// Define attributes to read // Define attributes to read
for(a = 0; a<tab.getNoOfColumns(); a++){ for(a = 0; a<tab.getNoOfColumns(); a++){
...@@ -1663,15 +1646,11 @@ HugoTransactions::indexUpdateRecords(Ndb* pNdb, ...@@ -1663,15 +1646,11 @@ HugoTransactions::indexUpdateRecords(Ndb* pNdb,
} }
// Define primary keys // Define primary keys
for(a = 0; a<tab.getNoOfColumns(); a++){ if (equalForRow(pOp, r+b) != 0)
if (tab.getColumn(a)->getPrimaryKey() == true){ {
if(equalForAttr(pOp, a, r+b) != 0){
ERR(pTrans->getNdbError());
closeTransaction(pNdb); closeTransaction(pNdb);
return NDBT_FAILED; return NDBT_FAILED;
} }
}
}
// Define attributes to read // Define attributes to read
for(a = 0; a<tab.getNoOfColumns(); a++){ for(a = 0; a<tab.getNoOfColumns(); a++){
...@@ -1733,17 +1712,14 @@ HugoTransactions::indexUpdateRecords(Ndb* pNdb, ...@@ -1733,17 +1712,14 @@ HugoTransactions::indexUpdateRecords(Ndb* pNdb,
return NDBT_FAILED; return NDBT_FAILED;
} }
if(!ordered){ if(!ordered)
for(a = 0; a<tab.getNoOfColumns(); a++){ {
if (tab.getColumn(a)->getPrimaryKey() == true){ if (equalForRow(pOp, r+b) != 0)
if(equalForAttr(pUpdOp, a, r+b) != 0){ {
ERR(pTrans->getNdbError());
closeTransaction(pNdb); closeTransaction(pNdb);
return NDBT_FAILED; return NDBT_FAILED;
} }
} }
}
}
for(a = 0; a<tab.getNoOfColumns(); a++){ for(a = 0; a<tab.getNoOfColumns(); a++){
if (tab.getColumn(a)->getPrimaryKey() == false){ if (tab.getColumn(a)->getPrimaryKey() == false){
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment