Commit e7ce0018 authored by joreland@mysql.com's avatar joreland@mysql.com

Merge joreland@bk-internal.mysql.com:/home/bk/mysql-4.1-ndb

into mysql.com:/home/jonas/src/mysql-4.1-ndb
parents f278255d 5e154e57
......@@ -755,6 +755,10 @@ NdbConnection::set_send_size(Uint32 send_size)
return;
}
#ifdef NDB_NO_DROPPED_SIGNAL
#include <stdlib.h>
#endif
inline
int
NdbConnection::checkMagicNumber()
......
......@@ -1098,6 +1098,11 @@ protected:
};
#ifdef NDB_NO_DROPPED_SIGNAL
#include <stdlib.h>
#endif
inline
int
NdbOperation::checkMagicNumber()
......
......@@ -56,6 +56,10 @@ private:
void* m_owner;
};
#ifdef NDB_NO_DROPPED_SIGNAL
#include <stdlib.h>
#endif
inline
bool
NdbReceiver::checkMagicNumber() const {
......
......@@ -560,7 +560,6 @@ public:
UintR scanLocalFragid;
UintR scanSchemaVersion;
Uint32 fragPtrI;
UintR scanSearchCondFalseCount;
UintR scanStoredProcId;
ScanState scanState;
UintR scanTcrec;
......
......@@ -2064,8 +2064,6 @@ void Dblqh::execTIME_SIGNAL(Signal* signal)
<< " scanLocalFragid="<<TscanPtr.p->scanLocalFragid
<< endl;
ndbout << " scanSchemaVersion="<<TscanPtr.p->scanSchemaVersion
<< " scanSearchCondFalseCount="<<
TscanPtr.p->scanSearchCondFalseCount
<< " scanStoredProcId="<<TscanPtr.p->scanStoredProcId
<< " scanTcrec="<<TscanPtr.p->scanTcrec
<< endl;
......@@ -7099,14 +7097,26 @@ void Dblqh::scanLockReleasedLab(Signal* signal)
sendScanFragConf(signal, ZFALSE);
} else {
jam();
/*
We came here after releasing locks after receiving SCAN_NEXTREQ from TC. We only
come here when scanHoldLock == ZTRUE
*/
continueScanNextReqLab(signal);
}//if
} else {
ndbrequire(scanptr.p->scanReleaseCounter <=
scanptr.p->scanCompletedOperations);
} else if (scanptr.p->scanReleaseCounter < scanptr.p->scanCompletedOperations) {
jam();
scanptr.p->scanReleaseCounter++;
scanReleaseLocksLab(signal);
} else {
jam();
/*
We come here when we have been scanning for a long time and not been able
to find scanConcurrentOperations records to return. We needed to release
the record we didn't want, but now we are returning all found records to
the API.
*/
scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
sendScanFragConf(signal, ZFALSE);
}//if
}//Dblqh::scanLockReleasedLab()
......@@ -8000,28 +8010,28 @@ void Dblqh::scanTupkeyRefLab(Signal* signal)
scanReleaseLocksLab(signal);
return;
}//if
Uint32 time_passed= tcConnectptr.p->tcTimer - cLqhTimeOutCount;
if (scanptr.p->scanCompletedOperations > 0) {
if (time_passed > 1) {
/* -----------------------------------------------------------------------
* WE NEED TO ENSURE THAT WE DO NOT SEARCH FOR THE NEXT TUPLE FOR A
* LONG TIME WHILE WE KEEP A LOCK ON A FOUND TUPLE. WE RATHER REPORT
* THE FOUND TUPLE IF FOUND TUPLES ARE RARE. WE SELECT 20 TUPLES
* WHICH SHOULD BE ROUGHLY 10 MS OF LOCK HOLD TIME.
* THE FOUND TUPLE IF FOUND TUPLES ARE RARE. If more than 10 ms passed we
* send the found tuples to the API.
* ----------------------------------------------------------------------- */
scanptr.p->scanSearchCondFalseCount++;
#if 0
// MASV Uncomment this feature since it forgets
// to release on operation record in DBACC
// This is the quick fix and should be changed in
// the future
if (scanptr.p->scanSearchCondFalseCount > 20) {
if (scanptr.p->scanCompletedOperations > 0) {
jam();
scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
sendScanFragConf(signal, ZFALSE);
scanptr.p->scanReleaseCounter = scanptr.p->scanCompletedOperations + 1;
scanReleaseLocksLab(signal);
return;
}//if
}//if
#endif
}
} else {
if (time_passed > 10) {
jam();
signal->theData[0]= scanptr.i;
signal->theData[1]= tcConnectptr.p->transid[0];
signal->theData[2]= tcConnectptr.p->transid[1];
execSCAN_HBREP(signal);
}
}
scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT_ABORT;
scanNextLoopLab(signal);
}//Dblqh::scanTupkeyRefLab()
......@@ -8179,7 +8189,6 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
scanptr.p->scanLockMode = scanLockMode;
scanptr.p->readCommitted = readCommitted;
scanptr.p->rangeScan = idx;
scanptr.p->scanSearchCondFalseCount = 0;
scanptr.p->scanState = ScanRecord::SCAN_FREE;
scanptr.p->scanFlag = ZFALSE;
scanptr.p->scanLocalref[0] = 0;
......@@ -8480,7 +8489,6 @@ void Dblqh::sendKeyinfo20(Signal* signal,
* ------------------------------------------------------------------------ */
void Dblqh::sendScanFragConf(Signal* signal, Uint32 scanCompleted)
{
scanptr.p->scanSearchCondFalseCount = 0;
scanptr.p->scanTcWaiting = ZFALSE;
ScanFragConf * conf = (ScanFragConf*)&signal->theData[0];
......@@ -17956,11 +17964,10 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal)
sp.p->scanAiLength,
sp.p->scanCompletedOperations,
sp.p->scanConcurrentOperations);
infoEvent(" errCnt=%d, localFid=%d, schV=%d, searcCondFalseC=%d",
infoEvent(" errCnt=%d, localFid=%d, schV=%d",
sp.p->scanErrorCounter,
sp.p->scanLocalFragid,
sp.p->scanSchemaVersion,
sp.p->scanSearchCondFalseCount);
sp.p->scanSchemaVersion);
infoEvent(" stpid=%d, flag=%d, lhold=%d, lmode=%d, num=%d",
sp.p->scanStoredProcId,
sp.p->scanFlag,
......
......@@ -108,11 +108,11 @@ NdbResultSet* NdbScanOperation::readTuples(Uint32 parallell,
break;
case NdbCursorOperation::LM_Exclusive:
parallell = (parallell == 0 ? 1 : parallell);
res = openScan(parallell, true, /*irrelevant*/true, /*irrelevant*/false);
res = openScan(parallell, true, true, false);
break;
case NdbCursorOperation::LM_Dirty:
parallell = (parallell == 0 ? 240 : parallell);
res = openScan(parallell, true, /*irrelevant*/true, /*irrelevant*/false);
res = openScan(parallell, false, false, true);
break;
default:
res = -1;
......
include .defs.mk
TYPE = ndbapitest
BIN_TARGET = testScanPerf
SOURCES = testScanPerf.cpp
include $(NDB_TOP)/Epilogue.mk
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <NDBT.hpp>
#include <NDBT_Test.hpp>
#include <HugoTransactions.hpp>
#include <UtilTransactions.hpp>
#include <random.h>
#include <getarg.h>
struct Parameter {
char * name;
unsigned value;
unsigned min;
unsigned max;
};
#define P_BATCH 0
#define P_PARRA 1
#define P_LOCK 2
#define P_FILT 3
#define P_BOUND 4
#define P_ACCESS 5
#define P_FETCH 6
#define P_ROWS 7
#define P_LOOPS 8
#define P_CREATE 9
#define P_LOAD 10
#define P_MAX 11
static
Parameter
g_paramters[] = {
{ "batch", 0, 0, 1 }, // 0, 15
{ "parallelism", 0, 0, 1 }, // 0, 1
{ "lock", 0, 0, 2 }, // read, exclusive, dirty
{ "filter", 0, 0, 3 }, // all, none, 1, 100
{ "range", 0, 0, 3 }, // all, none, 1, 100
{ "access", 0, 0, 2 }, // scan, idx, idx sorted
{ "fetch", 0, 0, 1 }, // No, yes
{ "size", 1000000, 1, ~0 },
{ "iterations", 3, 1, ~0 },
{ "create_drop", 1, 0, 1 },
{ "data", 1, 0, 1 }
};
static Ndb* g_ndb = 0;
static const NdbDictionary::Table * g_table;
static const NdbDictionary::Index * g_index;
static char g_tablename[256];
static char g_indexname[256];
int create_table();
int load_table();
int run_scan();
int clear_table();
int drop_table();
int
main(int argc, const char** argv){
int verbose = 1;
int optind = 0;
struct getargs args[1+P_MAX] = {
{ "verbose", 'v', arg_flag, &verbose, "Print verbose status", "verbose" }
};
const int num_args = 1 + P_MAX;
for(int i = 0; i<P_MAX; i++){
args[i+1].long_name = g_paramters[i].name;
args[i+1].short_name = * g_paramters[i].name;
args[i+1].type = arg_integer;
args[i+1].value = &g_paramters[i].value;
BaseString tmp;
tmp.assfmt("min: %d max: %d", g_paramters[i].min, g_paramters[i].max);
args[i+1].help = strdup(tmp.c_str());
args[i+1].arg_help = 0;
}
if(getarg(args, num_args, argc, argv, &optind)) {
arg_printusage(args, num_args, argv[0], "tabname1 tabname2 ...");
return NDBT_WRONGARGS;
}
myRandom48Init(NdbTick_CurrentMillisecond());
g_ndb = new Ndb("TEST_DB");
if(g_ndb->init() != 0){
g_err << "init() failed" << endl;
goto error;
}
if(g_ndb->waitUntilReady() != 0){
g_err << "Wait until ready failed" << endl;
goto error;
}
for(int i = optind; i<argc; i++){
const char * T = argv[i];
g_info << "Testing " << T << endl;
snprintf(g_tablename, sizeof(g_tablename), T);
snprintf(g_indexname, sizeof(g_indexname), "IDX_%s", T);
if(create_table())
goto error;
if(load_table())
goto error;
if(run_scan())
goto error;
if(clear_table())
goto error;
if(drop_table())
goto error;
}
if(g_ndb) delete g_ndb;
return NDBT_OK;
error:
if(g_ndb) delete g_ndb;
return NDBT_FAILED;
}
int
create_table(){
NdbDictionary::Dictionary* dict = g_ndb->getDictionary();
assert(dict);
if(g_paramters[P_CREATE].value){
const NdbDictionary::Table * pTab = NDBT_Tables::getTable(g_tablename);
assert(pTab);
NdbDictionary::Table copy = * pTab;
copy.setLogging(false);
if(dict->createTable(copy) != 0){
g_err << "Failed to create table: " << g_tablename << endl;
return -1;
}
NdbDictionary::Index x(g_indexname);
x.setTable(g_tablename);
x.setType(NdbDictionary::Index::OrderedIndex);
x.setLogging(false);
for (unsigned k = 0; k < copy.getNoOfColumns(); k++){
if(copy.getColumn(k)->getPrimaryKey()){
x.addColumnName(copy.getColumn(k)->getName());
}
}
if(dict->createIndex(x) != 0){
g_err << "Failed to create index: " << endl;
return -1;
}
}
g_table = dict->getTable(g_tablename);
g_index = dict->getIndex(g_indexname, g_tablename);
assert(g_table);
assert(g_index);
return 0;
}
int
drop_table(){
if(!g_paramters[P_CREATE].value)
return 0;
if(g_ndb->getDictionary()->dropTable(g_table->getName()) != 0){
g_err << "Failed to drop table: " << g_table->getName() << endl;
return -1;
}
g_table = 0;
return 0;
}
int
load_table(){
if(!g_paramters[P_LOAD].value)
return 0;
int rows = g_paramters[P_ROWS].value;
HugoTransactions hugoTrans(* g_table);
if (hugoTrans.loadTable(g_ndb, rows)){
g_err.println("Failed to load %s with %d rows", g_table->getName(), rows);
return -1;
}
return 0;
}
int
clear_table(){
if(!g_paramters[P_LOAD].value)
return 0;
int rows = g_paramters[P_ROWS].value;
UtilTransactions utilTrans(* g_table);
if (utilTrans.clearTable(g_ndb, rows) != 0){
g_err.println("Failed to clear table %s", g_table->getName());
return -1;
}
return 0;
}
inline
void err(NdbError e){
ndbout << e << endl;
}
int
run_scan(){
int iter = g_paramters[P_LOOPS].value;
Uint64 start1;
Uint64 sum1 = 0;
Uint32 tot = g_paramters[P_ROWS].value;
for(int i = 0; i<iter; i++){
start1 = NdbTick_CurrentMillisecond();
NdbConnection * pTrans = g_ndb->startTransaction();
if(!pTrans){
g_err << "Failed to start transaction" << endl;
err(g_ndb->getNdbError());
return -1;
}
NdbScanOperation * pOp;
#ifdef NdbIndexScanOperation_H
NdbIndexScanOperation * pIOp;
#else
NdbScanOperation * pIOp;
#endif
NdbResultSet * rs;
int par = g_paramters[P_PARRA].value;
int bat = g_paramters[P_BATCH].value;
NdbScanOperation::LockMode lm;
switch(g_paramters[P_LOCK].value){
case 0:
lm = NdbScanOperation::LM_Read;
break;
case 1:
lm = NdbScanOperation::LM_Exclusive;
break;
case 2:
lm = NdbScanOperation::LM_CommittedRead;
break;
default:
abort();
}
if(g_paramters[P_ACCESS].value == 0){
pOp = pTrans->getNdbScanOperation(g_tablename);
assert(pOp);
#ifdef NdbIndexScanOperation_H
rs = pOp->readTuples(lm, bat, par);
#else
int oldp = (par == 0 ? 240 : par) * (bat == 0 ? 15 : bat);
rs = pOp->readTuples(oldp > 240 ? 240 : oldp, lm);
#endif
} else {
#ifdef NdbIndexScanOperation_H
pOp = pIOp = pTrans->getNdbIndexScanOperation(g_indexname, g_tablename);
bool ord = g_paramters[P_ACCESS].value == 2;
rs = pIOp->readTuples(lm, bat, par, ord);
#else
pOp = pIOp = pTrans->getNdbScanOperation(g_indexname, g_tablename);
assert(pOp);
int oldp = (par == 0 ? 240 : par) * (bat == 0 ? 15 : bat);
rs = pIOp->readTuples(oldp > 240 ? 240 : oldp, lm);
#endif
switch(g_paramters[P_BOUND].value){
case 0: // All
break;
case 1: // None
#ifdef NdbIndexScanOperation_H
pIOp->setBound((Uint32)0, NdbIndexScanOperation::BoundEQ, 0);
#else
pIOp->setBound((Uint32)0, NdbOperation::BoundEQ, 0);
#endif
break;
case 2: { // 1 row
default:
assert(g_table->getNoOfPrimaryKeys() == 1); // only impl. so far
abort();
#if 0
int tot = g_paramters[P_ROWS].value;
int row = rand() % tot;
fix_eq_bound(pIOp, row);
#endif
break;
}
}
}
assert(pOp);
assert(rs);
int check = 0;
switch(g_paramters[P_FILT].value){
case 0: // All
check = pOp->interpret_exit_ok();
break;
case 1: // None
check = pOp->interpret_exit_nok();
break;
case 2: { // 1 row
default:
assert(g_table->getNoOfPrimaryKeys() == 1); // only impl. so far
abort();
#if 0
int tot = g_paramters[P_ROWS].value;
int row = rand() % tot;
NdbScanFilter filter(pOp) ;
filter.begin(NdbScanFilter::AND);
fix_eq(filter, pOp, row);
filter.end();
break;
#endif
}
}
if(check != 0){
err(pOp->getNdbError());
return -1;
}
assert(check == 0);
for(int i = 0; i<g_table->getNoOfColumns(); i++){
pOp->getValue(i);
}
int rows = 0;
check = pTrans->execute(NoCommit);
assert(check == 0);
int fetch = g_paramters[P_FETCH].value;
while((check = rs->nextResult(true)) == 0){
do {
rows++;
} while(!fetch && ((check = rs->nextResult(false)) == 0));
if(check == -1){
err(pTrans->getNdbError());
return -1;
}
assert(check == 2);
}
if(check == -1){
err(pTrans->getNdbError());
return -1;
}
assert(check == 1);
g_info << "Found " << rows << " rows" << endl;
pTrans->close();
Uint64 stop = NdbTick_CurrentMillisecond();
start1 = (stop - start1);
sum1 += start1;
}
sum1 /= iter;
g_err.println("Avg time: %Ldms = %d rows/sec", sum1, (1000*tot)/sum1);
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment