Commit 1824f2e8 authored by unknown's avatar unknown

testOIBasic.cpp:

  make it fail in more useful ways


ndb/test/ndbapi/testOIBasic.cpp:
  try to get correct result even on deadlock
parent f8820128
...@@ -33,13 +33,16 @@ ...@@ -33,13 +33,16 @@
struct Opt { struct Opt {
// common options // common options
unsigned m_batch;
const char* m_case; const char* m_case;
bool m_core; bool m_core;
bool m_dups; bool m_dups;
NdbDictionary::Object::FragmentType m_fragtype; NdbDictionary::Object::FragmentType m_fragtype;
unsigned m_idxloop;
const char* m_index; const char* m_index;
unsigned m_loop; unsigned m_loop;
bool m_nologging; bool m_nologging;
bool m_msglock;
unsigned m_rows; unsigned m_rows;
unsigned m_samples; unsigned m_samples;
unsigned m_scanrd; unsigned m_scanrd;
...@@ -50,18 +53,21 @@ struct Opt { ...@@ -50,18 +53,21 @@ struct Opt {
unsigned m_threads; unsigned m_threads;
unsigned m_v; unsigned m_v;
Opt() : Opt() :
m_batch(32),
m_case(0), m_case(0),
m_core(false), m_core(false),
m_dups(false), m_dups(false),
m_fragtype(NdbDictionary::Object::FragUndefined), m_fragtype(NdbDictionary::Object::FragUndefined),
m_idxloop(4),
m_index(0), m_index(0),
m_loop(1), m_loop(1),
m_nologging(false), m_nologging(false),
m_msglock(true),
m_rows(1000), m_rows(1000),
m_samples(0), m_samples(0),
m_scanrd(240), m_scanrd(240),
m_scanex(240), m_scanex(240),
m_seed(1), m_seed(0),
m_subloop(4), m_subloop(4),
m_table(0), m_table(0),
m_threads(4), m_threads(4),
...@@ -80,6 +86,7 @@ printhelp() ...@@ -80,6 +86,7 @@ printhelp()
Opt d; Opt d;
ndbout ndbout
<< "usage: testOIbasic [options]" << endl << "usage: testOIbasic [options]" << endl
<< " -batch N pk operations in batch [" << d.m_batch << "]" << endl
<< " -case abc only given test cases (letters a-z)" << endl << " -case abc only given test cases (letters a-z)" << endl
<< " -core core dump on error [" << d.m_core << "]" << endl << " -core core dump on error [" << d.m_core << "]" << endl
<< " -dups allow duplicate tuples from index scan [" << d.m_dups << "]" << endl << " -dups allow duplicate tuples from index scan [" << d.m_dups << "]" << endl
...@@ -91,7 +98,7 @@ printhelp() ...@@ -91,7 +98,7 @@ printhelp()
<< " -samples N samples for some timings (0=all) [" << d.m_samples << "]" << endl << " -samples N samples for some timings (0=all) [" << d.m_samples << "]" << endl
<< " -scanrd N scan read parallelism [" << d.m_scanrd << "]" << endl << " -scanrd N scan read parallelism [" << d.m_scanrd << "]" << endl
<< " -scanex N scan exclusive parallelism [" << d.m_scanex << "]" << endl << " -scanex N scan exclusive parallelism [" << d.m_scanex << "]" << endl
<< " -seed N srandom seed [" << d.m_seed << "]" << endl << " -seed N srandom seed 0=loop number[" << d.m_seed << "]" << endl
<< " -subloop N subtest loop count [" << d.m_subloop << "]" << endl << " -subloop N subtest loop count [" << d.m_subloop << "]" << endl
<< " -table xyz only given table numbers (digits 1-9)" << endl << " -table xyz only given table numbers (digits 1-9)" << endl
<< " -threads N number of threads [" << d.m_threads << "]" << endl << " -threads N number of threads [" << d.m_threads << "]" << endl
...@@ -133,9 +140,9 @@ getthrstr() ...@@ -133,9 +140,9 @@ getthrstr()
#define LLN(n, s) \ #define LLN(n, s) \
do { \ do { \
if ((n) > g_opt.m_v) break; \ if ((n) > g_opt.m_v) break; \
NdbMutex_Lock(&ndbout_mutex); \ if (g_opt.m_msglock) NdbMutex_Lock(&ndbout_mutex); \
ndbout << getthrstr() << s << endl; \ ndbout << getthrstr() << s << endl; \
NdbMutex_Unlock(&ndbout_mutex); \ if (g_opt.m_msglock) NdbMutex_Unlock(&ndbout_mutex); \
} while(0) } while(0)
#define LL0(s) LLN(0, s) #define LL0(s) LLN(0, s)
...@@ -148,11 +155,10 @@ getthrstr() ...@@ -148,11 +155,10 @@ getthrstr()
// following check a condition and return -1 on failure // following check a condition and return -1 on failure
#undef CHK // simple check #undef CHK // simple check
#undef CHKTRY // execute action (try-catch) on failure #undef CHKTRY // check with action on fail
#undef CHKMSG // print extra message on failure
#undef CHKCON // print NDB API errors on failure #undef CHKCON // print NDB API errors on failure
#define CHK(x) CHKTRY(x, ;) #define CHK(x) CHKTRY(x, ;)
#define CHKTRY(x, act) \ #define CHKTRY(x, act) \
do { \ do { \
...@@ -163,14 +169,6 @@ getthrstr() ...@@ -163,14 +169,6 @@ getthrstr()
return -1; \ return -1; \
} while (0) } while (0)
#define CHKMSG(x, msg) \
do { \
if (x) break; \
LL0("line " << __LINE__ << ": " << #x << " failed: " << msg); \
if (g_opt.m_core) abort(); \
return -1; \
} while (0)
#define CHKCON(x, con) \ #define CHKCON(x, con) \
do { \ do { \
if (x) break; \ if (x) break; \
...@@ -199,13 +197,14 @@ struct Par : public Opt { ...@@ -199,13 +197,14 @@ struct Par : public Opt {
Tmr* m_tmr; Tmr* m_tmr;
Tmr& tmr() const { assert(m_tmr != 0); return *m_tmr; } Tmr& tmr() const { assert(m_tmr != 0); return *m_tmr; }
unsigned m_totrows; unsigned m_totrows;
unsigned m_batch;
// value calculation // value calculation
unsigned m_pctnull; unsigned m_pctnull;
unsigned m_range; unsigned m_range;
unsigned m_pctrange; unsigned m_pctrange;
// do verify after read // do verify after read
bool m_verify; bool m_verify;
// deadlock possible
bool m_deadlock;
// timer location // timer location
Par(const Opt& opt) : Par(const Opt& opt) :
Opt(opt), Opt(opt),
...@@ -215,11 +214,11 @@ struct Par : public Opt { ...@@ -215,11 +214,11 @@ struct Par : public Opt {
m_set(0), m_set(0),
m_tmr(0), m_tmr(0),
m_totrows(m_threads * m_rows), m_totrows(m_threads * m_rows),
m_batch(32),
m_pctnull(10), m_pctnull(10),
m_range(m_rows), m_range(m_rows),
m_pctrange(0), m_pctrange(0),
m_verify(false) { m_verify(false),
m_deadlock(false) {
} }
}; };
...@@ -313,13 +312,51 @@ const char* ...@@ -313,13 +312,51 @@ const char*
Tmr::over(const Tmr& t1) Tmr::over(const Tmr& t1)
{ {
if (0 < t1.m_ms) { if (0 < t1.m_ms) {
sprintf(m_text, "%d pct", (100 * (m_ms - t1.m_ms)) / t1.m_ms); if (t1.m_ms <= m_ms)
sprintf(m_text, "%u pct", (100 * (m_ms - t1.m_ms)) / t1.m_ms);
else
sprintf(m_text, "-%u pct", (100 * (t1.m_ms - m_ms)) / t1.m_ms);
} else { } else {
sprintf(m_text, "[cannot measure]"); sprintf(m_text, "[cannot measure]");
} }
return m_text; return m_text;
} }
// list of ints
struct Lst {
Lst();
unsigned m_arr[1000];
unsigned m_cnt;
void push(unsigned i);
unsigned cnt() const;
void reset();
};
Lst::Lst() :
m_cnt(0)
{
}
void
Lst::push(unsigned i)
{
assert(m_cnt < sizeof(m_arr)/sizeof(m_arr[0]));
m_arr[m_cnt++] = i;
}
unsigned
Lst::cnt() const
{
return m_cnt;
}
void
Lst::reset()
{
m_cnt = 0;
}
// tables and indexes // tables and indexes
// Col - table column // Col - table column
...@@ -624,15 +661,14 @@ struct Con { ...@@ -624,15 +661,14 @@ struct Con {
Con() : Con() :
m_ndb(0), m_dic(0), m_tx(0), m_op(0), m_ndb(0), m_dic(0), m_tx(0), m_op(0),
m_scanop(0), m_indexscanop(0), m_resultset(0), m_scanmode(ScanNo), m_errtype(ErrNone) {} m_scanop(0), m_indexscanop(0), m_resultset(0), m_scanmode(ScanNo), m_errtype(ErrNone) {}
~Con() {
~Con(){ if (m_tx != 0)
if(m_tx) closeTransaction(); closeTransaction();
} }
int connect(); int connect();
void connect(const Con& con);
void disconnect(); void disconnect();
int startTransaction(); int startTransaction();
int startBuddyTransaction(const Con& con);
int getNdbOperation(const Tab& tab); int getNdbOperation(const Tab& tab);
int getNdbScanOperation(const Tab& tab); int getNdbScanOperation(const Tab& tab);
int getNdbScanOperation(const ITab& itab, const Tab& tab); int getNdbScanOperation(const ITab& itab, const Tab& tab);
...@@ -641,20 +677,16 @@ struct Con { ...@@ -641,20 +677,16 @@ struct Con {
int setValue(int num, const char* addr); int setValue(int num, const char* addr);
int setBound(int num, int type, const void* value); int setBound(int num, int type, const void* value);
int execute(ExecType t); int execute(ExecType t);
int execute(ExecType t, bool& deadlock);
int openScanRead(unsigned parallelism); int openScanRead(unsigned parallelism);
int openScanExclusive(unsigned parallelism); int openScanExclusive(unsigned parallelism);
int executeScan(); int executeScan();
int nextScanResult(); int nextScanResult(bool fetchAllowed);
int takeOverForUpdate(Con& scan); int nextScanResult(bool fetchAllowed, bool& deadlock);
int takeOverForDelete(Con& scan); int updateScanTuple(Con& con2);
int deleteScanTuple(Con& con2);
void closeTransaction(); void closeTransaction();
void printerror(NdbOut& out); void printerror(NdbOut& out);
// flush dict cache
int bugger() {
//disconnect();
//CHK(connect() == 0);
return 0;
}
}; };
int int
...@@ -664,11 +696,17 @@ Con::connect() ...@@ -664,11 +696,17 @@ Con::connect()
m_ndb = new Ndb("TEST_DB"); m_ndb = new Ndb("TEST_DB");
CHKCON(m_ndb->init() == 0, *this); CHKCON(m_ndb->init() == 0, *this);
CHKCON(m_ndb->waitUntilReady(30) == 0, *this); CHKCON(m_ndb->waitUntilReady(30) == 0, *this);
m_dic = m_ndb->getDictionary();
m_tx = 0, m_op = 0; m_tx = 0, m_op = 0;
return 0; return 0;
} }
void
Con::connect(const Con& con)
{
assert(m_ndb == 0);
m_ndb = con.m_ndb;
}
void void
Con::disconnect() Con::disconnect()
{ {
...@@ -680,19 +718,12 @@ int ...@@ -680,19 +718,12 @@ int
Con::startTransaction() Con::startTransaction()
{ {
assert(m_ndb != 0); assert(m_ndb != 0);
if(m_tx) closeTransaction(); if (m_tx != 0)
closeTransaction();
CHKCON((m_tx = m_ndb->startTransaction()) != 0, *this); CHKCON((m_tx = m_ndb->startTransaction()) != 0, *this);
return 0; return 0;
} }
int
Con::startBuddyTransaction(const Con& con)
{
assert(m_ndb != 0 && m_tx == 0 && con.m_ndb == m_ndb && con.m_tx != 0);
CHKCON((m_tx = m_ndb->hupp(con.m_tx)) != 0, *this);
return 0;
}
int int
Con::getNdbOperation(const Tab& tab) Con::getNdbOperation(const Tab& tab)
{ {
...@@ -757,6 +788,22 @@ Con::execute(ExecType t) ...@@ -757,6 +788,22 @@ Con::execute(ExecType t)
return 0; return 0;
} }
int
Con::execute(ExecType t, bool& deadlock)
{
int ret = execute(t);
if (ret != 0) {
if (deadlock && m_errtype == ErrDeadlock) {
LL3("caught deadlock");
ret = 0;
}
} else {
deadlock = false;
}
CHK(ret == 0);
return 0;
}
int int
Con::openScanRead(unsigned parallelism) Con::openScanRead(unsigned parallelism)
{ {
...@@ -781,28 +828,44 @@ Con::executeScan() ...@@ -781,28 +828,44 @@ Con::executeScan()
} }
int int
Con::nextScanResult() Con::nextScanResult(bool fetchAllowed)
{ {
int ret; int ret;
assert(m_resultset != 0); assert(m_resultset != 0);
CHKCON((ret = m_resultset->nextResult()) != -1, *this); CHKCON((ret = m_resultset->nextResult(fetchAllowed)) != -1, *this);
assert(ret == 0 || ret == 1); assert(ret == 0 || ret == 1 || (! fetchAllowed && ret == 2));
return ret; return ret;
} }
int int
Con::takeOverForUpdate(Con& scan) Con::nextScanResult(bool fetchAllowed, bool& deadlock)
{ {
assert(m_tx != 0 && scan.m_op != 0); int ret = nextScanResult(fetchAllowed);
CHKCON((m_op = scan.m_resultset->updateTuple(m_tx)) != 0, scan); if (ret == -1) {
if (deadlock && m_errtype == ErrDeadlock) {
LL3("caught deadlock");
ret = 0;
}
} else {
deadlock = false;
}
CHK(ret == 0 || ret == 1 || (! fetchAllowed && ret == 2));
return ret;
}
int
Con::updateScanTuple(Con& con2)
{
assert(con2.m_tx != 0);
CHKCON((con2.m_op = m_resultset->updateTuple(con2.m_tx)) != 0, *this);
return 0; return 0;
} }
int int
Con::takeOverForDelete(Con& scan) Con::deleteScanTuple(Con& con2)
{ {
assert(m_tx != 0 && scan.m_op != 0); assert(con2.m_tx != 0);
CHKCON(scan.m_resultset->deleteTuple(m_tx) == 0, scan); CHKCON(m_resultset->deleteTuple(con2.m_tx) == 0, *this);
return 0; return 0;
} }
...@@ -850,7 +913,7 @@ invalidateindex(Par par, const ITab& itab) ...@@ -850,7 +913,7 @@ invalidateindex(Par par, const ITab& itab)
{ {
Con& con = par.con(); Con& con = par.con();
const Tab& tab = par.tab(); const Tab& tab = par.tab();
con.m_dic->invalidateIndex(itab.m_name, tab.m_name); con.m_ndb->getDictionary()->invalidateIndex(itab.m_name, tab.m_name);
return 0; return 0;
} }
...@@ -874,7 +937,7 @@ invalidatetable(Par par) ...@@ -874,7 +937,7 @@ invalidatetable(Par par)
Con& con = par.con(); Con& con = par.con();
const Tab& tab = par.tab(); const Tab& tab = par.tab();
invalidateindex(par); invalidateindex(par);
con.m_dic->invalidateTable(tab.m_name); con.m_ndb->getDictionary()->invalidateTable(tab.m_name);
return 0; return 0;
} }
...@@ -883,6 +946,7 @@ droptable(Par par) ...@@ -883,6 +946,7 @@ droptable(Par par)
{ {
Con& con = par.con(); Con& con = par.con();
const Tab& tab = par.tab(); const Tab& tab = par.tab();
con.m_dic = con.m_ndb->getDictionary();
if (con.m_dic->getTable(tab.m_name) == 0) { if (con.m_dic->getTable(tab.m_name) == 0) {
// how to check for error // how to check for error
LL4("no table " << tab.m_name); LL4("no table " << tab.m_name);
...@@ -890,6 +954,7 @@ droptable(Par par) ...@@ -890,6 +954,7 @@ droptable(Par par)
LL3("drop table " << tab.m_name); LL3("drop table " << tab.m_name);
CHKCON(con.m_dic->dropTable(tab.m_name) == 0, con); CHKCON(con.m_dic->dropTable(tab.m_name) == 0, con);
} }
con.m_dic = 0;
return 0; return 0;
} }
...@@ -897,7 +962,6 @@ static int ...@@ -897,7 +962,6 @@ static int
createtable(Par par) createtable(Par par)
{ {
Con& con = par.con(); Con& con = par.con();
CHK(con.bugger() == 0);
const Tab& tab = par.tab(); const Tab& tab = par.tab();
LL3("create table " << tab.m_name); LL3("create table " << tab.m_name);
LL4(tab); LL4(tab);
...@@ -917,7 +981,9 @@ createtable(Par par) ...@@ -917,7 +981,9 @@ createtable(Par par)
c.setNullable(col.m_nullable); c.setNullable(col.m_nullable);
t.addColumn(c); t.addColumn(c);
} }
con.m_dic = con.m_ndb->getDictionary();
CHKCON(con.m_dic->createTable(t) == 0, con); CHKCON(con.m_dic->createTable(t) == 0, con);
con.m_dic = 0;
return 0; return 0;
} }
...@@ -926,6 +992,7 @@ dropindex(Par par, const ITab& itab) ...@@ -926,6 +992,7 @@ dropindex(Par par, const ITab& itab)
{ {
Con& con = par.con(); Con& con = par.con();
const Tab& tab = par.tab(); const Tab& tab = par.tab();
con.m_dic = con.m_ndb->getDictionary();
if (con.m_dic->getIndex(itab.m_name, tab.m_name) == 0) { if (con.m_dic->getIndex(itab.m_name, tab.m_name) == 0) {
// how to check for error // how to check for error
LL4("no index " << itab.m_name); LL4("no index " << itab.m_name);
...@@ -933,6 +1000,7 @@ dropindex(Par par, const ITab& itab) ...@@ -933,6 +1000,7 @@ dropindex(Par par, const ITab& itab)
LL3("drop index " << itab.m_name); LL3("drop index " << itab.m_name);
CHKCON(con.m_dic->dropIndex(itab.m_name, tab.m_name) == 0, con); CHKCON(con.m_dic->dropIndex(itab.m_name, tab.m_name) == 0, con);
} }
con.m_dic = 0;
return 0; return 0;
} }
...@@ -953,7 +1021,6 @@ static int ...@@ -953,7 +1021,6 @@ static int
createindex(Par par, const ITab& itab) createindex(Par par, const ITab& itab)
{ {
Con& con = par.con(); Con& con = par.con();
CHK(con.bugger() == 0);
const Tab& tab = par.tab(); const Tab& tab = par.tab();
LL3("create index " << itab.m_name); LL3("create index " << itab.m_name);
LL4(itab); LL4(itab);
...@@ -965,7 +1032,9 @@ createindex(Par par, const ITab& itab) ...@@ -965,7 +1032,9 @@ createindex(Par par, const ITab& itab)
const Col& col = itab.m_icol[k].m_col; const Col& col = itab.m_icol[k].m_col;
x.addColumnName(col.m_name); x.addColumnName(col.m_name);
} }
con.m_dic = con.m_ndb->getDictionary();
CHKCON(con.m_dic->createIndex(x) == 0, con); CHKCON(con.m_dic->createIndex(x) == 0, con);
con.m_dic = 0;
return 0; return 0;
} }
...@@ -1240,6 +1309,8 @@ struct Row { ...@@ -1240,6 +1309,8 @@ struct Row {
const Tab& m_tab; const Tab& m_tab;
Val** m_val; Val** m_val;
bool m_exist; bool m_exist;
enum Op { NoOp = 0, ReadOp, InsOp, UpdOp, DelOp };
Op m_pending;
Row(const Tab& tab); Row(const Tab& tab);
~Row(); ~Row();
void copy(const Row& row2); void copy(const Row& row2);
...@@ -1264,6 +1335,7 @@ Row::Row(const Tab& tab) : ...@@ -1264,6 +1335,7 @@ Row::Row(const Tab& tab) :
m_val[k] = new Val(col); m_val[k] = new Val(col);
} }
m_exist = false; m_exist = false;
m_pending = NoOp;
} }
Row::~Row() Row::~Row()
...@@ -1301,7 +1373,7 @@ int ...@@ -1301,7 +1373,7 @@ int
Row::verify(const Row& row2) const Row::verify(const Row& row2) const
{ {
const Tab& tab = m_tab; const Tab& tab = m_tab;
assert(&tab == &row2.m_tab); assert(&tab == &row2.m_tab && m_exist && row2.m_exist);
for (unsigned k = 0; k < tab.m_cols; k++) { for (unsigned k = 0; k < tab.m_cols; k++) {
const Val& val = *m_val[k]; const Val& val = *m_val[k];
const Val& val2 = *row2.m_val[k]; const Val& val2 = *row2.m_val[k];
...@@ -1322,7 +1394,7 @@ Row::insrow(Par par) ...@@ -1322,7 +1394,7 @@ Row::insrow(Par par)
const Val& val = *m_val[k]; const Val& val = *m_val[k];
CHK(val.setval(par) == 0); CHK(val.setval(par) == 0);
} }
m_exist = true; m_pending = InsOp;
return 0; return 0;
} }
...@@ -1338,6 +1410,7 @@ Row::updrow(Par par) ...@@ -1338,6 +1410,7 @@ Row::updrow(Par par)
const Val& val = *m_val[k]; const Val& val = *m_val[k];
CHK(val.setval(par) == 0); CHK(val.setval(par) == 0);
} }
m_pending = UpdOp;
return 0; return 0;
} }
...@@ -1355,7 +1428,7 @@ Row::delrow(Par par) ...@@ -1355,7 +1428,7 @@ Row::delrow(Par par)
if (col.m_pk) if (col.m_pk)
CHK(val.setval(par) == 0); CHK(val.setval(par) == 0);
} }
m_exist = false; m_pending = DelOp;
return 0; return 0;
} }
...@@ -1372,7 +1445,6 @@ Row::selrow(Par par) ...@@ -1372,7 +1445,6 @@ Row::selrow(Par par)
if (col.m_pk) if (col.m_pk)
CHK(val.setval(par) == 0); CHK(val.setval(par) == 0);
} }
m_exist = false;
return 0; return 0;
} }
...@@ -1387,6 +1459,7 @@ Row::setrow(Par par) ...@@ -1387,6 +1459,7 @@ Row::setrow(Par par)
if (! col.m_pk) if (! col.m_pk)
CHK(val.setval(par) == 0); CHK(val.setval(par) == 0);
} }
m_pending = UpdOp;
return 0; return 0;
} }
...@@ -1414,6 +1487,10 @@ operator<<(NdbOut& out, const Row& row) ...@@ -1414,6 +1487,10 @@ operator<<(NdbOut& out, const Row& row)
out << " "; out << " ";
out << *row.m_val[i]; out << *row.m_val[i];
} }
out << " [exist=" << row.m_exist;
if (row.m_pending)
out << " pending=" << row.m_pending;
out << "]";
return out; return out;
} }
...@@ -1432,6 +1509,9 @@ struct Set { ...@@ -1432,6 +1509,9 @@ struct Set {
unsigned count() const; unsigned count() const;
// row methods // row methods
bool exist(unsigned i) const; bool exist(unsigned i) const;
Row::Op pending(unsigned i) const;
void notpending(unsigned i);
void notpending(const Lst& lst);
void calc(Par par, unsigned i); void calc(Par par, unsigned i);
int insrow(Par par, unsigned i); int insrow(Par par, unsigned i);
int updrow(Par par, unsigned i); int updrow(Par par, unsigned i);
...@@ -1446,7 +1526,7 @@ struct Set { ...@@ -1446,7 +1526,7 @@ struct Set {
void savepoint(); void savepoint();
void commit(); void commit();
void rollback(); void rollback();
// locking (not perfect since ops may complete in different order) // protect structure
NdbMutex* m_mutex; NdbMutex* m_mutex;
void lock() { void lock() {
NdbMutex_Lock(m_mutex); NdbMutex_Lock(m_mutex);
...@@ -1464,6 +1544,7 @@ Set::Set(const Tab& tab, unsigned rows) : ...@@ -1464,6 +1544,7 @@ Set::Set(const Tab& tab, unsigned rows) :
m_rows = rows; m_rows = rows;
m_row = new Row* [m_rows]; m_row = new Row* [m_rows];
for (unsigned i = 0; i < m_rows; i++) { for (unsigned i = 0; i < m_rows; i++) {
// allocate on need to save space
m_row[i] = 0; m_row[i] = 0;
} }
m_saverow = 0; m_saverow = 0;
...@@ -1519,7 +1600,18 @@ bool ...@@ -1519,7 +1600,18 @@ bool
Set::exist(unsigned i) const Set::exist(unsigned i) const
{ {
assert(i < m_rows); assert(i < m_rows);
return m_row[i] != 0 && m_row[i]->m_exist; if (m_row[i] == 0) // not allocated => not exist
return false;
return m_row[i]->m_exist;
}
Row::Op
Set::pending(unsigned i) const
{
assert(i < m_rows);
if (m_row[i] == 0) // not allocated => not pending
return Row::NoOp;
return m_row[i]->m_pending;
} }
void void
...@@ -1598,7 +1690,7 @@ Set::getkey(Par par, unsigned* i) ...@@ -1598,7 +1690,7 @@ Set::getkey(Par par, unsigned* i)
assert(m_rec[0] != 0); assert(m_rec[0] != 0);
const char* aRef0 = m_rec[0]->aRef(); const char* aRef0 = m_rec[0]->aRef();
Uint32 key = *(const Uint32*)aRef0; Uint32 key = *(const Uint32*)aRef0;
CHKMSG(key < m_rows, "key=" << key << " rows=" << m_rows); CHK(key < m_rows);
*i = key; *i = key;
return 0; return 0;
} }
...@@ -1628,12 +1720,32 @@ Set::putval(unsigned i, bool force) ...@@ -1628,12 +1720,32 @@ Set::putval(unsigned i, bool force)
return 0; return 0;
} }
void
Set::notpending(unsigned i)
{
assert(m_row[i] != 0);
Row& row = *m_row[i];
if (row.m_pending == Row::InsOp)
row.m_exist = true;
if (row.m_pending == Row::DelOp)
row.m_exist = false;
row.m_pending = Row::NoOp;
}
void
Set::notpending(const Lst& lst)
{
for (unsigned j = 0; j < lst.m_cnt; j++) {
unsigned i = lst.m_arr[j];
notpending(i);
}
}
int int
Set::verify(const Set& set2) const Set::verify(const Set& set2) const
{ {
const Tab& tab = m_tab; const Tab& tab = m_tab;
assert(&tab == &set2.m_tab && m_rows == set2.m_rows); assert(&tab == &set2.m_tab && m_rows == set2.m_rows);
CHKMSG(count() == set2.count(), "set=" << count() << " set2=" << set2.count());
for (unsigned i = 0; i < m_rows; i++) { for (unsigned i = 0; i < m_rows; i++) {
CHK(exist(i) == set2.exist(i)); CHK(exist(i) == set2.exist(i));
if (! exist(i)) if (! exist(i))
...@@ -1924,28 +2036,46 @@ pkinsert(Par par) ...@@ -1924,28 +2036,46 @@ pkinsert(Par par)
Set& set = par.set(); Set& set = par.set();
LL3("pkinsert"); LL3("pkinsert");
CHK(con.startTransaction() == 0); CHK(con.startTransaction() == 0);
unsigned n = 0; Lst lst;
for (unsigned j = 0; j < par.m_rows; j++) { for (unsigned j = 0; j < par.m_rows; j++) {
unsigned i = thrrow(par, j); unsigned i = thrrow(par, j);
set.lock(); set.lock();
if (set.exist(i)) { if (set.exist(i) || set.pending(i)) {
set.unlock(); set.unlock();
continue; continue;
} }
set.calc(par, i); set.calc(par, i);
LL4("pkinsert " << i << ": " << *set.m_row[i]); CHK(set.insrow(par, i) == 0);
CHKTRY(set.insrow(par, i) == 0, set.unlock());
set.unlock(); set.unlock();
if (++n == par.m_batch) { LL4("pkinsert " << i << ": " << *set.m_row[i]);
CHK(con.execute(Commit) == 0); lst.push(i);
if (lst.cnt() == par.m_batch) {
bool deadlock = par.m_deadlock;
CHK(con.execute(Commit, deadlock) == 0);
con.closeTransaction(); con.closeTransaction();
if (deadlock) {
LL1("pkinsert: stop on deadlock");
return 0;
}
set.lock();
set.notpending(lst);
set.unlock();
lst.reset();
CHK(con.startTransaction() == 0); CHK(con.startTransaction() == 0);
n = 0;
} }
} }
if (n != 0) { if (lst.cnt() != 0) {
CHK(con.execute(Commit) == 0); bool deadlock = par.m_deadlock;
n = 0; CHK(con.execute(Commit, deadlock) == 0);
con.closeTransaction();
if (deadlock) {
LL1("pkinsert: stop on deadlock");
return 0;
}
set.lock();
set.notpending(lst);
set.unlock();
return 0;
} }
con.closeTransaction(); con.closeTransaction();
return 0; return 0;
...@@ -1958,28 +2088,45 @@ pkupdate(Par par) ...@@ -1958,28 +2088,45 @@ pkupdate(Par par)
Set& set = par.set(); Set& set = par.set();
LL3("pkupdate"); LL3("pkupdate");
CHK(con.startTransaction() == 0); CHK(con.startTransaction() == 0);
unsigned n = 0; Lst lst;
bool deadlock = false;
for (unsigned j = 0; j < par.m_rows; j++) { for (unsigned j = 0; j < par.m_rows; j++) {
unsigned i = thrrow(par, j); unsigned i = thrrow(par, j);
set.lock(); set.lock();
if (! set.exist(i)) { if (! set.exist(i) || set.pending(i)) {
set.unlock(); set.unlock();
continue; continue;
} }
set.calc(par, i); set.calc(par, i);
LL4("pkupdate " << i << ": " << *set.m_row[i]); CHK(set.updrow(par, i) == 0);
CHKTRY(set.updrow(par, i) == 0, set.unlock());
set.unlock(); set.unlock();
if (++n == par.m_batch) { LL4("pkupdate " << i << ": " << *set.m_row[i]);
CHK(con.execute(Commit) == 0); lst.push(i);
if (lst.cnt() == par.m_batch) {
deadlock = par.m_deadlock;
CHK(con.execute(Commit, deadlock) == 0);
if (deadlock) {
LL1("pkupdate: stop on deadlock");
break;
}
con.closeTransaction(); con.closeTransaction();
set.lock();
set.notpending(lst);
set.unlock();
lst.reset();
CHK(con.startTransaction() == 0); CHK(con.startTransaction() == 0);
n = 0;
} }
} }
if (n != 0) { if (! deadlock && lst.cnt() != 0) {
CHK(con.execute(Commit) == 0); deadlock = par.m_deadlock;
n = 0; CHK(con.execute(Commit, deadlock) == 0);
if (deadlock) {
LL1("pkupdate: stop on deadlock");
} else {
set.lock();
set.notpending(lst);
set.unlock();
}
} }
con.closeTransaction(); con.closeTransaction();
return 0; return 0;
...@@ -1992,27 +2139,44 @@ pkdelete(Par par) ...@@ -1992,27 +2139,44 @@ pkdelete(Par par)
Set& set = par.set(); Set& set = par.set();
LL3("pkdelete"); LL3("pkdelete");
CHK(con.startTransaction() == 0); CHK(con.startTransaction() == 0);
unsigned n = 0; Lst lst;
bool deadlock = false;
for (unsigned j = 0; j < par.m_rows; j++) { for (unsigned j = 0; j < par.m_rows; j++) {
unsigned i = thrrow(par, j); unsigned i = thrrow(par, j);
set.lock(); set.lock();
if (! set.exist(i)) { if (! set.exist(i) || set.pending(i)) {
set.unlock(); set.unlock();
continue; continue;
} }
LL4("pkdelete " << i << ": " << *set.m_row[i]); CHK(set.delrow(par, i) == 0);
CHKTRY(set.delrow(par, i) == 0, set.unlock());
set.unlock(); set.unlock();
if (++n == par.m_batch) { LL4("pkdelete " << i << ": " << *set.m_row[i]);
CHK(con.execute(Commit) == 0); lst.push(i);
if (lst.cnt() == par.m_batch) {
deadlock = par.m_deadlock;
CHK(con.execute(Commit, deadlock) == 0);
if (deadlock) {
LL1("pkdelete: stop on deadlock");
break;
}
con.closeTransaction(); con.closeTransaction();
set.lock();
set.notpending(lst);
set.unlock();
lst.reset();
CHK(con.startTransaction() == 0); CHK(con.startTransaction() == 0);
n = 0;
} }
} }
if (n != 0) { if (! deadlock && lst.cnt() != 0) {
CHK(con.execute(Commit) == 0); deadlock = par.m_deadlock;
n = 0; CHK(con.execute(Commit, deadlock) == 0);
if (deadlock) {
LL1("pkdelete: stop on deadlock");
} else {
set.lock();
set.notpending(lst);
set.unlock();
}
} }
con.closeTransaction(); con.closeTransaction();
return 0; return 0;
...@@ -2023,14 +2187,18 @@ pkread(Par par) ...@@ -2023,14 +2187,18 @@ pkread(Par par)
{ {
Con& con = par.con(); Con& con = par.con();
const Tab& tab = par.tab(); const Tab& tab = par.tab();
const Set& set = par.set(); Set& set = par.set();
LL3((par.m_verify ? "pkverify " : "pkread ") << tab.m_name); LL3((par.m_verify ? "pkverify " : "pkread ") << tab.m_name);
// expected // expected
const Set& set1 = set; const Set& set1 = set;
Set set2(tab, set.m_rows); Set set2(tab, set.m_rows);
for (unsigned i = 0; i < set.m_rows; i++) { for (unsigned i = 0; i < set.m_rows; i++) {
if (! set.exist(i)) set.lock();
if (! set.exist(i) || set.pending(i)) {
set.unlock();
continue; continue;
}
set.unlock();
CHK(con.startTransaction() == 0); CHK(con.startTransaction() == 0);
CHK(set2.selrow(par, i) == 0); CHK(set2.selrow(par, i) == 0);
CHK(con.execute(Commit) == 0); CHK(con.execute(Commit) == 0);
...@@ -2053,6 +2221,7 @@ pkreadfast(Par par, unsigned count) ...@@ -2053,6 +2221,7 @@ pkreadfast(Par par, unsigned count)
const Set& set = par.set(); const Set& set = par.set();
LL3("pkfast " << tab.m_name); LL3("pkfast " << tab.m_name);
Row keyrow(tab); Row keyrow(tab);
// not batched on purpose
for (unsigned j = 0; j < count; j++) { for (unsigned j = 0; j < count; j++) {
unsigned i = urandom(set.m_rows); unsigned i = urandom(set.m_rows);
assert(set.exist(i)); assert(set.exist(i));
...@@ -2089,7 +2258,7 @@ scanreadtable(Par par) ...@@ -2089,7 +2258,7 @@ scanreadtable(Par par)
CHK(con.executeScan() == 0); CHK(con.executeScan() == 0);
while (1) { while (1) {
int ret; int ret;
CHK((ret = con.nextScanResult()) == 0 || ret == 1); CHK((ret = con.nextScanResult(true)) == 0 || ret == 1);
if (ret == 1) if (ret == 1)
break; break;
unsigned i = (unsigned)-1; unsigned i = (unsigned)-1;
...@@ -2120,7 +2289,7 @@ scanreadtablefast(Par par, unsigned countcheck) ...@@ -2120,7 +2289,7 @@ scanreadtablefast(Par par, unsigned countcheck)
unsigned count = 0; unsigned count = 0;
while (1) { while (1) {
int ret; int ret;
CHK((ret = con.nextScanResult()) == 0 || ret == 1); CHK((ret = con.nextScanResult(true)) == 0 || ret == 1);
if (ret == 1) if (ret == 1)
break; break;
count++; count++;
...@@ -2150,7 +2319,7 @@ scanreadindex(Par par, const ITab& itab, const BSet& bset) ...@@ -2150,7 +2319,7 @@ scanreadindex(Par par, const ITab& itab, const BSet& bset)
CHK(con.executeScan() == 0); CHK(con.executeScan() == 0);
while (1) { while (1) {
int ret; int ret;
CHK((ret = con.nextScanResult()) == 0 || ret == 1); CHK((ret = con.nextScanResult(true)) == 0 || ret == 1);
if (ret == 1) if (ret == 1)
break; break;
unsigned i = (unsigned)-1; unsigned i = (unsigned)-1;
...@@ -2184,7 +2353,7 @@ scanreadindexfast(Par par, const ITab& itab, const BSet& bset, unsigned countche ...@@ -2184,7 +2353,7 @@ scanreadindexfast(Par par, const ITab& itab, const BSet& bset, unsigned countche
unsigned count = 0; unsigned count = 0;
while (1) { while (1) {
int ret; int ret;
CHK((ret = con.nextScanResult()) == 0 || ret == 1); CHK((ret = con.nextScanResult(true)) == 0 || ret == 1);
if (ret == 1) if (ret == 1)
break; break;
count++; count++;
...@@ -2198,7 +2367,7 @@ static int ...@@ -2198,7 +2367,7 @@ static int
scanreadindex(Par par, const ITab& itab) scanreadindex(Par par, const ITab& itab)
{ {
const Tab& tab = par.tab(); const Tab& tab = par.tab();
for (unsigned i = 0; i < par.m_subloop; i++) { for (unsigned i = 0; i < par.m_idxloop; i++) {
BSet bset(tab, itab, par.m_rows); BSet bset(tab, itab, par.m_rows);
bset.calc(par); bset.calc(par);
CHK(scanreadindex(par, itab, bset) == 0); CHK(scanreadindex(par, itab, bset) == 0);
...@@ -2300,29 +2469,63 @@ scanupdatetable(Par par) ...@@ -2300,29 +2469,63 @@ scanupdatetable(Par par)
unsigned count = 0; unsigned count = 0;
// updating trans // updating trans
Con con2; Con con2;
con2.m_ndb = con.m_ndb; con2.connect(con);
CHK(con2.startTransaction() == 0); CHK(con2.startTransaction() == 0);
Lst lst;
bool deadlock = false;
while (1) { while (1) {
int ret; int ret;
CHK((ret = con.nextScanResult()) == 0 || ret == 1); deadlock = par.m_deadlock;
CHK((ret = con.nextScanResult(true, deadlock)) == 0 || ret == 1);
if (ret == 1)
break;
if (deadlock) {
LL1("scanupdatetable: stop on deadlock");
break;
}
do {
unsigned i = (unsigned)-1;
CHK(set2.getkey(par, &i) == 0);
const Row& row = *set.m_row[i];
set.lock();
if (! set.exist(i) || set.pending(i)) {
LL4("scan update " << tab.m_name << ": skip: " << row);
} else {
CHKTRY(set2.putval(i, false) == 0, set.unlock());
CHKTRY(con.updateScanTuple(con2) == 0, set.unlock());
Par par2 = par;
par2.m_con = &con2;
set.calc(par, i);
CHKTRY(set.setrow(par2, i) == 0, set.unlock());
LL4("scan update " << tab.m_name << ": " << row);
lst.push(i);
}
set.unlock();
if (lst.cnt() == par.m_batch) {
CHK(con2.execute(Commit) == 0);
con2.closeTransaction();
set.lock();
set.notpending(lst);
set.unlock();
count += lst.cnt();
lst.reset();
CHK(con2.startTransaction() == 0);
}
CHK((ret = con.nextScanResult(false)) == 0 || ret == 1 || ret == 2);
if (ret == 2 && lst.cnt() != 0) {
CHK(con2.execute(Commit) == 0);
con2.closeTransaction();
set.lock();
set.notpending(lst);
set.unlock();
count += lst.cnt();
lst.reset();
CHK(con2.startTransaction() == 0);
}
} while (ret == 0);
if (ret == 1) if (ret == 1)
break; break;
unsigned i = (unsigned)-1;
CHK(set2.getkey(par, &i) == 0);
LL4("key " << i);
CHK(set2.putval(i, false) == 0);
CHK(con2.takeOverForUpdate(con) == 0);
Par par2 = par;
par2.m_con = &con2;
set.lock();
set.calc(par, i);
LL4("scan update " << tab.m_name << ": " << *set.m_row[i]);
CHKTRY(set.setrow(par2, i) == 0, set.unlock());
set.unlock();
CHK(con2.execute(NoCommit) == 0);
count++;
} }
CHK(con2.execute(Commit) == 0);
con2.closeTransaction(); con2.closeTransaction();
LL3("scan update " << tab.m_name << " rows updated=" << count); LL3("scan update " << tab.m_name << " rows updated=" << count);
con.closeTransaction(); con.closeTransaction();
...@@ -2346,32 +2549,61 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset) ...@@ -2346,32 +2549,61 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
unsigned count = 0; unsigned count = 0;
// updating trans // updating trans
Con con2; Con con2;
con2.m_ndb = con.m_ndb; con2.connect(con);
CHK(con2.startTransaction() == 0); CHK(con2.startTransaction() == 0);
Lst lst;
bool deadlock = false;
while (1) { while (1) {
int ret; int ret;
CHK((ret = con.nextScanResult()) == 0 || ret == 1); deadlock = par.m_deadlock;
CHK((ret = con.nextScanResult(true, deadlock)) == 0 || ret == 1);
if (ret == 1) if (ret == 1)
break; break;
unsigned i = (unsigned)-1; if (deadlock) {
CHK(set2.getkey(par, &i) == 0); LL1("scanupdateindex: stop on deadlock");
LL4("key " << i); break;
CHK(set2.putval(i, par.m_dups) == 0); }
// avoid deadlock for now do {
//if (! isthrrow(par, i)) unsigned i = (unsigned)-1;
//continue; CHK(set2.getkey(par, &i) == 0);
CHK(con2.takeOverForUpdate(con) == 0); const Row& row = *set.m_row[i];
Par par2 = par; set.lock();
par2.m_con = &con2; if (! set.exist(i) || set.pending(i)) {
set.lock(); LL4("scan update " << itab.m_name << ": skip: " << row);
set.calc(par, i); } else {
LL4("scan update " << itab.m_name << ": " << *set.m_row[i]); CHKTRY(set2.putval(i, par.m_dups) == 0, set.unlock());
CHKTRY(set.setrow(par2, i) == 0, set.unlock()); CHKTRY(con.updateScanTuple(con2) == 0, set.unlock());
set.unlock(); Par par2 = par;
CHK(con2.execute(NoCommit) == 0); par2.m_con = &con2;
count++; set.calc(par, i);
CHKTRY(set.setrow(par2, i) == 0, set.unlock());
LL4("scan update " << itab.m_name << ": " << row);
lst.push(i);
}
set.unlock();
if (lst.cnt() == par.m_batch) {
CHK(con2.execute(Commit) == 0);
con2.closeTransaction();
set.lock();
set.notpending(lst);
set.unlock();
count += lst.cnt();
lst.reset();
CHK(con2.startTransaction() == 0);
}
CHK((ret = con.nextScanResult(false)) == 0 || ret == 1 || ret == 2);
if (ret == 2 && lst.cnt() != 0) {
CHK(con2.execute(Commit) == 0);
con2.closeTransaction();
set.lock();
set.notpending(lst);
set.unlock();
count += lst.cnt();
lst.reset();
CHK(con2.startTransaction() == 0);
}
} while (ret == 0);
} }
CHK(con2.execute(Commit) == 0);
con2.closeTransaction(); con2.closeTransaction();
LL3("scan update " << itab.m_name << " rows updated=" << count); LL3("scan update " << itab.m_name << " rows updated=" << count);
con.closeTransaction(); con.closeTransaction();
...@@ -2382,7 +2614,7 @@ static int ...@@ -2382,7 +2614,7 @@ static int
scanupdateindex(Par par, const ITab& itab) scanupdateindex(Par par, const ITab& itab)
{ {
const Tab& tab = par.tab(); const Tab& tab = par.tab();
for (unsigned i = 0; i < par.m_subloop; i++) { for (unsigned i = 0; i < par.m_idxloop; i++) {
BSet bset(tab, itab, par.m_rows); BSet bset(tab, itab, par.m_rows);
bset.calc(par); bset.calc(par);
CHK(scanupdateindex(par, itab, bset) == 0); CHK(scanupdateindex(par, itab, bset) == 0);
...@@ -2413,41 +2645,15 @@ scanupdateall(Par par) ...@@ -2413,41 +2645,15 @@ scanupdateall(Par par)
// medium level routines // medium level routines
static bool
ignoreverifyerror(Par par)
{
Con& con = par.con();
bool b = par.m_threads > 1;
if (b) {
LL1("ignore verify error");
if (con.m_tx != 0)
con.closeTransaction();
return true;
}
return b;
}
static int static int
readverify(Par par) readverify(Par par)
{ {
par.m_verify = true; par.m_verify = true;
CHK(pkread(par) == 0 || ignoreverifyerror(par)); CHK(pkread(par) == 0);
CHK(scanreadall(par) == 0 || ignoreverifyerror(par)); CHK(scanreadall(par) == 0);
return 0; return 0;
} }
static bool
ignoredeadlock(Par par)
{
Con& con = par.con();
if (con.m_errtype == Con::ErrDeadlock) {
LL1("ignore deadlock");
con.closeTransaction();
return true;
}
return false;
}
static int static int
pkupdatescanread(Par par) pkupdatescanread(Par par)
{ {
...@@ -2469,15 +2675,16 @@ static int ...@@ -2469,15 +2675,16 @@ static int
mixedoperations(Par par) mixedoperations(Par par)
{ {
par.m_dups = true; par.m_dups = true;
par.m_deadlock = true;
unsigned sel = urandom(10); unsigned sel = urandom(10);
if (sel < 2) { if (sel < 2) {
CHK(pkdelete(par) == 0 || ignoredeadlock(par)); CHK(pkdelete(par) == 0);
} else if (sel < 4) { } else if (sel < 4) {
CHK(pkupdate(par) == 0 || ignoredeadlock(par)); CHK(pkupdate(par) == 0);
} else if (sel < 6) { } else if (sel < 6) {
CHK(scanupdatetable(par) == 0 || ignoredeadlock(par)); CHK(scanupdatetable(par) == 0);
} else { } else {
CHK(scanupdateindex(par) == 0 || ignoredeadlock(par)); CHK(scanupdateindex(par) == 0);
} }
return 0; return 0;
} }
...@@ -2611,7 +2818,6 @@ Thr::run() ...@@ -2611,7 +2818,6 @@ Thr::run()
break; break;
} }
LL4("start"); LL4("start");
CHK(con.bugger() == 0);
assert(m_state == Start); assert(m_state == Start);
m_ret = (*m_func)(m_par); m_ret = (*m_func)(m_par);
m_state = Stopped; m_state = Stopped;
...@@ -2936,7 +3142,8 @@ static int ...@@ -2936,7 +3142,8 @@ static int
runtest(Par par) runtest(Par par)
{ {
LL1("start"); LL1("start");
srandom(par.m_seed); if (par.m_seed != 0)
srandom(par.m_seed);
Con con; Con con;
CHK(con.connect() == 0); CHK(con.connect() == 0);
par.m_con = &con; par.m_con = &con;
...@@ -2951,6 +3158,8 @@ runtest(Par par) ...@@ -2951,6 +3158,8 @@ runtest(Par par)
} }
for (unsigned l = 0; par.m_loop == 0 || l < par.m_loop; l++) { for (unsigned l = 0; par.m_loop == 0 || l < par.m_loop; l++) {
LL1("loop " << l); LL1("loop " << l);
if (par.m_seed == 0)
srandom(l);
for (unsigned i = 0; i < tcasecount; i++) { for (unsigned i = 0; i < tcasecount; i++) {
const TCase& tcase = tcaselist[i]; const TCase& tcase = tcaselist[i];
if (par.m_case != 0 && strchr(par.m_case, tcase.m_name[0]) == 0) if (par.m_case != 0 && strchr(par.m_case, tcase.m_name[0]) == 0)
...@@ -2992,6 +3201,12 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535) ...@@ -2992,6 +3201,12 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535)
ndbout << "testOIBasic: unknown argument " << arg; ndbout << "testOIBasic: unknown argument " << arg;
goto usage; goto usage;
} }
if (strcmp(arg, "-batch") == 0) {
if (++argv, --argc > 0) {
g_opt.m_batch = atoi(argv[0]);
continue;
}
}
if (strcmp(arg, "-case") == 0) { if (strcmp(arg, "-case") == 0) {
if (++argv, --argc > 0) { if (++argv, --argc > 0) {
g_opt.m_case = strdup(argv[0]); g_opt.m_case = strdup(argv[0]);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment