Commit 37f0ff9b authored by pekka@mysql.com's avatar pekka@mysql.com

ndb: fix blob performance in long transactions

parent e1dc7b06
...@@ -607,8 +607,8 @@ private: ...@@ -607,8 +607,8 @@ private:
NdbOperation* theLastExecOpInList; // Last executing operation in list. NdbOperation* theLastExecOpInList; // Last executing operation in list.
NdbOperation* theCompletedFirstOp; // First operation in completed NdbOperation* theCompletedFirstOp; // First & last operation in completed
// operation list. NdbOperation* theCompletedLastOp; // operation list.
Uint32 theNoOfOpSent; // How many operations have been sent Uint32 theNoOfOpSent; // How many operations have been sent
Uint32 theNoOfOpCompleted; // How many operations have completed Uint32 theNoOfOpCompleted; // How many operations have completed
......
...@@ -55,6 +55,7 @@ NdbConnection::NdbConnection( Ndb* aNdb ) : ...@@ -55,6 +55,7 @@ NdbConnection::NdbConnection( Ndb* aNdb ) :
theFirstExecOpInList(NULL), theFirstExecOpInList(NULL),
theLastExecOpInList(NULL), theLastExecOpInList(NULL),
theCompletedFirstOp(NULL), theCompletedFirstOp(NULL),
theCompletedLastOp(NULL),
theNoOfOpSent(0), theNoOfOpSent(0),
theNoOfOpCompleted(0), theNoOfOpCompleted(0),
theNoOfOpFetched(0), theNoOfOpFetched(0),
...@@ -124,6 +125,7 @@ NdbConnection::init() ...@@ -124,6 +125,7 @@ NdbConnection::init()
theLastExecOpInList = NULL; theLastExecOpInList = NULL;
theCompletedFirstOp = NULL; theCompletedFirstOp = NULL;
theCompletedLastOp = NULL;
theGlobalCheckpointId = 0; theGlobalCheckpointId = 0;
theCommitStatus = Started; theCommitStatus = Started;
...@@ -256,6 +258,8 @@ NdbConnection::handleExecuteCompletion() ...@@ -256,6 +258,8 @@ NdbConnection::handleExecuteCompletion()
if (tLastExecOp != NULL) { if (tLastExecOp != NULL) {
tLastExecOp->next(theCompletedFirstOp); tLastExecOp->next(theCompletedFirstOp);
theCompletedFirstOp = tFirstExecOp; theCompletedFirstOp = tFirstExecOp;
if (theCompletedLastOp == NULL)
theCompletedLastOp = tLastExecOp;
theFirstExecOpInList = NULL; theFirstExecOpInList = NULL;
theLastExecOpInList = NULL; theLastExecOpInList = NULL;
}//if }//if
...@@ -292,6 +296,8 @@ NdbConnection::execute(ExecType aTypeOfExec, ...@@ -292,6 +296,8 @@ NdbConnection::execute(ExecType aTypeOfExec,
ExecType tExecType; ExecType tExecType;
NdbOperation* tPrepOp; NdbOperation* tPrepOp;
NdbOperation* tCompletedFirstOp = NULL;
NdbOperation* tCompletedLastOp = NULL;
int ret = 0; int ret = 0;
do { do {
...@@ -314,6 +320,7 @@ NdbConnection::execute(ExecType aTypeOfExec, ...@@ -314,6 +320,7 @@ NdbConnection::execute(ExecType aTypeOfExec,
} }
tPrepOp = tPrepOp->next(); tPrepOp = tPrepOp->next();
} }
// save rest of prepared ops if batch // save rest of prepared ops if batch
NdbOperation* tRestOp= 0; NdbOperation* tRestOp= 0;
NdbOperation* tLastOp= 0; NdbOperation* tLastOp= 0;
...@@ -323,6 +330,7 @@ NdbConnection::execute(ExecType aTypeOfExec, ...@@ -323,6 +330,7 @@ NdbConnection::execute(ExecType aTypeOfExec,
tLastOp = theLastOpInList; tLastOp = theLastOpInList;
theLastOpInList = tPrepOp; theLastOpInList = tPrepOp;
} }
if (tExecType == Commit) { if (tExecType == Commit) {
NdbOperation* tOp = theCompletedFirstOp; NdbOperation* tOp = theCompletedFirstOp;
while (tOp != NULL) { while (tOp != NULL) {
...@@ -338,6 +346,19 @@ NdbConnection::execute(ExecType aTypeOfExec, ...@@ -338,6 +346,19 @@ NdbConnection::execute(ExecType aTypeOfExec,
} }
} }
// completed ops are in unspecified order
if (theCompletedFirstOp != NULL) {
if (tCompletedFirstOp == NULL) {
tCompletedFirstOp = theCompletedFirstOp;
tCompletedLastOp = theCompletedLastOp;
} else {
tCompletedLastOp->next(theCompletedFirstOp);
tCompletedLastOp = theCompletedLastOp;
}
theCompletedFirstOp = NULL;
theCompletedLastOp = NULL;
}
if (executeNoBlobs(tExecType, abortOption, forceSend) == -1) if (executeNoBlobs(tExecType, abortOption, forceSend) == -1)
ret = -1; ret = -1;
#ifndef VM_TRACE #ifndef VM_TRACE
...@@ -362,6 +383,7 @@ NdbConnection::execute(ExecType aTypeOfExec, ...@@ -362,6 +383,7 @@ NdbConnection::execute(ExecType aTypeOfExec,
tOp = tOp->next(); tOp = tOp->next();
} }
} }
// add saved prepared ops if batch // add saved prepared ops if batch
if (tPrepOp != NULL && tRestOp != NULL) { if (tPrepOp != NULL && tRestOp != NULL) {
if (theFirstOpInList == NULL) if (theFirstOpInList == NULL)
...@@ -373,6 +395,18 @@ NdbConnection::execute(ExecType aTypeOfExec, ...@@ -373,6 +395,18 @@ NdbConnection::execute(ExecType aTypeOfExec,
assert(theFirstOpInList == NULL || tExecType == NoCommit); assert(theFirstOpInList == NULL || tExecType == NoCommit);
} while (theFirstOpInList != NULL || tExecType != aTypeOfExec); } while (theFirstOpInList != NULL || tExecType != aTypeOfExec);
if (tCompletedFirstOp != NULL) {
tCompletedLastOp->next(theCompletedFirstOp);
theCompletedFirstOp = tCompletedFirstOp;
if (theCompletedLastOp == NULL)
theCompletedLastOp = tCompletedLastOp;
}
#if ndb_api_count_completed_ops_after_blob_execute
{ NdbOperation* tOp; unsigned n = 0;
for (tOp = theCompletedFirstOp; tOp != NULL; tOp = tOp->next()) n++;
ndbout << "completed ops: " << n << endl;
}
#endif
DBUG_RETURN(ret); DBUG_RETURN(ret);
} }
...@@ -894,6 +928,7 @@ NdbConnection::releaseOperations() ...@@ -894,6 +928,7 @@ NdbConnection::releaseOperations()
releaseOps(theFirstExecOpInList); releaseOps(theFirstExecOpInList);
theCompletedFirstOp = NULL; theCompletedFirstOp = NULL;
theCompletedLastOp = NULL;
theFirstOpInList = NULL; theFirstOpInList = NULL;
theFirstExecOpInList = NULL; theFirstExecOpInList = NULL;
theLastOpInList = NULL; theLastOpInList = NULL;
...@@ -909,6 +944,7 @@ NdbConnection::releaseCompletedOperations() ...@@ -909,6 +944,7 @@ NdbConnection::releaseCompletedOperations()
{ {
releaseOps(theCompletedFirstOp); releaseOps(theCompletedFirstOp);
theCompletedFirstOp = NULL; theCompletedFirstOp = NULL;
theCompletedLastOp = NULL;
}//NdbConnection::releaseOperations() }//NdbConnection::releaseOperations()
/****************************************************************************** /******************************************************************************
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include <NdbMain.h> #include <NdbMain.h>
#include <NdbOut.hpp> #include <NdbOut.hpp>
#include <NdbTest.hpp> #include <NdbTest.hpp>
#include <NdbTick.h>
struct Bcol { struct Bcol {
bool m_nullable; bool m_nullable;
...@@ -59,6 +60,9 @@ struct Opt { ...@@ -59,6 +60,9 @@ struct Opt {
bool m_oneblob; bool m_oneblob;
Bcol m_blob1; Bcol m_blob1;
Bcol m_blob2; Bcol m_blob2;
// perf
const char* m_tnameperf;
unsigned m_rowsperf;
// bugs // bugs
int m_bug; int m_bug;
int (*m_bugtest)(); int (*m_bugtest)();
...@@ -84,6 +88,9 @@ struct Opt { ...@@ -84,6 +88,9 @@ struct Opt {
m_oneblob(false), m_oneblob(false),
m_blob1(false, 7, 1137, 10), m_blob1(false, 7, 1137, 10),
m_blob2(true, 99, 55, 1), m_blob2(true, 99, 55, 1),
// perf
m_tnameperf("TBLOB2"),
m_rowsperf(10000),
// bugs // bugs
m_bug(0), m_bug(0),
m_bugtest(0) { m_bugtest(0) {
...@@ -107,6 +114,7 @@ printusage() ...@@ -107,6 +114,7 @@ printusage()
<< " -loop N loop N times 0=forever [" << d.m_loop << "]" << endl << " -loop N loop N times 0=forever [" << d.m_loop << "]" << endl
<< " -parts N max parts in blob value [" << d.m_parts << "]" << endl << " -parts N max parts in blob value [" << d.m_parts << "]" << endl
<< " -rows N number of rows [" << d.m_rows << "]" << endl << " -rows N number of rows [" << d.m_rows << "]" << endl
<< " -rowsperf N rows for performace test [" << d.m_rowsperf << "]" << endl
<< " -seed N random seed 0=loop number [" << d.m_seed << "]" << endl << " -seed N random seed 0=loop number [" << d.m_seed << "]" << endl
<< " -skip xxx skip given tests (see list) [no tests]" << endl << " -skip xxx skip given tests (see list) [no tests]" << endl
<< " -test xxx only given tests (see list) [all tests]" << endl << " -test xxx only given tests (see list) [all tests]" << endl
...@@ -118,6 +126,7 @@ printusage() ...@@ -118,6 +126,7 @@ printusage()
<< " i hash index ops" << endl << " i hash index ops" << endl
<< " s table scans" << endl << " s table scans" << endl
<< " r ordered index scans" << endl << " r ordered index scans" << endl
<< " p performance test" << endl
<< "additional flags for test/skip" << endl << "additional flags for test/skip" << endl
<< " u update existing blob value" << endl << " u update existing blob value" << endl
<< " n normal insert and update" << endl << " n normal insert and update" << endl
...@@ -1381,6 +1390,292 @@ testmain() ...@@ -1381,6 +1390,292 @@ testmain()
return 0; return 0;
} }
// separate performance test
struct Tmr { // stolen from testOIBasic
Tmr() {
clr();
}
void clr() {
m_on = m_ms = m_cnt = m_time[0] = m_text[0] = 0;
}
void on() {
assert(m_on == 0);
m_on = NdbTick_CurrentMillisecond();
}
void off(unsigned cnt = 0) {
NDB_TICKS off = NdbTick_CurrentMillisecond();
assert(m_on != 0 && off >= m_on);
m_ms += off - m_on;
m_cnt += cnt;
m_on = 0;
}
const char* time() {
if (m_cnt == 0)
sprintf(m_time, "%u ms", m_ms);
else
sprintf(m_time, "%u ms per %u ( %u ms per 1000 )", m_ms, m_cnt, (1000 * m_ms) / m_cnt);
return m_time;
}
const char* pct (const Tmr& t1) {
if (0 < t1.m_ms)
sprintf(m_text, "%u pct", (100 * m_ms) / t1.m_ms);
else
sprintf(m_text, "[cannot measure]");
return m_text;
}
const char* over(const Tmr& t1) {
if (0 < t1.m_ms) {
if (t1.m_ms <= m_ms)
sprintf(m_text, "%u pct", (100 * (m_ms - t1.m_ms)) / t1.m_ms);
else
sprintf(m_text, "-%u pct", (100 * (t1.m_ms - m_ms)) / t1.m_ms);
} else
sprintf(m_text, "[cannot measure]");
return m_text;
}
NDB_TICKS m_on;
unsigned m_ms;
unsigned m_cnt;
char m_time[100];
char m_text[100];
};
static int
testperf()
{
if (! testcase('p'))
return 0;
DBG("=== perf test ===");
g_ndb = new Ndb("TEST_DB");
CHK(g_ndb->init() == 0);
CHK(g_ndb->waitUntilReady() == 0);
g_dic = g_ndb->getDictionary();
NdbDictionary::Table tab(g_opt.m_tnameperf);
if (g_dic->getTable(tab.getName()) != 0)
CHK(g_dic->dropTable(tab) == 0);
// col A - pk
{ NdbDictionary::Column col("A");
col.setType(NdbDictionary::Column::Unsigned);
col.setPrimaryKey(true);
tab.addColumn(col);
}
// col B - char 20
{ NdbDictionary::Column col("B");
col.setType(NdbDictionary::Column::Char);
col.setLength(20);
col.setNullable(true);
tab.addColumn(col);
}
// col C - text
{ NdbDictionary::Column col("C");
col.setType(NdbDictionary::Column::Text);
col.setInlineSize(20);
col.setPartSize(512);
col.setStripeSize(1);
col.setNullable(true);
tab.addColumn(col);
}
// create
CHK(g_dic->createTable(tab) == 0);
Uint32 cA = 0, cB = 1, cC = 2;
// timers
Tmr t1;
Tmr t2;
// insert char (one trans)
{
DBG("--- insert char ---");
t1.on();
CHK((g_con = g_ndb->startTransaction()) != 0);
for (Uint32 k = 0; k < g_opt.m_rowsperf; k++) {
CHK((g_opr = g_con->getNdbOperation(tab.getName())) != 0);
CHK(g_opr->insertTuple() == 0);
CHK(g_opr->equal(cA, (char*)&k) == 0);
CHK(g_opr->setValue(cB, "b") == 0);
CHK(g_con->execute(NoCommit) == 0);
}
t1.off(g_opt.m_rowsperf);
CHK(g_con->execute(Rollback) == 0);
DBG(t1.time());
g_opr = 0;
g_con = 0;
}
// insert text (one trans)
{
DBG("--- insert text ---");
t2.on();
CHK((g_con = g_ndb->startTransaction()) != 0);
for (Uint32 k = 0; k < g_opt.m_rowsperf; k++) {
CHK((g_opr = g_con->getNdbOperation(tab.getName())) != 0);
CHK(g_opr->insertTuple() == 0);
CHK(g_opr->equal(cA, (char*)&k) == 0);
CHK((g_bh1 = g_opr->getBlobHandle(cC)) != 0);
CHK((g_bh1->setValue("c", 1) == 0));
CHK(g_con->execute(NoCommit) == 0);
}
t2.off(g_opt.m_rowsperf);
CHK(g_con->execute(Rollback) == 0);
DBG(t2.time());
g_bh1 = 0;
g_opr = 0;
g_con = 0;
}
// insert overhead
DBG("insert overhead: " << t2.over(t1));
t1.clr();
t2.clr();
// insert
{
DBG("--- insert for read test ---");
unsigned n = 0;
CHK((g_con = g_ndb->startTransaction()) != 0);
for (Uint32 k = 0; k < g_opt.m_rowsperf; k++) {
CHK((g_opr = g_con->getNdbOperation(tab.getName())) != 0);
CHK(g_opr->insertTuple() == 0);
CHK(g_opr->equal(cA, (char*)&k) == 0);
CHK(g_opr->setValue(cB, "b") == 0);
CHK((g_bh1 = g_opr->getBlobHandle(cC)) != 0);
CHK((g_bh1->setValue("c", 1) == 0));
if (++n == g_opt.m_batch) {
CHK(g_con->execute(Commit) == 0);
g_ndb->closeTransaction(g_con);
CHK((g_con = g_ndb->startTransaction()) != 0);
n = 0;
}
}
if (n != 0) {
CHK(g_con->execute(Commit) == 0);
n = 0;
}
g_bh1 = 0;
g_opr = 0;
g_con = 0;
}
// pk read char (one trans)
{
DBG("--- pk read char ---");
CHK((g_con = g_ndb->startTransaction()) != 0);
Uint32 a;
char b[20];
t1.on();
for (Uint32 k = 0; k < g_opt.m_rowsperf; k++) {
CHK((g_opr = g_con->getNdbOperation(tab.getName())) != 0);
CHK(g_opr->readTuple() == 0);
CHK(g_opr->equal(cA, (char*)&k) == 0);
CHK(g_opr->getValue(cA, (char*)&a) != 0);
CHK(g_opr->getValue(cB, b) != 0);
a = (Uint32)-1;
b[0] = 0;
CHK(g_con->execute(NoCommit) == 0);
CHK(a == k && strcmp(b, "b") == 0);
}
CHK(g_con->execute(Commit) == 0);
t1.off(g_opt.m_rowsperf);
DBG(t1.time());
g_opr = 0;
g_con = 0;
}
// pk read text (one trans)
{
DBG("--- pk read text ---");
CHK((g_con = g_ndb->startTransaction()) != 0);
Uint32 a;
char c[20];
t2.on();
for (Uint32 k = 0; k < g_opt.m_rowsperf; k++) {
CHK((g_opr = g_con->getNdbOperation(tab.getName())) != 0);
CHK(g_opr->readTuple() == 0);
CHK(g_opr->equal(cA, (char*)&k) == 0);
CHK(g_opr->getValue(cA, (char*)&a) != 0);
CHK((g_bh1 = g_opr->getBlobHandle(cC)) != 0);
a = (Uint32)-1;
c[0] = 0;
CHK(g_con->execute(NoCommit) == 0);
Uint32 m = 20;
CHK(g_bh1->readData(c, m) == 0);
CHK(a == k && m == 1 && strcmp(c, "c") == 0);
}
CHK(g_con->execute(Commit) == 0);
t2.off(g_opt.m_rowsperf);
DBG(t2.time());
g_opr = 0;
g_con = 0;
}
// pk read overhead
DBG("pk read overhead: " << t2.over(t1));
t1.clr();
t2.clr();
// scan read char
{
DBG("--- scan read char ---");
NdbResultSet* rs;
Uint32 a;
char b[20];
CHK((g_con = g_ndb->startTransaction()) != 0);
CHK((g_ops = g_con->getNdbScanOperation(tab.getName())) != 0);
CHK((rs = g_ops->readTuples(NdbScanOperation::LM_Read)) != 0);
CHK(g_ops->getValue(cA, (char*)&a) != 0);
CHK(g_ops->getValue(cB, b) != 0);
CHK(g_con->execute(NoCommit) == 0);
unsigned n = 0;
t1.on();
while (1) {
a = (Uint32)-1;
b[0] = 0;
int ret;
CHK((ret = rs->nextResult(true)) == 0 || ret == 1);
if (ret == 1)
break;
CHK(a < g_opt.m_rowsperf && strcmp(b, "b") == 0);
n++;
}
CHK(n == g_opt.m_rowsperf);
t1.off(g_opt.m_rowsperf);
DBG(t1.time());
g_ops = 0;
g_con = 0;
}
// scan read text
{
DBG("--- read text ---");
NdbResultSet* rs;
Uint32 a;
char c[20];
CHK((g_con = g_ndb->startTransaction()) != 0);
CHK((g_ops = g_con->getNdbScanOperation(tab.getName())) != 0);
CHK((rs = g_ops->readTuples(NdbScanOperation::LM_Read)) != 0);
CHK(g_ops->getValue(cA, (char*)&a) != 0);
CHK((g_bh1 = g_ops->getBlobHandle(cC)) != 0);
CHK(g_con->execute(NoCommit) == 0);
unsigned n = 0;
t2.on();
while (1) {
a = (Uint32)-1;
c[0] = 0;
int ret;
CHK((ret = rs->nextResult(true)) == 0 || ret == 1);
if (ret == 1)
break;
Uint32 m = 20;
CHK(g_bh1->readData(c, m) == 0);
CHK(a < g_opt.m_rowsperf && m == 1 && strcmp(c, "c") == 0);
n++;
}
CHK(n == g_opt.m_rowsperf);
t2.off(g_opt.m_rowsperf);
DBG(t2.time());
g_bh1 = 0;
g_ops = 0;
g_con = 0;
}
// scan read overhead
DBG("scan read overhead: " << t2.over(t1));
t1.clr();
t2.clr();
delete g_ndb;
return 0;
}
// bug tests // bug tests
static int static int
...@@ -1498,6 +1793,12 @@ NDB_COMMAND(testOdbcDriver, "testBlobs", "testBlobs", "testBlobs", 65535) ...@@ -1498,6 +1793,12 @@ NDB_COMMAND(testOdbcDriver, "testBlobs", "testBlobs", "testBlobs", 65535)
continue; continue;
} }
} }
if (strcmp(arg, "-rowsperf") == 0) {
if (++argv, --argc > 0) {
g_opt.m_rowsperf = atoi(argv[0]);
continue;
}
}
if (strcmp(arg, "-seed") == 0) { if (strcmp(arg, "-seed") == 0) {
if (++argv, --argc > 0) { if (++argv, --argc > 0) {
g_opt.m_seed = atoi(argv[0]); g_opt.m_seed = atoi(argv[0]);
...@@ -1558,7 +1859,7 @@ NDB_COMMAND(testOdbcDriver, "testBlobs", "testBlobs", "testBlobs", 65535) ...@@ -1558,7 +1859,7 @@ NDB_COMMAND(testOdbcDriver, "testBlobs", "testBlobs", "testBlobs", 65535)
strcat(b, "r"); strcat(b, "r");
g_opt.m_skip = strdup(b); g_opt.m_skip = strdup(b);
} }
if (testmain() == -1) { if (testmain() == -1 || testperf() == -1) {
ndbout << "line " << __LINE__ << " FAIL loop=" << g_loop << endl; ndbout << "line " << __LINE__ << " FAIL loop=" << g_loop << endl;
return NDBT_ProgramExit(NDBT_FAILED); return NDBT_ProgramExit(NDBT_FAILED);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment