Commit 883247ea authored by pekka@mysql.com's avatar pekka@mysql.com

testOIBasic.cpp:

  make it fail in more useful ways
parent de8e5c06
......@@ -33,13 +33,16 @@
struct Opt {
// common options
unsigned m_batch;
const char* m_case;
bool m_core;
bool m_dups;
NdbDictionary::Object::FragmentType m_fragtype;
unsigned m_idxloop;
const char* m_index;
unsigned m_loop;
bool m_nologging;
bool m_msglock;
unsigned m_rows;
unsigned m_samples;
unsigned m_scanrd;
......@@ -50,18 +53,21 @@ struct Opt {
unsigned m_threads;
unsigned m_v;
Opt() :
m_batch(32),
m_case(0),
m_core(false),
m_dups(false),
m_fragtype(NdbDictionary::Object::FragUndefined),
m_idxloop(4),
m_index(0),
m_loop(1),
m_nologging(false),
m_msglock(true),
m_rows(1000),
m_samples(0),
m_scanrd(240),
m_scanex(240),
m_seed(1),
m_seed(0),
m_subloop(4),
m_table(0),
m_threads(4),
......@@ -80,6 +86,7 @@ printhelp()
Opt d;
ndbout
<< "usage: testOIbasic [options]" << endl
<< " -batch N pk operations in batch [" << d.m_batch << "]" << endl
<< " -case abc only given test cases (letters a-z)" << endl
<< " -core core dump on error [" << d.m_core << "]" << endl
<< " -dups allow duplicate tuples from index scan [" << d.m_dups << "]" << endl
......@@ -91,7 +98,7 @@ printhelp()
<< " -samples N samples for some timings (0=all) [" << d.m_samples << "]" << endl
<< " -scanrd N scan read parallelism [" << d.m_scanrd << "]" << endl
<< " -scanex N scan exclusive parallelism [" << d.m_scanex << "]" << endl
<< " -seed N srandom seed [" << d.m_seed << "]" << endl
<< " -seed N srandom seed 0=loop number[" << d.m_seed << "]" << endl
<< " -subloop N subtest loop count [" << d.m_subloop << "]" << endl
<< " -table xyz only given table numbers (digits 1-9)" << endl
<< " -threads N number of threads [" << d.m_threads << "]" << endl
......@@ -133,9 +140,9 @@ getthrstr()
#define LLN(n, s) \
do { \
if ((n) > g_opt.m_v) break; \
NdbMutex_Lock(&ndbout_mutex); \
if (g_opt.m_msglock) NdbMutex_Lock(&ndbout_mutex); \
ndbout << getthrstr() << s << endl; \
NdbMutex_Unlock(&ndbout_mutex); \
if (g_opt.m_msglock) NdbMutex_Unlock(&ndbout_mutex); \
} while(0)
#define LL0(s) LLN(0, s)
......@@ -148,11 +155,10 @@ getthrstr()
// following check a condition and return -1 on failure
#undef CHK // simple check
#undef CHKTRY // execute action (try-catch) on failure
#undef CHKMSG // print extra message on failure
#undef CHKTRY // check with action on fail
#undef CHKCON // print NDB API errors on failure
#define CHK(x) CHKTRY(x, ;)
#define CHK(x) CHKTRY(x, ;)
#define CHKTRY(x, act) \
do { \
......@@ -163,14 +169,6 @@ getthrstr()
return -1; \
} while (0)
#define CHKMSG(x, msg) \
do { \
if (x) break; \
LL0("line " << __LINE__ << ": " << #x << " failed: " << msg); \
if (g_opt.m_core) abort(); \
return -1; \
} while (0)
#define CHKCON(x, con) \
do { \
if (x) break; \
......@@ -199,13 +197,14 @@ struct Par : public Opt {
Tmr* m_tmr;
Tmr& tmr() const { assert(m_tmr != 0); return *m_tmr; }
unsigned m_totrows;
unsigned m_batch;
// value calculation
unsigned m_pctnull;
unsigned m_range;
unsigned m_pctrange;
// do verify after read
bool m_verify;
// deadlock possible
bool m_deadlock;
// timer location
Par(const Opt& opt) :
Opt(opt),
......@@ -215,11 +214,11 @@ struct Par : public Opt {
m_set(0),
m_tmr(0),
m_totrows(m_threads * m_rows),
m_batch(32),
m_pctnull(10),
m_range(m_rows),
m_pctrange(0),
m_verify(false) {
m_verify(false),
m_deadlock(false) {
}
};
......@@ -313,13 +312,51 @@ const char*
Tmr::over(const Tmr& t1)
{
if (0 < t1.m_ms) {
sprintf(m_text, "%d pct", (100 * (m_ms - t1.m_ms)) / t1.m_ms);
if (t1.m_ms <= m_ms)
sprintf(m_text, "%u pct", (100 * (m_ms - t1.m_ms)) / t1.m_ms);
else
sprintf(m_text, "-%u pct", (100 * (t1.m_ms - m_ms)) / t1.m_ms);
} else {
sprintf(m_text, "[cannot measure]");
}
return m_text;
}
// list of ints
struct Lst {
Lst();
unsigned m_arr[1000];
unsigned m_cnt;
void push(unsigned i);
unsigned cnt() const;
void reset();
};
Lst::Lst() :
m_cnt(0)
{
}
void
Lst::push(unsigned i)
{
assert(m_cnt < sizeof(m_arr)/sizeof(m_arr[0]));
m_arr[m_cnt++] = i;
}
unsigned
Lst::cnt() const
{
return m_cnt;
}
void
Lst::reset()
{
m_cnt = 0;
}
// tables and indexes
// Col - table column
......@@ -624,15 +661,14 @@ struct Con {
Con() :
m_ndb(0), m_dic(0), m_tx(0), m_op(0),
m_scanop(0), m_indexscanop(0), m_resultset(0), m_scanmode(ScanNo), m_errtype(ErrNone) {}
~Con(){
if(m_tx) closeTransaction();
~Con() {
if (m_tx != 0)
closeTransaction();
}
int connect();
void connect(const Con& con);
void disconnect();
int startTransaction();
int startBuddyTransaction(const Con& con);
int getNdbOperation(const Tab& tab);
int getNdbScanOperation(const Tab& tab);
int getNdbScanOperation(const ITab& itab, const Tab& tab);
......@@ -641,20 +677,16 @@ struct Con {
int setValue(int num, const char* addr);
int setBound(int num, int type, const void* value);
int execute(ExecType t);
int execute(ExecType t, bool& deadlock);
int openScanRead(unsigned parallelism);
int openScanExclusive(unsigned parallelism);
int executeScan();
int nextScanResult();
int takeOverForUpdate(Con& scan);
int takeOverForDelete(Con& scan);
int nextScanResult(bool fetchAllowed);
int nextScanResult(bool fetchAllowed, bool& deadlock);
int updateScanTuple(Con& con2);
int deleteScanTuple(Con& con2);
void closeTransaction();
void printerror(NdbOut& out);
// flush dict cache
int bugger() {
//disconnect();
//CHK(connect() == 0);
return 0;
}
};
int
......@@ -664,11 +696,17 @@ Con::connect()
m_ndb = new Ndb("TEST_DB");
CHKCON(m_ndb->init() == 0, *this);
CHKCON(m_ndb->waitUntilReady(30) == 0, *this);
m_dic = m_ndb->getDictionary();
m_tx = 0, m_op = 0;
return 0;
}
void
Con::connect(const Con& con)
{
assert(m_ndb == 0);
m_ndb = con.m_ndb;
}
void
Con::disconnect()
{
......@@ -680,19 +718,12 @@ int
Con::startTransaction()
{
assert(m_ndb != 0);
if(m_tx) closeTransaction();
if (m_tx != 0)
closeTransaction();
CHKCON((m_tx = m_ndb->startTransaction()) != 0, *this);
return 0;
}
int
Con::startBuddyTransaction(const Con& con)
{
assert(m_ndb != 0 && m_tx == 0 && con.m_ndb == m_ndb && con.m_tx != 0);
CHKCON((m_tx = m_ndb->hupp(con.m_tx)) != 0, *this);
return 0;
}
int
Con::getNdbOperation(const Tab& tab)
{
......@@ -757,6 +788,22 @@ Con::execute(ExecType t)
return 0;
}
int
Con::execute(ExecType t, bool& deadlock)
{
int ret = execute(t);
if (ret != 0) {
if (deadlock && m_errtype == ErrDeadlock) {
LL3("caught deadlock");
ret = 0;
}
} else {
deadlock = false;
}
CHK(ret == 0);
return 0;
}
int
Con::openScanRead(unsigned parallelism)
{
......@@ -781,28 +828,44 @@ Con::executeScan()
}
int
Con::nextScanResult()
Con::nextScanResult(bool fetchAllowed)
{
int ret;
assert(m_resultset != 0);
CHKCON((ret = m_resultset->nextResult()) != -1, *this);
assert(ret == 0 || ret == 1);
CHKCON((ret = m_resultset->nextResult(fetchAllowed)) != -1, *this);
assert(ret == 0 || ret == 1 || (! fetchAllowed && ret == 2));
return ret;
}
int
Con::takeOverForUpdate(Con& scan)
Con::nextScanResult(bool fetchAllowed, bool& deadlock)
{
assert(m_tx != 0 && scan.m_op != 0);
CHKCON((m_op = scan.m_resultset->updateTuple(m_tx)) != 0, scan);
int ret = nextScanResult(fetchAllowed);
if (ret == -1) {
if (deadlock && m_errtype == ErrDeadlock) {
LL3("caught deadlock");
ret = 0;
}
} else {
deadlock = false;
}
CHK(ret == 0 || ret == 1 || (! fetchAllowed && ret == 2));
return ret;
}
int
Con::updateScanTuple(Con& con2)
{
assert(con2.m_tx != 0);
CHKCON((con2.m_op = m_resultset->updateTuple(con2.m_tx)) != 0, *this);
return 0;
}
int
Con::takeOverForDelete(Con& scan)
Con::deleteScanTuple(Con& con2)
{
assert(m_tx != 0 && scan.m_op != 0);
CHKCON(scan.m_resultset->deleteTuple(m_tx) == 0, scan);
assert(con2.m_tx != 0);
CHKCON(m_resultset->deleteTuple(con2.m_tx) == 0, *this);
return 0;
}
......@@ -850,7 +913,7 @@ invalidateindex(Par par, const ITab& itab)
{
Con& con = par.con();
const Tab& tab = par.tab();
con.m_dic->invalidateIndex(itab.m_name, tab.m_name);
con.m_ndb->getDictionary()->invalidateIndex(itab.m_name, tab.m_name);
return 0;
}
......@@ -874,7 +937,7 @@ invalidatetable(Par par)
Con& con = par.con();
const Tab& tab = par.tab();
invalidateindex(par);
con.m_dic->invalidateTable(tab.m_name);
con.m_ndb->getDictionary()->invalidateTable(tab.m_name);
return 0;
}
......@@ -883,6 +946,7 @@ droptable(Par par)
{
Con& con = par.con();
const Tab& tab = par.tab();
con.m_dic = con.m_ndb->getDictionary();
if (con.m_dic->getTable(tab.m_name) == 0) {
// how to check for error
LL4("no table " << tab.m_name);
......@@ -890,6 +954,7 @@ droptable(Par par)
LL3("drop table " << tab.m_name);
CHKCON(con.m_dic->dropTable(tab.m_name) == 0, con);
}
con.m_dic = 0;
return 0;
}
......@@ -897,7 +962,6 @@ static int
createtable(Par par)
{
Con& con = par.con();
CHK(con.bugger() == 0);
const Tab& tab = par.tab();
LL3("create table " << tab.m_name);
LL4(tab);
......@@ -917,7 +981,9 @@ createtable(Par par)
c.setNullable(col.m_nullable);
t.addColumn(c);
}
con.m_dic = con.m_ndb->getDictionary();
CHKCON(con.m_dic->createTable(t) == 0, con);
con.m_dic = 0;
return 0;
}
......@@ -926,6 +992,7 @@ dropindex(Par par, const ITab& itab)
{
Con& con = par.con();
const Tab& tab = par.tab();
con.m_dic = con.m_ndb->getDictionary();
if (con.m_dic->getIndex(itab.m_name, tab.m_name) == 0) {
// how to check for error
LL4("no index " << itab.m_name);
......@@ -933,6 +1000,7 @@ dropindex(Par par, const ITab& itab)
LL3("drop index " << itab.m_name);
CHKCON(con.m_dic->dropIndex(itab.m_name, tab.m_name) == 0, con);
}
con.m_dic = 0;
return 0;
}
......@@ -953,7 +1021,6 @@ static int
createindex(Par par, const ITab& itab)
{
Con& con = par.con();
CHK(con.bugger() == 0);
const Tab& tab = par.tab();
LL3("create index " << itab.m_name);
LL4(itab);
......@@ -965,7 +1032,9 @@ createindex(Par par, const ITab& itab)
const Col& col = itab.m_icol[k].m_col;
x.addColumnName(col.m_name);
}
con.m_dic = con.m_ndb->getDictionary();
CHKCON(con.m_dic->createIndex(x) == 0, con);
con.m_dic = 0;
return 0;
}
......@@ -1240,6 +1309,8 @@ struct Row {
const Tab& m_tab;
Val** m_val;
bool m_exist;
enum Op { NoOp = 0, ReadOp, InsOp, UpdOp, DelOp };
Op m_pending;
Row(const Tab& tab);
~Row();
void copy(const Row& row2);
......@@ -1264,6 +1335,7 @@ Row::Row(const Tab& tab) :
m_val[k] = new Val(col);
}
m_exist = false;
m_pending = NoOp;
}
Row::~Row()
......@@ -1301,7 +1373,7 @@ int
Row::verify(const Row& row2) const
{
const Tab& tab = m_tab;
assert(&tab == &row2.m_tab);
assert(&tab == &row2.m_tab && m_exist && row2.m_exist);
for (unsigned k = 0; k < tab.m_cols; k++) {
const Val& val = *m_val[k];
const Val& val2 = *row2.m_val[k];
......@@ -1322,7 +1394,7 @@ Row::insrow(Par par)
const Val& val = *m_val[k];
CHK(val.setval(par) == 0);
}
m_exist = true;
m_pending = InsOp;
return 0;
}
......@@ -1338,6 +1410,7 @@ Row::updrow(Par par)
const Val& val = *m_val[k];
CHK(val.setval(par) == 0);
}
m_pending = UpdOp;
return 0;
}
......@@ -1355,7 +1428,7 @@ Row::delrow(Par par)
if (col.m_pk)
CHK(val.setval(par) == 0);
}
m_exist = false;
m_pending = DelOp;
return 0;
}
......@@ -1372,7 +1445,6 @@ Row::selrow(Par par)
if (col.m_pk)
CHK(val.setval(par) == 0);
}
m_exist = false;
return 0;
}
......@@ -1387,6 +1459,7 @@ Row::setrow(Par par)
if (! col.m_pk)
CHK(val.setval(par) == 0);
}
m_pending = UpdOp;
return 0;
}
......@@ -1414,6 +1487,10 @@ operator<<(NdbOut& out, const Row& row)
out << " ";
out << *row.m_val[i];
}
out << " [exist=" << row.m_exist;
if (row.m_pending)
out << " pending=" << row.m_pending;
out << "]";
return out;
}
......@@ -1432,6 +1509,9 @@ struct Set {
unsigned count() const;
// row methods
bool exist(unsigned i) const;
Row::Op pending(unsigned i) const;
void notpending(unsigned i);
void notpending(const Lst& lst);
void calc(Par par, unsigned i);
int insrow(Par par, unsigned i);
int updrow(Par par, unsigned i);
......@@ -1446,7 +1526,7 @@ struct Set {
void savepoint();
void commit();
void rollback();
// locking (not perfect since ops may complete in different order)
// protect structure
NdbMutex* m_mutex;
void lock() {
NdbMutex_Lock(m_mutex);
......@@ -1464,6 +1544,7 @@ Set::Set(const Tab& tab, unsigned rows) :
m_rows = rows;
m_row = new Row* [m_rows];
for (unsigned i = 0; i < m_rows; i++) {
// allocate on need to save space
m_row[i] = 0;
}
m_saverow = 0;
......@@ -1519,7 +1600,18 @@ bool
Set::exist(unsigned i) const
{
assert(i < m_rows);
return m_row[i] != 0 && m_row[i]->m_exist;
if (m_row[i] == 0) // not allocated => not exist
return false;
return m_row[i]->m_exist;
}
Row::Op
Set::pending(unsigned i) const
{
assert(i < m_rows);
if (m_row[i] == 0) // not allocated => not pending
return Row::NoOp;
return m_row[i]->m_pending;
}
void
......@@ -1598,7 +1690,7 @@ Set::getkey(Par par, unsigned* i)
assert(m_rec[0] != 0);
const char* aRef0 = m_rec[0]->aRef();
Uint32 key = *(const Uint32*)aRef0;
CHKMSG(key < m_rows, "key=" << key << " rows=" << m_rows);
CHK(key < m_rows);
*i = key;
return 0;
}
......@@ -1628,12 +1720,32 @@ Set::putval(unsigned i, bool force)
return 0;
}
void
Set::notpending(unsigned i)
{
assert(m_row[i] != 0);
Row& row = *m_row[i];
if (row.m_pending == Row::InsOp)
row.m_exist = true;
if (row.m_pending == Row::DelOp)
row.m_exist = false;
row.m_pending = Row::NoOp;
}
void
Set::notpending(const Lst& lst)
{
for (unsigned j = 0; j < lst.m_cnt; j++) {
unsigned i = lst.m_arr[j];
notpending(i);
}
}
int
Set::verify(const Set& set2) const
{
const Tab& tab = m_tab;
assert(&tab == &set2.m_tab && m_rows == set2.m_rows);
CHKMSG(count() == set2.count(), "set=" << count() << " set2=" << set2.count());
for (unsigned i = 0; i < m_rows; i++) {
CHK(exist(i) == set2.exist(i));
if (! exist(i))
......@@ -1924,28 +2036,46 @@ pkinsert(Par par)
Set& set = par.set();
LL3("pkinsert");
CHK(con.startTransaction() == 0);
unsigned n = 0;
Lst lst;
for (unsigned j = 0; j < par.m_rows; j++) {
unsigned i = thrrow(par, j);
set.lock();
if (set.exist(i)) {
if (set.exist(i) || set.pending(i)) {
set.unlock();
continue;
}
set.calc(par, i);
LL4("pkinsert " << i << ": " << *set.m_row[i]);
CHKTRY(set.insrow(par, i) == 0, set.unlock());
CHK(set.insrow(par, i) == 0);
set.unlock();
if (++n == par.m_batch) {
CHK(con.execute(Commit) == 0);
LL4("pkinsert " << i << ": " << *set.m_row[i]);
lst.push(i);
if (lst.cnt() == par.m_batch) {
bool deadlock = par.m_deadlock;
CHK(con.execute(Commit, deadlock) == 0);
con.closeTransaction();
if (deadlock) {
LL1("pkinsert: stop on deadlock");
return 0;
}
set.lock();
set.notpending(lst);
set.unlock();
lst.reset();
CHK(con.startTransaction() == 0);
n = 0;
}
}
if (n != 0) {
CHK(con.execute(Commit) == 0);
n = 0;
if (lst.cnt() != 0) {
bool deadlock = par.m_deadlock;
CHK(con.execute(Commit, deadlock) == 0);
con.closeTransaction();
if (deadlock) {
LL1("pkinsert: stop on deadlock");
return 0;
}
set.lock();
set.notpending(lst);
set.unlock();
return 0;
}
con.closeTransaction();
return 0;
......@@ -1958,28 +2088,45 @@ pkupdate(Par par)
Set& set = par.set();
LL3("pkupdate");
CHK(con.startTransaction() == 0);
unsigned n = 0;
Lst lst;
bool deadlock = false;
for (unsigned j = 0; j < par.m_rows; j++) {
unsigned i = thrrow(par, j);
set.lock();
if (! set.exist(i)) {
if (! set.exist(i) || set.pending(i)) {
set.unlock();
continue;
}
set.calc(par, i);
LL4("pkupdate " << i << ": " << *set.m_row[i]);
CHKTRY(set.updrow(par, i) == 0, set.unlock());
CHK(set.updrow(par, i) == 0);
set.unlock();
if (++n == par.m_batch) {
CHK(con.execute(Commit) == 0);
LL4("pkupdate " << i << ": " << *set.m_row[i]);
lst.push(i);
if (lst.cnt() == par.m_batch) {
deadlock = par.m_deadlock;
CHK(con.execute(Commit, deadlock) == 0);
if (deadlock) {
LL1("pkupdate: stop on deadlock");
break;
}
con.closeTransaction();
set.lock();
set.notpending(lst);
set.unlock();
lst.reset();
CHK(con.startTransaction() == 0);
n = 0;
}
}
if (n != 0) {
CHK(con.execute(Commit) == 0);
n = 0;
if (! deadlock && lst.cnt() != 0) {
deadlock = par.m_deadlock;
CHK(con.execute(Commit, deadlock) == 0);
if (deadlock) {
LL1("pkupdate: stop on deadlock");
} else {
set.lock();
set.notpending(lst);
set.unlock();
}
}
con.closeTransaction();
return 0;
......@@ -1992,27 +2139,44 @@ pkdelete(Par par)
Set& set = par.set();
LL3("pkdelete");
CHK(con.startTransaction() == 0);
unsigned n = 0;
Lst lst;
bool deadlock = false;
for (unsigned j = 0; j < par.m_rows; j++) {
unsigned i = thrrow(par, j);
set.lock();
if (! set.exist(i)) {
if (! set.exist(i) || set.pending(i)) {
set.unlock();
continue;
}
LL4("pkdelete " << i << ": " << *set.m_row[i]);
CHKTRY(set.delrow(par, i) == 0, set.unlock());
CHK(set.delrow(par, i) == 0);
set.unlock();
if (++n == par.m_batch) {
CHK(con.execute(Commit) == 0);
LL4("pkdelete " << i << ": " << *set.m_row[i]);
lst.push(i);
if (lst.cnt() == par.m_batch) {
deadlock = par.m_deadlock;
CHK(con.execute(Commit, deadlock) == 0);
if (deadlock) {
LL1("pkdelete: stop on deadlock");
break;
}
con.closeTransaction();
set.lock();
set.notpending(lst);
set.unlock();
lst.reset();
CHK(con.startTransaction() == 0);
n = 0;
}
}
if (n != 0) {
CHK(con.execute(Commit) == 0);
n = 0;
if (! deadlock && lst.cnt() != 0) {
deadlock = par.m_deadlock;
CHK(con.execute(Commit, deadlock) == 0);
if (deadlock) {
LL1("pkdelete: stop on deadlock");
} else {
set.lock();
set.notpending(lst);
set.unlock();
}
}
con.closeTransaction();
return 0;
......@@ -2023,14 +2187,18 @@ pkread(Par par)
{
Con& con = par.con();
const Tab& tab = par.tab();
const Set& set = par.set();
Set& set = par.set();
LL3((par.m_verify ? "pkverify " : "pkread ") << tab.m_name);
// expected
const Set& set1 = set;
Set set2(tab, set.m_rows);
for (unsigned i = 0; i < set.m_rows; i++) {
if (! set.exist(i))
set.lock();
if (! set.exist(i) || set.pending(i)) {
set.unlock();
continue;
}
set.unlock();
CHK(con.startTransaction() == 0);
CHK(set2.selrow(par, i) == 0);
CHK(con.execute(Commit) == 0);
......@@ -2053,6 +2221,7 @@ pkreadfast(Par par, unsigned count)
const Set& set = par.set();
LL3("pkfast " << tab.m_name);
Row keyrow(tab);
// not batched on purpose
for (unsigned j = 0; j < count; j++) {
unsigned i = urandom(set.m_rows);
assert(set.exist(i));
......@@ -2089,7 +2258,7 @@ scanreadtable(Par par)
CHK(con.executeScan() == 0);
while (1) {
int ret;
CHK((ret = con.nextScanResult()) == 0 || ret == 1);
CHK((ret = con.nextScanResult(true)) == 0 || ret == 1);
if (ret == 1)
break;
unsigned i = (unsigned)-1;
......@@ -2120,7 +2289,7 @@ scanreadtablefast(Par par, unsigned countcheck)
unsigned count = 0;
while (1) {
int ret;
CHK((ret = con.nextScanResult()) == 0 || ret == 1);
CHK((ret = con.nextScanResult(true)) == 0 || ret == 1);
if (ret == 1)
break;
count++;
......@@ -2150,7 +2319,7 @@ scanreadindex(Par par, const ITab& itab, const BSet& bset)
CHK(con.executeScan() == 0);
while (1) {
int ret;
CHK((ret = con.nextScanResult()) == 0 || ret == 1);
CHK((ret = con.nextScanResult(true)) == 0 || ret == 1);
if (ret == 1)
break;
unsigned i = (unsigned)-1;
......@@ -2184,7 +2353,7 @@ scanreadindexfast(Par par, const ITab& itab, const BSet& bset, unsigned countche
unsigned count = 0;
while (1) {
int ret;
CHK((ret = con.nextScanResult()) == 0 || ret == 1);
CHK((ret = con.nextScanResult(true)) == 0 || ret == 1);
if (ret == 1)
break;
count++;
......@@ -2198,7 +2367,7 @@ static int
scanreadindex(Par par, const ITab& itab)
{
const Tab& tab = par.tab();
for (unsigned i = 0; i < par.m_subloop; i++) {
for (unsigned i = 0; i < par.m_idxloop; i++) {
BSet bset(tab, itab, par.m_rows);
bset.calc(par);
CHK(scanreadindex(par, itab, bset) == 0);
......@@ -2300,29 +2469,63 @@ scanupdatetable(Par par)
unsigned count = 0;
// updating trans
Con con2;
con2.m_ndb = con.m_ndb;
con2.connect(con);
CHK(con2.startTransaction() == 0);
Lst lst;
bool deadlock = false;
while (1) {
int ret;
CHK((ret = con.nextScanResult()) == 0 || ret == 1);
deadlock = par.m_deadlock;
CHK((ret = con.nextScanResult(true, deadlock)) == 0 || ret == 1);
if (ret == 1)
break;
if (deadlock) {
LL1("scanupdatetable: stop on deadlock");
break;
}
do {
unsigned i = (unsigned)-1;
CHK(set2.getkey(par, &i) == 0);
const Row& row = *set.m_row[i];
set.lock();
if (! set.exist(i) || set.pending(i)) {
LL4("scan update " << tab.m_name << ": skip: " << row);
} else {
CHKTRY(set2.putval(i, false) == 0, set.unlock());
CHKTRY(con.updateScanTuple(con2) == 0, set.unlock());
Par par2 = par;
par2.m_con = &con2;
set.calc(par, i);
CHKTRY(set.setrow(par2, i) == 0, set.unlock());
LL4("scan update " << tab.m_name << ": " << row);
lst.push(i);
}
set.unlock();
if (lst.cnt() == par.m_batch) {
CHK(con2.execute(Commit) == 0);
con2.closeTransaction();
set.lock();
set.notpending(lst);
set.unlock();
count += lst.cnt();
lst.reset();
CHK(con2.startTransaction() == 0);
}
CHK((ret = con.nextScanResult(false)) == 0 || ret == 1 || ret == 2);
if (ret == 2 && lst.cnt() != 0) {
CHK(con2.execute(Commit) == 0);
con2.closeTransaction();
set.lock();
set.notpending(lst);
set.unlock();
count += lst.cnt();
lst.reset();
CHK(con2.startTransaction() == 0);
}
} while (ret == 0);
if (ret == 1)
break;
unsigned i = (unsigned)-1;
CHK(set2.getkey(par, &i) == 0);
LL4("key " << i);
CHK(set2.putval(i, false) == 0);
CHK(con2.takeOverForUpdate(con) == 0);
Par par2 = par;
par2.m_con = &con2;
set.lock();
set.calc(par, i);
LL4("scan update " << tab.m_name << ": " << *set.m_row[i]);
CHKTRY(set.setrow(par2, i) == 0, set.unlock());
set.unlock();
CHK(con2.execute(NoCommit) == 0);
count++;
}
CHK(con2.execute(Commit) == 0);
con2.closeTransaction();
LL3("scan update " << tab.m_name << " rows updated=" << count);
con.closeTransaction();
......@@ -2346,32 +2549,61 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
unsigned count = 0;
// updating trans
Con con2;
con2.m_ndb = con.m_ndb;
con2.connect(con);
CHK(con2.startTransaction() == 0);
Lst lst;
bool deadlock = false;
while (1) {
int ret;
CHK((ret = con.nextScanResult()) == 0 || ret == 1);
deadlock = par.m_deadlock;
CHK((ret = con.nextScanResult(true, deadlock)) == 0 || ret == 1);
if (ret == 1)
break;
unsigned i = (unsigned)-1;
CHK(set2.getkey(par, &i) == 0);
LL4("key " << i);
CHK(set2.putval(i, par.m_dups) == 0);
// avoid deadlock for now
//if (! isthrrow(par, i))
//continue;
CHK(con2.takeOverForUpdate(con) == 0);
Par par2 = par;
par2.m_con = &con2;
set.lock();
set.calc(par, i);
LL4("scan update " << itab.m_name << ": " << *set.m_row[i]);
CHKTRY(set.setrow(par2, i) == 0, set.unlock());
set.unlock();
CHK(con2.execute(NoCommit) == 0);
count++;
if (deadlock) {
LL1("scanupdateindex: stop on deadlock");
break;
}
do {
unsigned i = (unsigned)-1;
CHK(set2.getkey(par, &i) == 0);
const Row& row = *set.m_row[i];
set.lock();
if (! set.exist(i) || set.pending(i)) {
LL4("scan update " << itab.m_name << ": skip: " << row);
} else {
CHKTRY(set2.putval(i, par.m_dups) == 0, set.unlock());
CHKTRY(con.updateScanTuple(con2) == 0, set.unlock());
Par par2 = par;
par2.m_con = &con2;
set.calc(par, i);
CHKTRY(set.setrow(par2, i) == 0, set.unlock());
LL4("scan update " << itab.m_name << ": " << row);
lst.push(i);
}
set.unlock();
if (lst.cnt() == par.m_batch) {
CHK(con2.execute(Commit) == 0);
con2.closeTransaction();
set.lock();
set.notpending(lst);
set.unlock();
count += lst.cnt();
lst.reset();
CHK(con2.startTransaction() == 0);
}
CHK((ret = con.nextScanResult(false)) == 0 || ret == 1 || ret == 2);
if (ret == 2 && lst.cnt() != 0) {
CHK(con2.execute(Commit) == 0);
con2.closeTransaction();
set.lock();
set.notpending(lst);
set.unlock();
count += lst.cnt();
lst.reset();
CHK(con2.startTransaction() == 0);
}
} while (ret == 0);
}
CHK(con2.execute(Commit) == 0);
con2.closeTransaction();
LL3("scan update " << itab.m_name << " rows updated=" << count);
con.closeTransaction();
......@@ -2382,7 +2614,7 @@ static int
scanupdateindex(Par par, const ITab& itab)
{
const Tab& tab = par.tab();
for (unsigned i = 0; i < par.m_subloop; i++) {
for (unsigned i = 0; i < par.m_idxloop; i++) {
BSet bset(tab, itab, par.m_rows);
bset.calc(par);
CHK(scanupdateindex(par, itab, bset) == 0);
......@@ -2413,41 +2645,15 @@ scanupdateall(Par par)
// medium level routines
static bool
ignoreverifyerror(Par par)
{
Con& con = par.con();
bool b = par.m_threads > 1;
if (b) {
LL1("ignore verify error");
if (con.m_tx != 0)
con.closeTransaction();
return true;
}
return b;
}
static int
readverify(Par par)
{
par.m_verify = true;
CHK(pkread(par) == 0 || ignoreverifyerror(par));
CHK(scanreadall(par) == 0 || ignoreverifyerror(par));
CHK(pkread(par) == 0);
CHK(scanreadall(par) == 0);
return 0;
}
static bool
ignoredeadlock(Par par)
{
Con& con = par.con();
if (con.m_errtype == Con::ErrDeadlock) {
LL1("ignore deadlock");
con.closeTransaction();
return true;
}
return false;
}
static int
pkupdatescanread(Par par)
{
......@@ -2469,15 +2675,16 @@ static int
mixedoperations(Par par)
{
par.m_dups = true;
par.m_deadlock = true;
unsigned sel = urandom(10);
if (sel < 2) {
CHK(pkdelete(par) == 0 || ignoredeadlock(par));
CHK(pkdelete(par) == 0);
} else if (sel < 4) {
CHK(pkupdate(par) == 0 || ignoredeadlock(par));
CHK(pkupdate(par) == 0);
} else if (sel < 6) {
CHK(scanupdatetable(par) == 0 || ignoredeadlock(par));
CHK(scanupdatetable(par) == 0);
} else {
CHK(scanupdateindex(par) == 0 || ignoredeadlock(par));
CHK(scanupdateindex(par) == 0);
}
return 0;
}
......@@ -2611,7 +2818,6 @@ Thr::run()
break;
}
LL4("start");
CHK(con.bugger() == 0);
assert(m_state == Start);
m_ret = (*m_func)(m_par);
m_state = Stopped;
......@@ -2936,7 +3142,8 @@ static int
runtest(Par par)
{
LL1("start");
srandom(par.m_seed);
if (par.m_seed != 0)
srandom(par.m_seed);
Con con;
CHK(con.connect() == 0);
par.m_con = &con;
......@@ -2951,6 +3158,8 @@ runtest(Par par)
}
for (unsigned l = 0; par.m_loop == 0 || l < par.m_loop; l++) {
LL1("loop " << l);
if (par.m_seed == 0)
srandom(l);
for (unsigned i = 0; i < tcasecount; i++) {
const TCase& tcase = tcaselist[i];
if (par.m_case != 0 && strchr(par.m_case, tcase.m_name[0]) == 0)
......@@ -2992,6 +3201,12 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535)
ndbout << "testOIBasic: unknown argument " << arg;
goto usage;
}
if (strcmp(arg, "-batch") == 0) {
if (++argv, --argc > 0) {
g_opt.m_batch = atoi(argv[0]);
continue;
}
}
if (strcmp(arg, "-case") == 0) {
if (++argv, --argc > 0) {
g_opt.m_case = strdup(argv[0]);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment