Commit 2ade8e1a authored by unknown's avatar unknown

ndb: index tests


ndb/test/ndbapi/testOIBasic.cpp:
  test ordering, add hash indexes, fix result verification
mysql-test/ndb/ndb_range_bounds.pl:
  minor
mysql-test/r/ndb_index_ordered.result:
  minor
mysql-test/t/ndb_index_ordered.test:
  minor
parent 9acdec63
#
# test range scan bounds
# give option --all to test all cases
# set MYSQL_HOME to installation top
#
use strict;
......@@ -14,8 +15,9 @@ my $opt_verbose = 0;
GetOptions("all" => \$opt_all, "cnt=i" => \$opt_cnt, "verbose" => \$opt_verbose)
or die "options are: --all --cnt=N --verbose";
my $mysql_top = $ENV{MYSQL_TOP};
my $dsn = "dbi:mysql:database=test;host=localhost;mysql_read_default_file=$mysql_top/.target/var/my.cnf";
my $mysql_home = $ENV{MYSQL_HOME};
defined($mysql_home) or die "no MYSQL_HOME";
my $dsn = "dbi:mysql:database=test;host=localhost;mysql_read_default_file=$mysql_home/var/my.cnf";
my $opts = { RaiseError => 0, PrintError => 0, AutoCommit => 1, };
my $dbh;
......
......@@ -383,6 +383,7 @@ b c
select min(b), max(b) from t1;
min(b) max(b)
1 5000000
drop table t1;
CREATE TABLE test1 (
SubscrID int(11) NOT NULL auto_increment,
UsrID int(11) NOT NULL default '0',
......
......@@ -175,6 +175,8 @@ select b, c from t1 where 1000<=b and b<=100000 and c<'j' order by b, c;
select b, c from t1 where 1000<=b and b<=100000 and c<'j' order by b desc, c desc;
#
select min(b), max(b) from t1;
#
drop table t1;
#
# Bug #6435
......
......@@ -72,7 +72,7 @@ struct Opt {
m_die(0),
m_dups(false),
m_fragtype(NdbDictionary::Object::FragUndefined),
m_subsubloop(4),
m_subsubloop(2),
m_index(0),
m_loop(1),
m_msglock(true),
......@@ -257,6 +257,9 @@ struct Par : public Opt {
bool m_verify;
// deadlock possible
bool m_deadlock;
// ordered range scan
bool m_ordered;
bool m_descending;
// timer location
Par(const Opt& opt) :
Opt(opt),
......@@ -274,7 +277,9 @@ struct Par : public Opt {
m_bdir(0),
m_randomkey(false),
m_verify(false),
m_deadlock(false) {
m_deadlock(false),
m_ordered(false),
m_descending(false) {
}
};
......@@ -444,6 +449,9 @@ struct Chs {
~Chs();
};
static NdbOut&
operator<<(NdbOut& out, const Chs& chs);
Chs::Chs(CHARSET_INFO* cs) :
m_cs(cs)
{
......@@ -455,10 +463,14 @@ Chs::Chs(CHARSET_INFO* cs) :
unsigned i = 0;
unsigned miss1 = 0;
unsigned miss2 = 0;
unsigned miss3 = 0;
unsigned miss4 = 0;
while (i < maxcharcount) {
unsigned char* bytes = m_chr[i].m_bytes;
unsigned char* xbytes = m_chr[i].m_xbytes;
unsigned size = m_cs->mbminlen + urandom(m_cs->mbmaxlen - m_cs->mbminlen + 1);
unsigned& size = m_chr[i].m_size;
bool ok;
size = m_cs->mbminlen + urandom(m_cs->mbmaxlen - m_cs->mbminlen + 1);
assert(m_cs->mbminlen <= size && size <= m_cs->mbmaxlen);
// prefer longer chars
if (size == m_cs->mbminlen && m_cs->mbminlen < m_cs->mbmaxlen && urandom(5) != 0)
......@@ -466,33 +478,57 @@ Chs::Chs(CHARSET_INFO* cs) :
for (unsigned j = 0; j < size; j++) {
bytes[j] = urandom(256);
}
// check wellformed
const char* sbytes = (const char*)bytes;
if ((*cs->cset->well_formed_len)(cs, sbytes, sbytes + size, 1) != size) {
miss1++;
continue;
}
// do not trust well_formed_len currently
// check no proper prefix wellformed
ok = true;
for (unsigned j = 1; j < size; j++) {
if ((*cs->cset->well_formed_len)(cs, sbytes, sbytes + j, 1) == j) {
ok = false;
break;
}
}
if (! ok) {
miss2++;
continue;
}
// normalize
memset(xbytes, 0, sizeof(xbytes));
// currently returns buffer size always
int xlen = (*cs->coll->strnxfrm)(cs, xbytes, m_xmul * size, bytes, size);
// check we got something
bool xok = false;
ok = false;
for (unsigned j = 0; j < xlen; j++) {
if (xbytes[j] != 0) {
xok = true;
ok = true;
break;
}
}
if (! xok) {
miss2++;
if (! ok) {
miss3++;
continue;
}
// check for duplicate (before normalize)
ok = true;
for (unsigned j = 0; j < i; j++) {
const Chr& chr = m_chr[j];
if (chr.m_size == size && memcmp(chr.m_bytes, bytes, size) == 0) {
ok = false;
break;
}
}
if (! ok) {
miss4++;
continue;
}
// occasional duplicate char is ok
m_chr[i].m_size = size;
i++;
}
bool disorder = true;
unsigned bubbels = 0;
unsigned bubbles = 0;
while (disorder) {
disorder = false;
for (unsigned i = 1; i < maxcharcount; i++) {
......@@ -502,11 +538,11 @@ Chs::Chs(CHARSET_INFO* cs) :
m_chr[i] = m_chr[i-1];
m_chr[i-1] = chr;
disorder = true;
bubbels++;
bubbles++;
}
}
}
LL3("inited charset " << cs->name << " miss1=" << miss1 << " miss2=" << miss2 << " bubbels=" << bubbels);
LL3("inited charset " << *this << " miss=" << miss1 << "," << miss2 << "," << miss3 << "," << miss4 << " bubbles=" << bubbles);
}
Chs::~Chs()
......@@ -514,6 +550,14 @@ Chs::~Chs()
delete [] m_chr;
}
static NdbOut&
operator<<(NdbOut& out, const Chs& chs)
{
CHARSET_INFO* cs = chs.m_cs;
out << cs->name << "[" << cs->mbminlen << "-" << cs->mbmaxlen << "," << chs.m_xmul << "]";
return out;
}
static Chs* cslist[maxcsnumber];
static void
......@@ -552,22 +596,26 @@ getcs(Par par)
// Col - table column
struct Col {
enum Type {
Unsigned = NdbDictionary::Column::Unsigned,
Char = NdbDictionary::Column::Char
};
const class Tab& m_tab;
unsigned m_num;
const char* m_name;
bool m_pk;
NdbDictionary::Column::Type m_type;
Type m_type;
unsigned m_length;
unsigned m_bytelength;
bool m_nullable;
const Chs* m_chs;
Col(const class Tab& tab, unsigned num, const char* name, bool pk, NdbDictionary::Column::Type type, unsigned length, bool nullable, const Chs* chs);
Col(const class Tab& tab, unsigned num, const char* name, bool pk, Type type, unsigned length, bool nullable, const Chs* chs);
~Col();
bool equal(const Col& col2) const;
void verify(const void* addr) const;
};
Col::Col(const class Tab& tab, unsigned num, const char* name, bool pk, NdbDictionary::Column::Type type, unsigned length, bool nullable, const Chs* chs) :
Col::Col(const class Tab& tab, unsigned num, const char* name, bool pk, Type type, unsigned length, bool nullable, const Chs* chs) :
m_tab(tab),
m_num(num),
m_name(strcpy(new char [strlen(name) + 1], name)),
......@@ -595,9 +643,9 @@ void
Col::verify(const void* addr) const
{
switch (m_type) {
case NdbDictionary::Column::Unsigned:
case Col::Unsigned:
break;
case NdbDictionary::Column::Char:
case Col::Char:
{
CHARSET_INFO* cs = m_chs->m_cs;
const char* src = (const char*)addr;
......@@ -616,10 +664,10 @@ operator<<(NdbOut& out, const Col& col)
{
out << "col[" << col.m_num << "] " << col.m_name;
switch (col.m_type) {
case NdbDictionary::Column::Unsigned:
case Col::Unsigned:
out << " unsigned";
break;
case NdbDictionary::Column::Char:
case Col::Char:
{
CHARSET_INFO* cs = col.m_chs->m_cs;
out << " char(" << col.m_length << "*" << cs->mbmaxlen << ";" << cs->name << ")";
......@@ -656,25 +704,41 @@ ICol::~ICol()
{
}
static NdbOut&
operator<<(NdbOut& out, const ICol& icol)
{
out << "icol[" << icol.m_num << "] " << icol.m_col;
return out;
}
// ITab - index
struct ITab {
enum Type {
OrderedIndex = NdbDictionary::Index::OrderedIndex,
UniqueHashIndex = NdbDictionary::Index::UniqueHashIndex
};
const class Tab& m_tab;
const char* m_name;
Type m_type;
unsigned m_icols;
const ICol** m_icol;
ITab(const class Tab& tab, const char* name, unsigned icols);
unsigned m_colmask;
ITab(const class Tab& tab, const char* name, Type type, unsigned icols);
~ITab();
void icoladd(unsigned k, const ICol* icolptr);
};
ITab::ITab(const class Tab& tab, const char* name, unsigned icols) :
ITab::ITab(const class Tab& tab, const char* name, Type type, unsigned icols) :
m_tab(tab),
m_name(strcpy(new char [strlen(name) + 1], name)),
m_type(type),
m_icols(icols),
m_icol(new const ICol* [icols + 1])
m_icol(new const ICol* [icols + 1]),
m_colmask(0)
{
for (unsigned i = 0; i <= m_icols; i++)
m_icol[0] = 0;
for (unsigned k = 0; k <= m_icols; k++)
m_icol[k] = 0;
}
ITab::~ITab()
......@@ -685,13 +749,21 @@ ITab::~ITab()
delete [] m_icol;
}
void
ITab::icoladd(unsigned k, const ICol* icolptr)
{
assert(k == icolptr->m_num && k < m_icols && m_icol[k] == 0);
m_icol[k] = icolptr;
m_colmask |= (1 << icolptr->m_col.m_num);
}
static NdbOut&
operator<<(NdbOut& out, const ITab& itab)
{
out << "itab " << itab.m_name << " icols=" << itab.m_icols;
for (unsigned k = 0; k < itab.m_icols; k++) {
out << endl;
out << "icol[" << k << "] " << itab.m_icol[k]->m_col;
const ICol& icol = *itab.m_icol[k];
out << endl << icol;
}
return out;
}
......@@ -706,6 +778,8 @@ struct Tab {
const ITab** m_itab;
// pk must contain an Unsigned column
unsigned m_keycol;
void coladd(unsigned k, Col* colptr);
void itabadd(unsigned j, ITab* itab);
Tab(const char* name, unsigned cols, unsigned itabs, unsigned keycol);
~Tab();
};
......@@ -718,10 +792,10 @@ Tab::Tab(const char* name, unsigned cols, unsigned itabs, unsigned keycol) :
m_itab(new const ITab* [itabs + 1]),
m_keycol(keycol)
{
for (unsigned i = 0; i <= cols; i++)
m_col[i] = 0;
for (unsigned i = 0; i <= itabs; i++)
m_itab[i] = 0;
for (unsigned k = 0; k <= cols; k++)
m_col[k] = 0;
for (unsigned j = 0; j <= itabs; j++)
m_itab[j] = 0;
}
Tab::~Tab()
......@@ -735,19 +809,33 @@ Tab::~Tab()
delete [] m_itab;
}
void
Tab::coladd(unsigned k, Col* colptr)
{
assert(k == colptr->m_num && k < m_cols && m_col[k] == 0);
m_col[k] = colptr;
}
void
Tab::itabadd(unsigned j, ITab* itabptr)
{
assert(j < m_itabs && m_itab[j] == 0);
m_itab[j] = itabptr;
}
static NdbOut&
operator<<(NdbOut& out, const Tab& tab)
{
out << "tab " << tab.m_name << " cols=" << tab.m_cols;
for (unsigned k = 0; k < tab.m_cols; k++) {
out << endl;
out << *tab.m_col[k];
const Col& col = *tab.m_col[k];
out << endl << col;
}
for (unsigned i = 0; i < tab.m_itabs; i++) {
if (tab.m_itab[i] == 0)
continue;
out << endl;
out << *tab.m_itab[i];
const ITab& itab = *tab.m_itab[i];
out << endl << itab;
}
return out;
}
......@@ -774,7 +862,7 @@ verifytables()
{
assert(t->m_keycol < t->m_cols);
const Col* c = t->m_col[t->m_keycol];
assert(c->m_pk && c->m_type == NdbDictionary::Column::Unsigned);
assert(c->m_pk && c->m_type == Col::Unsigned);
}
assert(t->m_itabs != 0 && t->m_itab != 0);
for (unsigned i = 0; i < t->m_itabs; i++) {
......@@ -785,6 +873,9 @@ verifytables()
for (unsigned k = 0; k < x->m_icols; k++) {
const ICol* c = x->m_icol[k];
assert(c != 0 && c->m_num == k && c->m_col.m_num < t->m_cols);
if (x->m_type == ITab::UniqueHashIndex) {
assert(! c->m_col.m_nullable);
}
}
}
assert(t->m_itab[t->m_itabs] == 0);
......@@ -810,127 +901,186 @@ makebuiltintables(Par par)
}
// ti0 - basic
if (usetable(par, 0)) {
const Tab* t = new Tab("ti0", 5, 5, 0);
Tab* t = new Tab("ti0", 5, 7, 0);
// name - pk - type - length - nullable - cs
t->m_col[0] = new Col(*t, 0, "a", 1, NdbDictionary::Column::Unsigned, 1, 0, 0);
t->m_col[1] = new Col(*t, 1, "b", 0, NdbDictionary::Column::Unsigned, 1, 1, 0);
t->m_col[2] = new Col(*t, 2, "c", 0, NdbDictionary::Column::Unsigned, 1, 1, 0);
t->m_col[3] = new Col(*t, 3, "d", 0, NdbDictionary::Column::Unsigned, 1, 1, 0);
t->m_col[4] = new Col(*t, 4, "e", 0, NdbDictionary::Column::Unsigned, 1, 1, 0);
t->coladd(0, new Col(*t, 0, "a", 1, Col::Unsigned, 1, 0, 0));
t->coladd(1, new Col(*t, 1, "b", 0, Col::Unsigned, 1, 1, 0));
t->coladd(2, new Col(*t, 2, "c", 0, Col::Unsigned, 1, 0, 0));
t->coladd(3, new Col(*t, 3, "d", 0, Col::Unsigned, 1, 1, 0));
t->coladd(4, new Col(*t, 4, "e", 0, Col::Unsigned, 1, 0, 0));
if (useindex(par, 0)) {
// a
const ITab* x = t->m_itab[0] = new ITab(*t, "ti0x0", 1);
x->m_icol[0] = new ICol(*x, 0, *t->m_col[0]);
ITab* x = new ITab(*t, "ti0x0", ITab::OrderedIndex, 1);
x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
t->itabadd(0, x);
}
if (useindex(par, 1)) {
// b
const ITab* x = t->m_itab[1] = new ITab(*t, "ti0x1", 1);
x->m_icol[0] = new ICol(*x, 0, *t->m_col[1]);
ITab* x = new ITab(*t, "ti0x1", ITab::OrderedIndex, 1);
x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
t->itabadd(1, x);
}
if (useindex(par, 2)) {
// b, c
const ITab* x = t->m_itab[2] = new ITab(*t, "ti0x2", 2);
x->m_icol[0] = new ICol(*x, 0, *t->m_col[1]);
x->m_icol[1] = new ICol(*x, 1, *t->m_col[2]);
ITab* x = new ITab(*t, "ti0x2", ITab::OrderedIndex, 2);
x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
t->itabadd(2, x);
}
if (useindex(par, 3)) {
// d, c, b
const ITab* x = t->m_itab[3] = new ITab(*t, "ti0x3", 3);
x->m_icol[0] = new ICol(*x, 0, *t->m_col[3]);
x->m_icol[1] = new ICol(*x, 1, *t->m_col[2]);
x->m_icol[2] = new ICol(*x, 2, *t->m_col[1]);
ITab* x = new ITab(*t, "ti0x3", ITab::OrderedIndex, 3);
x->icoladd(0, new ICol(*x, 0, *t->m_col[3]));
x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
x->icoladd(2, new ICol(*x, 2, *t->m_col[1]));
t->itabadd(3, x);
}
if (useindex(par, 4)) {
// b, e, c, d
const ITab* x = t->m_itab[4] = new ITab(*t, "ti0x4", 4);
x->m_icol[0] = new ICol(*x, 0, *t->m_col[1]);
x->m_icol[1] = new ICol(*x, 1, *t->m_col[4]);
x->m_icol[2] = new ICol(*x, 2, *t->m_col[2]);
x->m_icol[3] = new ICol(*x, 3, *t->m_col[3]);
ITab* x = new ITab(*t, "ti0x4", ITab::OrderedIndex, 4);
x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
x->icoladd(1, new ICol(*x, 1, *t->m_col[4]));
x->icoladd(2, new ICol(*x, 2, *t->m_col[2]));
x->icoladd(3, new ICol(*x, 3, *t->m_col[3]));
t->itabadd(4, x);
}
if (useindex(par, 5)) {
// a, c
ITab* x = new ITab(*t, "ti0z5", ITab::UniqueHashIndex, 2);
x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
t->itabadd(5, x);
}
if (useindex(par, 6)) {
// a, e
ITab* x = new ITab(*t, "ti0z6", ITab::UniqueHashIndex, 2);
x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
x->icoladd(1, new ICol(*x, 1, *t->m_col[4]));
t->itabadd(6, x);
}
tablist[0] = t;
}
// ti1 - simple char fields
if (usetable(par, 1)) {
const Tab* t = new Tab("ti1", 5, 5, 1);
Tab* t = new Tab("ti1", 5, 7, 1);
// name - pk - type - length - nullable - cs
t->m_col[0] = new Col(*t, 0, "a", 0, NdbDictionary::Column::Unsigned, 1, 1, 0);
t->m_col[1] = new Col(*t, 1, "b", 1, NdbDictionary::Column::Unsigned, 1, 0, 0);
t->m_col[2] = new Col(*t, 2, "c", 0, NdbDictionary::Column::Char, 20, 1, getcs(par));
t->m_col[3] = new Col(*t, 3, "d", 0, NdbDictionary::Column::Char, 5, 1, getcs(par));
t->m_col[4] = new Col(*t, 4, "e", 0, NdbDictionary::Column::Char, 5, 1, getcs(par));
t->coladd(0, new Col(*t, 0, "a", 0, Col::Unsigned, 1, 0, 0));
t->coladd(1, new Col(*t, 1, "b", 1, Col::Unsigned, 1, 0, 0));
t->coladd(2, new Col(*t, 2, "c", 0, Col::Char, 20, 1, getcs(par)));
t->coladd(3, new Col(*t, 3, "d", 0, Col::Char, 5, 0, getcs(par)));
t->coladd(4, new Col(*t, 4, "e", 0, Col::Char, 5, 1, getcs(par)));
if (useindex(par, 0)) {
// b
const ITab* x = t->m_itab[0] = new ITab(*t, "ti1x0", 1);
x->m_icol[0] = new ICol(*x, 0, *t->m_col[1]);
ITab* x = new ITab(*t, "ti1x0", ITab::OrderedIndex, 1);
x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
}
if (useindex(par, 1)) {
// a, c
const ITab* x = t->m_itab[1] = new ITab(*t, "ti1x1", 2);
x->m_icol[0] = new ICol(*x, 0, *t->m_col[0]);
x->m_icol[1] = new ICol(*x, 1, *t->m_col[2]);
ITab* x = new ITab(*t, "ti1x1", ITab::OrderedIndex, 2);
x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
t->itabadd(1, x);
}
if (useindex(par, 2)) {
// c, a
const ITab* x = t->m_itab[2] = new ITab(*t, "ti1x2", 2);
x->m_icol[0] = new ICol(*x, 0, *t->m_col[2]);
x->m_icol[1] = new ICol(*x, 1, *t->m_col[0]);
ITab* x = new ITab(*t, "ti1x2", ITab::OrderedIndex, 2);
x->icoladd(0, new ICol(*x, 0, *t->m_col[2]));
x->icoladd(1, new ICol(*x, 1, *t->m_col[0]));
t->itabadd(2, x);
}
if (useindex(par, 3)) {
// e
const ITab* x = t->m_itab[3] = new ITab(*t, "ti1x3", 1);
x->m_icol[0] = new ICol(*x, 0, *t->m_col[4]);
ITab* x = new ITab(*t, "ti1x3", ITab::OrderedIndex, 1);
x->icoladd(0, new ICol(*x, 0, *t->m_col[4]));
t->itabadd(3, x);
}
if (useindex(par, 4)) {
// e, d, c, b
const ITab* x = t->m_itab[4] = new ITab(*t, "ti1x4", 4);
x->m_icol[0] = new ICol(*x, 0, *t->m_col[4]);
x->m_icol[1] = new ICol(*x, 1, *t->m_col[3]);
x->m_icol[2] = new ICol(*x, 2, *t->m_col[2]);
x->m_icol[3] = new ICol(*x, 3, *t->m_col[1]);
ITab* x = new ITab(*t, "ti1x4", ITab::OrderedIndex, 4);
x->icoladd(0, new ICol(*x, 0, *t->m_col[4]));
x->icoladd(1, new ICol(*x, 1, *t->m_col[3]));
x->icoladd(2, new ICol(*x, 2, *t->m_col[2]));
x->icoladd(3, new ICol(*x, 3, *t->m_col[1]));
t->itabadd(4, x);
}
if (useindex(par, 5)) {
// a, b
ITab* x = new ITab(*t, "ti1z5", ITab::UniqueHashIndex, 2);
x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
x->icoladd(1, new ICol(*x, 1, *t->m_col[1]));
t->itabadd(5, x);
}
if (useindex(par, 6)) {
// a, b, d
ITab* x = new ITab(*t, "ti1z6", ITab::UniqueHashIndex, 3);
x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
x->icoladd(1, new ICol(*x, 1, *t->m_col[1]));
x->icoladd(2, new ICol(*x, 2, *t->m_col[3]));
t->itabadd(6, x);
}
tablist[1] = t;
}
// ti2 - complex char fields
if (usetable(par, 2)) {
const Tab* t = new Tab("ti2", 5, 5, 2);
Tab* t = new Tab("ti2", 5, 7, 2);
// name - pk - type - length - nullable - cs
t->m_col[0] = new Col(*t, 0, "a", 1, NdbDictionary::Column::Char, 101, 0, getcs(par));
t->m_col[1] = new Col(*t, 1, "b", 0, NdbDictionary::Column::Char, 4, 1, getcs(par));
t->m_col[2] = new Col(*t, 2, "c", 1, NdbDictionary::Column::Unsigned, 1, 0, 0);
t->m_col[3] = new Col(*t, 3, "d", 1, NdbDictionary::Column::Char, 3, 0, getcs(par));
t->m_col[4] = new Col(*t, 4, "e", 0, NdbDictionary::Column::Char, 101, 0, getcs(par));
t->coladd(0, new Col(*t, 0, "a", 1, Col::Char, 31, 0, getcs(par)));
t->coladd(1, new Col(*t, 1, "b", 0, Col::Char, 4, 1, getcs(par)));
t->coladd(2, new Col(*t, 2, "c", 1, Col::Unsigned, 1, 0, 0));
t->coladd(3, new Col(*t, 3, "d", 1, Col::Char, 3, 0, getcs(par)));
t->coladd(4, new Col(*t, 4, "e", 0, Col::Char, 17, 0, getcs(par)));
if (useindex(par, 0)) {
// a, c, d
const ITab* x = t->m_itab[0] = new ITab(*t, "ti2x0", 3);
x->m_icol[0] = new ICol(*x, 0, *t->m_col[0]);
x->m_icol[1] = new ICol(*x, 1, *t->m_col[2]);
x->m_icol[2] = new ICol(*x, 2, *t->m_col[3]);
ITab* x = new ITab(*t, "ti2x0", ITab::OrderedIndex, 3);
x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
x->icoladd(2, new ICol(*x, 2, *t->m_col[3]));
t->itabadd(0, x);
}
if (useindex(par, 1)) {
// e, d, c, b, a
const ITab* x = t->m_itab[1] = new ITab(*t, "ti2x1", 5);
x->m_icol[0] = new ICol(*x, 0, *t->m_col[4]);
x->m_icol[1] = new ICol(*x, 1, *t->m_col[3]);
x->m_icol[2] = new ICol(*x, 2, *t->m_col[2]);
x->m_icol[3] = new ICol(*x, 3, *t->m_col[1]);
x->m_icol[4] = new ICol(*x, 4, *t->m_col[0]);
ITab* x = new ITab(*t, "ti2x1", ITab::OrderedIndex, 5);
x->icoladd(0, new ICol(*x, 0, *t->m_col[4]));
x->icoladd(1, new ICol(*x, 1, *t->m_col[3]));
x->icoladd(2, new ICol(*x, 2, *t->m_col[2]));
x->icoladd(3, new ICol(*x, 3, *t->m_col[1]));
x->icoladd(4, new ICol(*x, 4, *t->m_col[0]));
t->itabadd(1, x);
}
if (useindex(par, 2)) {
// d
const ITab* x = t->m_itab[2] = new ITab(*t, "ti2x2", 1);
x->m_icol[0] = new ICol(*x, 0, *t->m_col[3]);
ITab* x = new ITab(*t, "ti2x2", ITab::OrderedIndex, 1);
x->icoladd(0, new ICol(*x, 0, *t->m_col[3]));
t->itabadd(2, x);
}
if (useindex(par, 3)) {
// b
const ITab* x = t->m_itab[3] = new ITab(*t, "ti2x3", 1);
x->m_icol[0] = new ICol(*x, 0, *t->m_col[1]);
ITab* x = new ITab(*t, "ti2x3", ITab::OrderedIndex, 1);
x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
t->itabadd(3, x);
}
if (useindex(par, 4)) {
// a, e
const ITab* x = t->m_itab[4] = new ITab(*t, "ti2x4", 2);
x->m_icol[0] = new ICol(*x, 0, *t->m_col[0]);
x->m_icol[1] = new ICol(*x, 1, *t->m_col[4]);
ITab* x = new ITab(*t, "ti2x4", ITab::OrderedIndex, 2);
x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
x->icoladd(1, new ICol(*x, 1, *t->m_col[4]));
t->itabadd(4, x);
}
if (useindex(par, 5)) {
// a, c
ITab* x = new ITab(*t, "ti2z5", ITab::UniqueHashIndex, 2);
x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
t->itabadd(5, x);
}
if (useindex(par, 6)) {
// a, c, d, e
ITab* x = new ITab(*t, "ti2z6", ITab::UniqueHashIndex, 4);
x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
x->icoladd(2, new ICol(*x, 2, *t->m_col[3]));
x->icoladd(3, new ICol(*x, 3, *t->m_col[4]));
t->itabadd(6, x);
}
tablist[2] = t;
}
......@@ -944,6 +1094,7 @@ struct Con {
NdbDictionary::Dictionary* m_dic;
NdbConnection* m_tx;
NdbOperation* m_op;
NdbIndexOperation* m_indexop;
NdbScanOperation* m_scanop;
NdbIndexScanOperation* m_indexscanop;
enum ScanMode { ScanNo = 0, Committed, Latest, Exclusive };
......@@ -951,7 +1102,7 @@ struct Con {
enum ErrType { ErrNone = 0, ErrDeadlock, ErrOther };
ErrType m_errtype;
Con() :
m_ndb(0), m_dic(0), m_tx(0), m_op(0),
m_ndb(0), m_dic(0), m_tx(0), m_op(0), m_indexop(0),
m_scanop(0), m_indexscanop(0), m_scanmode(ScanNo), m_errtype(ErrNone) {}
~Con() {
if (m_tx != 0)
......@@ -962,6 +1113,7 @@ struct Con {
void disconnect();
int startTransaction();
int getNdbOperation(const Tab& tab);
int getNdbIndexOperation(const ITab& itab, const Tab& tab);
int getNdbScanOperation(const Tab& tab);
int getNdbScanOperation(const ITab& itab, const Tab& tab);
int equal(int num, const char* addr);
......@@ -972,6 +1124,8 @@ struct Con {
int execute(ExecType t, bool& deadlock);
int openScanRead(unsigned scanbat, unsigned scanpar);
int openScanExclusive(unsigned scanbat, unsigned scanpar);
int openScanOrdered(unsigned scanbat, unsigned scanpar, bool descending);
int openScanOrderedExclusive(unsigned scanbat, unsigned scanpar, bool descending);
int executeScan();
int nextScanResult(bool fetchAllowed);
int nextScanResult(bool fetchAllowed, bool& deadlock);
......@@ -1025,6 +1179,14 @@ Con::getNdbOperation(const Tab& tab)
return 0;
}
int
Con::getNdbIndexOperation(const ITab& itab, const Tab& tab)
{
assert(m_tx != 0);
CHKCON((m_op = m_indexop = m_tx->getNdbIndexOperation(itab.m_name, tab.m_name)) != 0, *this);
return 0;
}
int
Con::getNdbScanOperation(const Tab& tab)
{
......@@ -1115,6 +1277,25 @@ Con::openScanExclusive(unsigned scanbat, unsigned scanpar)
return 0;
}
int
Con::openScanOrdered(unsigned scanbat, unsigned scanpar, bool descending)
{
assert(m_tx != 0 && m_indexscanop != 0);
NdbOperation::LockMode lm = NdbOperation::LM_Read;
CHKCON(m_indexscanop->readTuples(lm, scanbat, scanpar, true, descending) == 0, *this);
return 0;
}
int
Con::openScanOrderedExclusive(unsigned scanbat, unsigned scanpar, bool descending)
{
assert(m_tx != 0 && m_indexscanop != 0);
NdbOperation::LockMode lm = NdbOperation::LM_Exclusive;
CHKCON(m_indexscanop->readTuples(lm, scanbat, scanpar, true, descending) == 0, *this);
return 0;
}
int
Con::executeScan()
{
......@@ -1202,6 +1383,7 @@ Con::printerror(NdbOut& out)
if ((code = m_tx->getNdbError().code) != 0) {
LL0(++any << " con: error " << m_tx->getNdbError());
die += (code == g_opt.m_die);
// 631 is new, occurs only on 4 db nodes, needs to be checked out
if (code == 266 || code == 274 || code == 296 || code == 297 || code == 499 || code == 631)
m_errtype = ErrDeadlock;
}
......@@ -1290,7 +1472,7 @@ createtable(Par par)
for (unsigned k = 0; k < tab.m_cols; k++) {
const Col& col = *tab.m_col[k];
NdbDictionary::Column c(col.m_name);
c.setType(col.m_type);
c.setType((NdbDictionary::Column::Type)col.m_type);
c.setLength(col.m_bytelength); // NDB API uses length in bytes
c.setPrimaryKey(col.m_pk);
c.setNullable(col.m_nullable);
......@@ -1343,8 +1525,10 @@ createindex(Par par, const ITab& itab)
LL4(itab);
NdbDictionary::Index x(itab.m_name);
x.setTable(tab.m_name);
x.setType(NdbDictionary::Index::OrderedIndex);
x.setLogging(false);
x.setType((NdbDictionary::Index::Type)itab.m_type);
if (par.m_nologging || itab.m_type == ITab::OrderedIndex) {
x.setLogging(false);
}
for (unsigned k = 0; k < itab.m_icols; k++) {
const ICol& icol = *itab.m_icol[k];
const Col& col = icol.m_col;
......@@ -1385,6 +1569,8 @@ struct Val {
void copy(const void* addr);
const void* dataaddr() const;
bool m_null;
int equal(Par par) const;
int equal(Par par, const ICol& icol) const;
int setval(Par par) const;
void calc(Par par, unsigned i);
void calckey(Par par, unsigned i);
......@@ -1402,9 +1588,9 @@ Val::Val(const Col& col) :
m_col(col)
{
switch (col.m_type) {
case NdbDictionary::Column::Unsigned:
case Col::Unsigned:
break;
case NdbDictionary::Column::Char:
case Col::Char:
m_char = new unsigned char [col.m_bytelength];
break;
default:
......@@ -1417,9 +1603,9 @@ Val::~Val()
{
const Col& col = m_col;
switch (col.m_type) {
case NdbDictionary::Column::Unsigned:
case Col::Unsigned:
break;
case NdbDictionary::Column::Char:
case Col::Char:
delete [] m_char;
break;
default:
......@@ -1446,10 +1632,10 @@ Val::copy(const void* addr)
{
const Col& col = m_col;
switch (col.m_type) {
case NdbDictionary::Column::Unsigned:
case Col::Unsigned:
m_uint32 = *(const Uint32*)addr;
break;
case NdbDictionary::Column::Char:
case Col::Char:
memcpy(m_char, addr, col.m_bytelength);
break;
default:
......@@ -1464,9 +1650,9 @@ Val::dataaddr() const
{
const Col& col = m_col;
switch (col.m_type) {
case NdbDictionary::Column::Unsigned:
case Col::Unsigned:
return &m_uint32;
case NdbDictionary::Column::Char:
case Col::Char:
return m_char;
default:
break;
......@@ -1476,18 +1662,37 @@ Val::dataaddr() const
}
int
Val::setval(Par par) const
Val::equal(Par par) const
{
Con& con = par.con();
const Col& col = m_col;
assert(col.m_pk && ! m_null);
const char* addr = (const char*)dataaddr();
if (m_null)
addr = 0;
LL5("setval [" << m_col << "] " << *this);
if (col.m_pk)
CHK(con.equal(col.m_num, addr) == 0);
else
CHK(con.setValue(col.m_num, addr) == 0);
LL5("equal [" << col << "] " << *this);
CHK(con.equal(col.m_num, addr) == 0);
return 0;
}
int
Val::equal(Par par, const ICol& icol) const
{
Con& con = par.con();
assert(! m_null);
const char* addr = (const char*)dataaddr();
LL5("equal [" << icol << "] " << *this);
CHK(con.equal(icol.m_num, addr) == 0);
return 0;
}
int
Val::setval(Par par) const
{
Con& con = par.con();
const Col& col = m_col;
assert(! col.m_pk);
const char* addr = ! m_null ? (const char*)dataaddr() : 0;
LL5("setval [" << col << "] " << *this);
CHK(con.setValue(col.m_num, addr) == 0);
return 0;
}
......@@ -1506,10 +1711,10 @@ Val::calckey(Par par, unsigned i)
const Col& col = m_col;
m_null = false;
switch (col.m_type) {
case NdbDictionary::Column::Unsigned:
case Col::Unsigned:
m_uint32 = i;
break;
case NdbDictionary::Column::Char:
case Col::Char:
{
const Chs* chs = col.m_chs;
CHARSET_INFO* cs = chs->m_cs;
......@@ -1549,10 +1754,10 @@ Val::calcnokey(Par par)
}
unsigned v = par.m_range + r;
switch (col.m_type) {
case NdbDictionary::Column::Unsigned:
case Col::Unsigned:
m_uint32 = v;
break;
case NdbDictionary::Column::Char:
case Col::Char:
{
const Chs* chs = col.m_chs;
CHARSET_INFO* cs = chs->m_cs;
......@@ -1609,7 +1814,7 @@ Val::cmp(const Val& val2) const
col.verify(val2.dataaddr());
// compare
switch (col.m_type) {
case NdbDictionary::Column::Unsigned:
case Col::Unsigned:
{
if (m_uint32 < val2.m_uint32)
return -1;
......@@ -1618,7 +1823,7 @@ Val::cmp(const Val& val2) const
return 0;
}
break;
case NdbDictionary::Column::Char:
case Col::Char:
{
const Chs* chs = col.m_chs;
CHARSET_INFO* cs = chs->m_cs;
......@@ -1657,10 +1862,10 @@ operator<<(NdbOut& out, const Val& val)
return out;
}
switch (col.m_type) {
case NdbDictionary::Column::Unsigned:
case Col::Unsigned:
out << val.m_uint32;
break;
case NdbDictionary::Column::Char:
case Col::Char:
{
char buf[4 * 8000];
char *p = buf;
......@@ -1697,19 +1902,25 @@ struct Row {
const Tab& m_tab;
Val** m_val;
bool m_exist;
enum Op { NoOp = 0, ReadOp, InsOp, UpdOp, DelOp };
enum Op { NoOp = 0, ReadOp = 1, InsOp = 2, UpdOp = 4, DelOp = 8, AnyOp = 15 };
Op m_pending;
Row* m_dbrow; // copy of db row before update
Row(const Tab& tab);
~Row();
void copy(const Row& row2);
void calc(Par par, unsigned i);
void calc(Par par, unsigned i, unsigned mask = 0);
const Row& dbrow() const;
int verify(const Row& row2) const;
int insrow(Par par);
int updrow(Par par);
int updrow(Par par, const ITab& itab);
int delrow(Par par);
int delrow(Par par, const ITab& itab);
int selrow(Par par);
int selrow(Par par, const ITab& itab);
int setrow(Par par);
int cmp(const Row& row2) const;
int cmp(const Row& row2, const ITab& itab) const;
private:
Row& operator=(const Row& row2);
};
......@@ -1724,6 +1935,7 @@ Row::Row(const Tab& tab) :
}
m_exist = false;
m_pending = NoOp;
m_dbrow = 0;
}
Row::~Row()
......@@ -1733,6 +1945,7 @@ Row::~Row()
delete m_val[k];
}
delete [] m_val;
delete m_dbrow;
}
void
......@@ -1745,27 +1958,49 @@ Row::copy(const Row& row2)
const Val& val2 = *row2.m_val[k];
val.copy(val2);
}
m_exist = row2.m_exist;
m_pending = row2.m_pending;
if (row2.m_dbrow == 0) {
m_dbrow = 0;
} else {
assert(row2.m_dbrow->m_dbrow == 0);
if (m_dbrow == 0)
m_dbrow = new Row(tab);
m_dbrow->copy(*row2.m_dbrow);
}
}
void
Row::calc(Par par, unsigned i)
Row::calc(Par par, unsigned i, unsigned mask)
{
const Tab& tab = m_tab;
for (unsigned k = 0; k < tab.m_cols; k++) {
Val& val = *m_val[k];
val.calc(par, i);
if (! (mask & (1 << k))) {
Val& val = *m_val[k];
val.calc(par, i);
}
}
}
const Row&
Row::dbrow() const
{
if (m_dbrow == 0)
return *this;
assert(m_pending == Row::UpdOp || m_pending == Row::DelOp);
return *m_dbrow;
}
int
Row::verify(const Row& row2) const
{
const Tab& tab = m_tab;
assert(&tab == &row2.m_tab && m_exist && row2.m_exist);
const Row& row1 = *this;
assert(&row1.m_tab == &row2.m_tab && row1.m_exist && row2.m_exist);
for (unsigned k = 0; k < tab.m_cols; k++) {
const Val& val = *m_val[k];
const Val& val1 = *row1.m_val[k];
const Val& val2 = *row2.m_val[k];
CHK(val.verify(val2) == 0);
CHK(val1.verify(val2) == 0);
}
return 0;
}
......@@ -1780,7 +2015,15 @@ Row::insrow(Par par)
CHKCON(con.m_op->insertTuple() == 0, con);
for (unsigned k = 0; k < tab.m_cols; k++) {
const Val& val = *m_val[k];
CHK(val.setval(par) == 0);
const Col& col = val.m_col;
if (col.m_pk)
CHK(val.equal(par) == 0);
}
for (unsigned k = 0; k < tab.m_cols; k++) {
const Val& val = *m_val[k];
const Col& col = val.m_col;
if (! col.m_pk)
CHK(val.setval(par) == 0);
}
m_pending = InsOp;
return 0;
......@@ -1794,19 +2037,43 @@ Row::updrow(Par par)
assert(m_exist);
CHK(con.getNdbOperation(tab) == 0);
CHKCON(con.m_op->updateTuple() == 0, con);
for (unsigned k = 0; k < tab.m_cols; k++) {
const Val& val = *m_val[k];
const Col& col = val.m_col;
if (col.m_pk)
CHK(val.equal(par) == 0);
}
for (unsigned k = 0; k < tab.m_cols; k++) {
const Val& val = *m_val[k];
const Col& col = val.m_col;
if (! col.m_pk)
continue;
CHK(val.setval(par) == 0);
CHK(val.setval(par) == 0);
}
m_pending = UpdOp;
return 0;
}
int
Row::updrow(Par par, const ITab& itab)
{
Con& con = par.con();
const Tab& tab = m_tab;
assert(itab.m_type == ITab::UniqueHashIndex && &itab.m_tab == &tab);
assert(m_exist);
CHK(con.getNdbIndexOperation(itab, tab) == 0);
CHKCON(con.m_op->updateTuple() == 0, con);
for (unsigned k = 0; k < itab.m_icols; k++) {
const ICol& icol = *itab.m_icol[k];
const Col& col = icol.m_col;
unsigned m = col.m_num;
const Val& val = *m_val[m];
CHK(val.equal(par, icol) == 0);
}
for (unsigned k = 0; k < tab.m_cols; k++) {
const Val& val = *m_val[k];
const Col& col = val.m_col;
if (col.m_pk)
continue;
CHK(val.setval(par) == 0);
if (! col.m_pk)
CHK(val.setval(par) == 0);
}
m_pending = UpdOp;
return 0;
......@@ -1824,7 +2091,27 @@ Row::delrow(Par par)
const Val& val = *m_val[k];
const Col& col = val.m_col;
if (col.m_pk)
CHK(val.setval(par) == 0);
CHK(val.equal(par) == 0);
}
m_pending = DelOp;
return 0;
}
int
Row::delrow(Par par, const ITab& itab)
{
Con& con = par.con();
const Tab& tab = m_tab;
assert(itab.m_type == ITab::UniqueHashIndex && &itab.m_tab == &tab);
assert(m_exist);
CHK(con.getNdbIndexOperation(itab, tab) == 0);
CHKCON(con.m_op->deleteTuple() == 0, con);
for (unsigned k = 0; k < itab.m_icols; k++) {
const ICol& icol = *itab.m_icol[k];
const Col& col = icol.m_col;
unsigned m = col.m_num;
const Val& val = *m_val[m];
CHK(val.equal(par, icol) == 0);
}
m_pending = DelOp;
return 0;
......@@ -1841,7 +2128,25 @@ Row::selrow(Par par)
const Val& val = *m_val[k];
const Col& col = val.m_col;
if (col.m_pk)
CHK(val.setval(par) == 0);
CHK(val.equal(par) == 0);
}
return 0;
}
int
Row::selrow(Par par, const ITab& itab)
{
Con& con = par.con();
const Tab& tab = m_tab;
assert(itab.m_type == ITab::UniqueHashIndex && &itab.m_tab == &tab);
CHK(con.getNdbIndexOperation(itab, tab) == 0);
CHKCON(con.m_op->readTuple() == 0, con);
for (unsigned k = 0; k < itab.m_icols; k++) {
const ICol& icol = *itab.m_icol[k];
const Col& col = icol.m_col;
unsigned m = col.m_num;
const Val& val = *m_val[m];
CHK(val.equal(par, icol) == 0);
}
return 0;
}
......@@ -1876,6 +2181,40 @@ Row::cmp(const Row& row2) const
return c;
}
int
Row::cmp(const Row& row2, const ITab& itab) const
{
const Tab& tab = m_tab;
int c = 0;
for (unsigned i = 0; i < itab.m_icols; i++) {
const ICol& icol = *itab.m_icol[i];
const Col& col = icol.m_col;
unsigned k = col.m_num;
assert(k < tab.m_cols);
const Val& val = *m_val[k];
const Val& val2 = *row2.m_val[k];
if ((c = val.cmp(val2)) != 0)
break;
}
return c;
}
static NdbOut&
operator<<(NdbOut& out, const Row::Op op)
{
if (op == Row::NoOp)
out << "NoOp";
else if (op == Row::InsOp)
out << "InsOp";
else if (op == Row::UpdOp)
out << "UpdOp";
else if (op == Row::DelOp)
out << "DelOp";
else
out << op;
return out;
}
static NdbOut&
operator<<(NdbOut& out, const Row& row)
{
......@@ -1885,10 +2224,21 @@ operator<<(NdbOut& out, const Row& row)
out << " ";
out << *row.m_val[i];
}
out << " [exist=" << row.m_exist;
out << " exist=" << row.m_exist;
if (row.m_pending)
out << " pending=" << row.m_pending;
out << "]";
if (row.m_dbrow != 0)
out << " [dbrow=" << *row.m_dbrow << "]";
return out;
}
static NdbOut&
operator<<(NdbOut& out, const Row* rowptr)
{
if (rowptr == 0)
out << "null";
else
out << *rowptr;
return out;
}
......@@ -1898,38 +2248,47 @@ struct Set {
const Tab& m_tab;
unsigned m_rows;
Row** m_row;
Row** m_saverow;
unsigned* m_rowkey; // maps row number (from 0) in scan to tuple key
Row* m_keyrow;
NdbRecAttr** m_rec;
Set(const Tab& tab, unsigned rows);
~Set();
void reset();
unsigned count() const;
// row methods
// old and new values
bool exist(unsigned i) const;
Row::Op pending(unsigned i) const;
void dbsave(unsigned i);
void calc(Par par, unsigned i, unsigned mask = 0);
bool pending(unsigned i, unsigned mask) const;
void notpending(unsigned i);
void notpending(const Lst& lst);
void calc(Par par, unsigned i);
void dbdiscard(unsigned i);
void dbdiscard(const Lst& lst);
const Row& dbrow(unsigned i) const;
// operations
int insrow(Par par, unsigned i);
int updrow(Par par, unsigned i);
int updrow(Par par, const ITab& itab, unsigned i);
int delrow(Par par, unsigned i);
int selrow(Par par, unsigned i);
int delrow(Par par, const ITab& itab, unsigned i);
int selrow(Par par, const Row& keyrow);
int selrow(Par par, const ITab& itab, const Row& keyrow);
// set and get
void setkey(Par par, const Row& keyrow);
void setkey(Par par, const ITab& itab, const Row& keyrow);
int setrow(Par par, unsigned i);
int getval(Par par);
int getkey(Par par, unsigned* i);
int putval(unsigned i, bool force);
// set methods
int putval(unsigned i, bool force, unsigned n = ~0);
// verify
int verify(const Set& set2) const;
void savepoint();
void commit();
void rollback();
int verifyorder(const ITab& itab, bool descending) const;
// protect structure
NdbMutex* m_mutex;
void lock() {
void lock() const {
NdbMutex_Lock(m_mutex);
}
void unlock() {
void unlock() const {
NdbMutex_Unlock(m_mutex);
}
private:
......@@ -1945,7 +2304,11 @@ Set::Set(const Tab& tab, unsigned rows) :
// allocate on need to save space
m_row[i] = 0;
}
m_saverow = 0;
m_rowkey = new unsigned [m_rows];
for (unsigned n = 0; n < m_rows; n++) {
// initialize to null
m_rowkey[n] = ~0;
}
m_keyrow = new Row(tab);
m_rec = new NdbRecAttr* [tab.m_cols];
for (unsigned k = 0; k < tab.m_cols; k++) {
......@@ -1959,11 +2322,9 @@ Set::~Set()
{
for (unsigned i = 0; i < m_rows; i++) {
delete m_row[i];
if (m_saverow != 0)
delete m_saverow[i];
}
delete [] m_row;
delete [] m_saverow;
delete [] m_rowkey;
delete m_keyrow;
delete [] m_rec;
NdbMutex_Destroy(m_mutex);
......@@ -1994,6 +2355,8 @@ Set::count() const
return count;
}
// old and new values
bool
Set::exist(unsigned i) const
{
......@@ -2003,13 +2366,37 @@ Set::exist(unsigned i) const
return m_row[i]->m_exist;
}
Row::Op
Set::pending(unsigned i) const
void
Set::dbsave(unsigned i)
{
const Tab& tab = m_tab;
assert(i < m_rows && m_row[i] != 0);
Row& row = *m_row[i];
LL5("dbsave " << i << ": " << row);
assert(row.m_exist && ! row.m_pending && row.m_dbrow == 0);
// could swap pointers but making copy is safer
Row* rowptr = new Row(tab);
rowptr->copy(row);
row.m_dbrow = rowptr;
}
void
Set::calc(Par par, unsigned i, unsigned mask)
{
const Tab& tab = m_tab;
if (m_row[i] == 0)
m_row[i] = new Row(tab);
Row& row = *m_row[i];
row.calc(par, i, mask);
}
bool
Set::pending(unsigned i, unsigned mask) const
{
assert(i < m_rows);
if (m_row[i] == 0) // not allocated => not pending
return Row::NoOp;
return m_row[i]->m_pending;
return m_row[i]->m_pending & mask;
}
void
......@@ -2017,10 +2404,13 @@ Set::notpending(unsigned i)
{
assert(m_row[i] != 0);
Row& row = *m_row[i];
if (row.m_pending == Row::InsOp)
if (row.m_pending == Row::InsOp) {
row.m_exist = true;
if (row.m_pending == Row::DelOp)
} else if (row.m_pending == Row::UpdOp) {
;
} else if (row.m_pending == Row::DelOp) {
row.m_exist = false;
}
row.m_pending = Row::NoOp;
}
......@@ -2034,15 +2424,35 @@ Set::notpending(const Lst& lst)
}
void
Set::calc(Par par, unsigned i)
Set::dbdiscard(unsigned i)
{
const Tab& tab = m_tab;
if (m_row[i] == 0)
m_row[i] = new Row(tab);
assert(m_row[i] != 0);
Row& row = *m_row[i];
LL5("dbdiscard " << i << ": " << row);
assert(row.m_dbrow != 0);
delete row.m_dbrow;
row.m_dbrow = 0;
}
const Row&
Set::dbrow(unsigned i) const
{
assert(m_row[i] != 0);
Row& row = *m_row[i];
row.calc(par, i);
return row.dbrow();
}
void
Set::dbdiscard(const Lst& lst)
{
for (unsigned j = 0; j < lst.m_cnt; j++) {
unsigned i = lst.m_arr[j];
dbdiscard(i);
}
}
// operations
int
Set::insrow(Par par, unsigned i)
{
......@@ -2061,6 +2471,15 @@ Set::updrow(Par par, unsigned i)
return 0;
}
int
Set::updrow(Par par, const ITab& itab, unsigned i)
{
assert(m_row[i] != 0);
Row& row = *m_row[i];
CHK(row.updrow(par, itab) == 0);
return 0;
}
int
Set::delrow(Par par, unsigned i)
{
......@@ -2071,15 +2490,67 @@ Set::delrow(Par par, unsigned i)
}
int
Set::selrow(Par par, unsigned i)
Set::delrow(Par par, const ITab& itab, unsigned i)
{
assert(m_row[i] != 0);
Row& row = *m_row[i];
CHK(row.delrow(par, itab) == 0);
return 0;
}
int
Set::selrow(Par par, const Row& keyrow)
{
Con& con = par.con();
m_keyrow->calc(par, i);
const Tab& tab = par.tab();
setkey(par, keyrow);
LL5("selrow " << tab.m_name << ": keyrow: " << keyrow);
CHK(m_keyrow->selrow(par) == 0);
CHK(getval(par) == 0);
return 0;
}
int
Set::selrow(Par par, const ITab& itab, const Row& keyrow)
{
Con& con = par.con();
setkey(par, itab, keyrow);
LL5("selrow " << itab.m_name << ": keyrow: " << keyrow);
CHK(m_keyrow->selrow(par, itab) == 0);
CHK(getval(par) == 0);
return 0;
}
// set and get
void
Set::setkey(Par par, const Row& keyrow)
{
const Tab& tab = m_tab;
for (unsigned k = 0; k < tab.m_cols; k++) {
const Col& col = *tab.m_col[k];
if (col.m_pk) {
Val& val1 = *m_keyrow->m_val[k];
const Val& val2 = *keyrow.dbrow().m_val[k];
val1.copy(val2);
}
}
}
void
Set::setkey(Par par, const ITab& itab, const Row& keyrow)
{
const Tab& tab = m_tab;
for (unsigned k = 0; k < itab.m_icols; k++) {
const ICol& icol = *itab.m_icol[k];
const Col& col = icol.m_col;
unsigned m = col.m_num;
Val& val1 = *m_keyrow->m_val[m];
const Val& val2 = *keyrow.dbrow().m_val[m];
val1.copy(val2);
}
}
int
Set::setrow(Par par, unsigned i)
{
......@@ -2114,7 +2585,7 @@ Set::getkey(Par par, unsigned* i)
}
int
Set::putval(unsigned i, bool force)
Set::putval(unsigned i, bool force, unsigned n)
{
const Tab& tab = m_tab;
if (m_row[i] == 0)
......@@ -2135,55 +2606,55 @@ Set::putval(unsigned i, bool force)
}
if (! row.m_exist)
row.m_exist = true;
if (n != ~0)
m_rowkey[n] = i;
return 0;
}
// verify
int
Set::verify(const Set& set2) const
{
const Tab& tab = m_tab;
assert(&tab == &set2.m_tab && m_rows == set2.m_rows);
LL3("verify set1 count=" << count() << " vs set2 count=" << set2.count());
assert(&m_tab == &set2.m_tab && m_rows == set2.m_rows);
LL4("verify set1 count=" << count() << " vs set2 count=" << set2.count());
for (unsigned i = 0; i < m_rows; i++) {
CHK(exist(i) == set2.exist(i));
if (! exist(i))
continue;
Row& row = *m_row[i];
Row& row2 = *set2.m_row[i];
CHK(row.verify(row2) == 0);
bool ok = true;
if (exist(i) != set2.exist(i)) {
ok = false;
} else if (exist(i)) {
if (dbrow(i).verify(set2.dbrow(i)) != 0)
ok = false;
}
if (! ok) {
LL1("verify failed: key=" << i << " row1=" << m_row[i] << " row2=" << set2.m_row[i]);
CHK(0 == 1);
}
}
return 0;
}
void
Set::savepoint()
int
Set::verifyorder(const ITab& itab, bool descending) const
{
const Tab& tab = m_tab;
assert(m_saverow == 0);
m_saverow = new Row* [m_rows];
for (unsigned i = 0; i < m_rows; i++) {
if (m_row[i] == 0)
m_saverow[i] = 0;
else {
m_saverow[i] = new Row(tab);
m_saverow[i]->copy(*m_row[i]);
}
for (unsigned n = 0; n < m_rows; n++) {
unsigned i2 = m_rowkey[n];
if (i2 == ~0)
break;
if (n == 0)
continue;
unsigned i1 = m_rowkey[n - 1];
assert(i1 < m_rows && i2 < m_rows);
const Row& row1 = *m_row[i1];
const Row& row2 = *m_row[i2];
assert(row1.m_exist && row2.m_exist);
if (! descending)
CHK(row1.cmp(row2, itab) <= 0);
else
CHK(row1.cmp(row2, itab) >= 0);
}
}
void
Set::commit()
{
delete [] m_saverow;
m_saverow = 0;
}
void
Set::rollback()
{
assert(m_saverow != 0);
m_row = m_saverow;
m_saverow = 0;
return 0;
}
static NdbOut&
......@@ -2384,7 +2855,9 @@ BSet::filter(const Set& set, Set& set2) const
for (unsigned i = 0; i < set.m_rows; i++) {
if (! set.exist(i))
continue;
const Row& row = *set.m_row[i];
set.lock();
const Row& row = set.dbrow(i);
set.unlock();
if (! g_store_null_key) {
bool ok1 = false;
for (unsigned k = 0; k < itab.m_icols; k++) {
......@@ -2430,7 +2903,6 @@ BSet::filter(const Set& set, Set& set2) const
Row& row2 = *set2.m_row[i];
assert(! row2.m_exist);
row2.copy(row);
row2.m_exist = true;
}
}
......@@ -2451,15 +2923,16 @@ static int
pkinsert(Par par)
{
Con& con = par.con();
const Tab& tab = par.tab();
Set& set = par.set();
LL3("pkinsert");
LL3("pkinsert " << tab.m_name);
CHK(con.startTransaction() == 0);
Lst lst;
for (unsigned j = 0; j < par.m_rows; j++) {
unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
unsigned i = thrrow(par, j2);
set.lock();
if (set.exist(i) || set.pending(i)) {
if (set.exist(i) || set.pending(i, Row::AnyOp)) {
set.unlock();
continue;
}
......@@ -2473,7 +2946,7 @@ pkinsert(Par par)
CHK(con.execute(Commit, deadlock) == 0);
con.closeTransaction();
if (deadlock) {
LL1("pkinsert: stop on deadlock");
LL1("pkinsert: stop on deadlock [at 1]");
return 0;
}
set.lock();
......@@ -2488,7 +2961,7 @@ pkinsert(Par par)
CHK(con.execute(Commit, deadlock) == 0);
con.closeTransaction();
if (deadlock) {
LL1("pkinsert: stop on deadlock");
LL1("pkinsert: stop on deadlock [at 2]");
return 0;
}
set.lock();
......@@ -2504,8 +2977,9 @@ static int
pkupdate(Par par)
{
Con& con = par.con();
const Tab& tab = par.tab();
Set& set = par.set();
LL3("pkupdate");
LL3("pkupdate " << tab.m_name);
CHK(con.startTransaction() == 0);
Lst lst;
bool deadlock = false;
......@@ -2513,10 +2987,11 @@ pkupdate(Par par)
unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
unsigned i = thrrow(par, j2);
set.lock();
if (! set.exist(i) || set.pending(i)) {
if (! set.exist(i) || set.pending(i, Row::AnyOp)) {
set.unlock();
continue;
}
set.dbsave(i);
set.calc(par, i);
CHK(set.updrow(par, i) == 0);
set.unlock();
......@@ -2526,12 +3001,13 @@ pkupdate(Par par)
deadlock = par.m_deadlock;
CHK(con.execute(Commit, deadlock) == 0);
if (deadlock) {
LL1("pkupdate: stop on deadlock");
LL1("pkupdate: stop on deadlock [at 1]");
break;
}
con.closeTransaction();
set.lock();
set.notpending(lst);
set.dbdiscard(lst);
set.unlock();
lst.reset();
CHK(con.startTransaction() == 0);
......@@ -2541,10 +3017,11 @@ pkupdate(Par par)
deadlock = par.m_deadlock;
CHK(con.execute(Commit, deadlock) == 0);
if (deadlock) {
LL1("pkupdate: stop on deadlock");
LL1("pkupdate: stop on deadlock [at 1]");
} else {
set.lock();
set.notpending(lst);
set.dbdiscard(lst);
set.unlock();
}
}
......@@ -2556,8 +3033,9 @@ static int
pkdelete(Par par)
{
Con& con = par.con();
const Tab& tab = par.tab();
Set& set = par.set();
LL3("pkdelete");
LL3("pkdelete " << tab.m_name);
CHK(con.startTransaction() == 0);
Lst lst;
bool deadlock = false;
......@@ -2565,7 +3043,7 @@ pkdelete(Par par)
unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
unsigned i = thrrow(par, j2);
set.lock();
if (! set.exist(i) || set.pending(i)) {
if (! set.exist(i) || set.pending(i, Row::AnyOp)) {
set.unlock();
continue;
}
......@@ -2577,7 +3055,7 @@ pkdelete(Par par)
deadlock = par.m_deadlock;
CHK(con.execute(Commit, deadlock) == 0);
if (deadlock) {
LL1("pkdelete: stop on deadlock");
LL1("pkdelete: stop on deadlock [at 1]");
break;
}
con.closeTransaction();
......@@ -2592,7 +3070,7 @@ pkdelete(Par par)
deadlock = par.m_deadlock;
CHK(con.execute(Commit, deadlock) == 0);
if (deadlock) {
LL1("pkdelete: stop on deadlock");
LL1("pkdelete: stop on deadlock [at 2]");
} else {
set.lock();
set.notpending(lst);
......@@ -2609,19 +3087,19 @@ pkread(Par par)
Con& con = par.con();
const Tab& tab = par.tab();
Set& set = par.set();
LL3((par.m_verify ? "pkverify " : "pkread ") << tab.m_name);
LL3("pkread " << tab.m_name << " verify=" << par.m_verify);
// expected
const Set& set1 = set;
Set set2(tab, set.m_rows);
for (unsigned i = 0; i < set.m_rows; i++) {
set.lock();
if (! set.exist(i) || set.pending(i)) {
if (! set.exist(i)) {
set.unlock();
continue;
}
set.unlock();
CHK(con.startTransaction() == 0);
CHK(set2.selrow(par, i) == 0);
CHK(set2.selrow(par, *set1.m_row[i]) == 0);
CHK(con.execute(Commit) == 0);
unsigned i2 = (unsigned)-1;
CHK(set2.getkey(par, &i2) == 0 && i == i2);
......@@ -2659,6 +3137,146 @@ pkreadfast(Par par, unsigned count)
return 0;
}
// hash index operations
static int
hashindexupdate(Par par, const ITab& itab)
{
Con& con = par.con();
Set& set = par.set();
LL3("hashindexupdate " << itab.m_name);
CHK(con.startTransaction() == 0);
Lst lst;
bool deadlock = false;
for (unsigned j = 0; j < par.m_rows; j++) {
unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
unsigned i = thrrow(par, j2);
set.lock();
if (! set.exist(i) || set.pending(i, Row::AnyOp)) {
set.unlock();
continue;
}
set.dbsave(i);
// index key columns are not re-calculated
set.calc(par, i, itab.m_colmask);
CHK(set.updrow(par, itab, i) == 0);
set.unlock();
LL4("hashindexupdate " << i << ": " << *set.m_row[i]);
lst.push(i);
if (lst.cnt() == par.m_batch) {
deadlock = par.m_deadlock;
CHK(con.execute(Commit, deadlock) == 0);
if (deadlock) {
LL1("hashindexupdate: stop on deadlock [at 1]");
break;
}
con.closeTransaction();
set.lock();
set.notpending(lst);
set.dbdiscard(lst);
set.unlock();
lst.reset();
CHK(con.startTransaction() == 0);
}
}
if (! deadlock && lst.cnt() != 0) {
deadlock = par.m_deadlock;
CHK(con.execute(Commit, deadlock) == 0);
if (deadlock) {
LL1("hashindexupdate: stop on deadlock [at 1]");
} else {
set.lock();
set.notpending(lst);
set.dbdiscard(lst);
set.unlock();
}
}
con.closeTransaction();
return 0;
};
static int
hashindexdelete(Par par, const ITab& itab)
{
Con& con = par.con();
Set& set = par.set();
LL3("hashindexdelete " << itab.m_name);
CHK(con.startTransaction() == 0);
Lst lst;
bool deadlock = false;
for (unsigned j = 0; j < par.m_rows; j++) {
unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
unsigned i = thrrow(par, j2);
set.lock();
if (! set.exist(i) || set.pending(i, Row::AnyOp)) {
set.unlock();
continue;
}
CHK(set.delrow(par, itab, i) == 0);
set.unlock();
LL4("hashindexdelete " << i << ": " << *set.m_row[i]);
lst.push(i);
if (lst.cnt() == par.m_batch) {
deadlock = par.m_deadlock;
CHK(con.execute(Commit, deadlock) == 0);
if (deadlock) {
LL1("hashindexdelete: stop on deadlock [at 1]");
break;
}
con.closeTransaction();
set.lock();
set.notpending(lst);
set.unlock();
lst.reset();
CHK(con.startTransaction() == 0);
}
}
if (! deadlock && lst.cnt() != 0) {
deadlock = par.m_deadlock;
CHK(con.execute(Commit, deadlock) == 0);
if (deadlock) {
LL1("hashindexdelete: stop on deadlock [at 2]");
} else {
set.lock();
set.notpending(lst);
set.unlock();
}
}
con.closeTransaction();
return 0;
};
static int
hashindexread(Par par, const ITab& itab)
{
Con& con = par.con();
const Tab& tab = par.tab();
Set& set = par.set();
LL3("hashindexread " << itab.m_name << " verify=" << par.m_verify);
// expected
const Set& set1 = set;
Set set2(tab, set.m_rows);
for (unsigned i = 0; i < set.m_rows; i++) {
set.lock();
if (! set.exist(i)) {
set.unlock();
continue;
}
set.unlock();
CHK(con.startTransaction() == 0);
CHK(set2.selrow(par, itab, *set1.m_row[i]) == 0);
CHK(con.execute(Commit) == 0);
unsigned i2 = (unsigned)-1;
CHK(set2.getkey(par, &i2) == 0 && i == i2);
CHK(set2.putval(i, false) == 0);
LL4("row " << set2.count() << ": " << *set2.m_row[i]);
con.closeTransaction();
}
if (par.m_verify)
CHK(set1.verify(set2) == 0);
return 0;
}
// scan read
static int
......@@ -2691,14 +3309,14 @@ scanreadtable(Par par)
}
unsigned i = (unsigned)-1;
CHK(set2.getkey(par, &i) == 0);
CHK(set2.putval(i, false) == 0);
CHK(set2.putval(i, false, n) == 0);
LL4("row " << n << ": " << *set2.m_row[i]);
n++;
}
con.closeTransaction();
if (par.m_verify)
CHK(set1.verify(set2) == 0);
LL3("scanread " << tab.m_name << " rows=" << n);
LL3("scanread " << tab.m_name << " done rows=" << n);
return 0;
}
......@@ -2745,19 +3363,22 @@ scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc)
// prefer proper subset
if (0 < n && n < set.m_rows)
break;
if (urandom(5) == 0)
if (urandom(3) == 0)
break;
set1.reset();
}
} else {
bset.filter(set, set1);
}
LL3("scanread " << itab.m_name << " bounds=" << bset.m_bvals << " verify=" << par.m_verify);
LL3("scanread " << itab.m_name << " bounds=" << bset << " verify=" << par.m_verify << " ordered=" << par.m_ordered << " descending=" << par.m_descending);
LL4("expect " << set1.count() << " rows");
Set set2(tab, set.m_rows);
CHK(con.startTransaction() == 0);
CHK(con.getNdbScanOperation(itab, tab) == 0);
CHK(con.openScanRead(par.m_scanbat, par.m_scanpar) == 0);
if (! par.m_ordered)
CHK(con.openScanRead(par.m_scanbat, par.m_scanpar) == 0);
else
CHK(con.openScanOrdered(par.m_scanbat, par.m_scanpar, par.m_descending) == 0);
CHK(bset.setbnd(par) == 0);
set2.getval(par);
CHK(con.executeScan() == 0);
......@@ -2775,15 +3396,17 @@ scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc)
}
unsigned i = (unsigned)-1;
CHK(set2.getkey(par, &i) == 0);
LL4("key " << i);
CHK(set2.putval(i, par.m_dups) == 0);
LL4("row " << n << ": " << *set2.m_row[i]);
CHK(set2.putval(i, par.m_dups, n) == 0);
LL4("key " << i << " row " << n << ": " << *set2.m_row[i]);
n++;
}
con.closeTransaction();
if (par.m_verify)
if (par.m_verify) {
CHK(set1.verify(set2) == 0);
LL3("scanread " << itab.m_name << " rows=" << n);
if (par.m_ordered)
CHK(set2.verifyorder(itab, par.m_descending) == 0);
}
LL3("scanread " << itab.m_name << " done rows=" << n);
return 0;
}
......@@ -2821,8 +3444,10 @@ scanreadindex(Par par, const ITab& itab)
{
const Tab& tab = par.tab();
for (unsigned i = 0; i < par.m_subsubloop; i++) {
BSet bset(tab, itab, par.m_rows);
CHK(scanreadindex(par, itab, bset, true) == 0);
if (itab.m_type == ITab::OrderedIndex) {
BSet bset(tab, itab, par.m_rows);
CHK(scanreadindex(par, itab, bset, true) == 0);
}
}
return 0;
}
......@@ -2835,7 +3460,11 @@ scanreadindex(Par par)
if (tab.m_itab[i] == 0)
continue;
const ITab& itab = *tab.m_itab[i];
CHK(scanreadindex(par, itab) == 0);
if (itab.m_type == ITab::OrderedIndex) {
CHK(scanreadindex(par, itab) == 0);
} else {
CHK(hashindexread(par, itab) == 0);
}
}
return 0;
}
......@@ -2932,7 +3561,7 @@ scanupdatetable(Par par)
if (ret == 1)
break;
if (deadlock) {
LL1("scanupdatetable: stop on deadlock");
LL1("scanupdatetable: stop on deadlock [at 1]");
break;
}
if (par.m_scanstop != 0 && urandom(par.m_scanstop) == 0) {
......@@ -2944,13 +3573,14 @@ scanupdatetable(Par par)
CHK(set2.getkey(par, &i) == 0);
const Row& row = *set.m_row[i];
set.lock();
if (! set.exist(i) || set.pending(i)) {
if (! set.exist(i) || set.pending(i, Row::AnyOp)) {
LL4("scan update " << tab.m_name << ": skip: " << row);
} else {
CHKTRY(set2.putval(i, false) == 0, set.unlock());
CHKTRY(con.updateScanTuple(con2) == 0, set.unlock());
Par par2 = par;
par2.m_con = &con2;
set.dbsave(i);
set.calc(par, i);
CHKTRY(set.setrow(par2, i) == 0, set.unlock());
LL4("scan update " << tab.m_name << ": " << row);
......@@ -2961,12 +3591,13 @@ scanupdatetable(Par par)
deadlock = par.m_deadlock;
CHK(con2.execute(Commit, deadlock) == 0);
if (deadlock) {
LL1("scanupdateindex: stop on deadlock");
LL1("scanupdatetable: stop on deadlock [at 2]");
goto out;
}
con2.closeTransaction();
set.lock();
set.notpending(lst);
set.dbdiscard(lst);
set.unlock();
count += lst.cnt();
lst.reset();
......@@ -2977,12 +3608,13 @@ scanupdatetable(Par par)
deadlock = par.m_deadlock;
CHK(con2.execute(Commit, deadlock) == 0);
if (deadlock) {
LL1("scanupdateindex: stop on deadlock");
LL1("scanupdatetable: stop on deadlock [at 3]");
goto out;
}
con2.closeTransaction();
set.lock();
set.notpending(lst);
set.dbdiscard(lst);
set.unlock();
count += lst.cnt();
lst.reset();
......@@ -3009,7 +3641,10 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
Set set2(tab, set.m_rows);
CHK(con.startTransaction() == 0);
CHK(con.getNdbScanOperation(itab, tab) == 0);
CHK(con.openScanExclusive(par.m_scanbat, par.m_scanpar) == 0);
if (! par.m_ordered)
CHK(con.openScanExclusive(par.m_scanbat, par.m_scanpar) == 0);
else
CHK(con.openScanOrderedExclusive(par.m_scanbat, par.m_scanpar, par.m_descending) == 0);
CHK(bset.setbnd(par) == 0);
set2.getval(par);
CHK(con.executeScan() == 0);
......@@ -3027,7 +3662,7 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
if (ret == 1)
break;
if (deadlock) {
LL1("scanupdateindex: stop on deadlock");
LL1("scanupdateindex: stop on deadlock [at 1]");
break;
}
if (par.m_scanstop != 0 && urandom(par.m_scanstop) == 0) {
......@@ -3039,13 +3674,14 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
CHK(set2.getkey(par, &i) == 0);
const Row& row = *set.m_row[i];
set.lock();
if (! set.exist(i) || set.pending(i)) {
if (! set.exist(i) || set.pending(i, Row::AnyOp)) {
LL4("scan update " << itab.m_name << ": skip: " << row);
} else {
CHKTRY(set2.putval(i, par.m_dups) == 0, set.unlock());
CHKTRY(con.updateScanTuple(con2) == 0, set.unlock());
Par par2 = par;
par2.m_con = &con2;
set.dbsave(i);
set.calc(par, i);
CHKTRY(set.setrow(par2, i) == 0, set.unlock());
LL4("scan update " << itab.m_name << ": " << row);
......@@ -3056,12 +3692,13 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
deadlock = par.m_deadlock;
CHK(con2.execute(Commit, deadlock) == 0);
if (deadlock) {
LL1("scanupdateindex: stop on deadlock");
LL1("scanupdateindex: stop on deadlock [at 2]");
goto out;
}
con2.closeTransaction();
set.lock();
set.notpending(lst);
set.dbdiscard(lst);
set.unlock();
count += lst.cnt();
lst.reset();
......@@ -3072,12 +3709,13 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
deadlock = par.m_deadlock;
CHK(con2.execute(Commit, deadlock) == 0);
if (deadlock) {
LL1("scanupdateindex: stop on deadlock");
LL1("scanupdateindex: stop on deadlock [at 3]");
goto out;
}
con2.closeTransaction();
set.lock();
set.notpending(lst);
set.dbdiscard(lst);
set.unlock();
count += lst.cnt();
lst.reset();
......@@ -3097,9 +3735,13 @@ scanupdateindex(Par par, const ITab& itab)
{
const Tab& tab = par.tab();
for (unsigned i = 0; i < par.m_subsubloop; i++) {
BSet bset(tab, itab, par.m_rows);
bset.calc(par);
CHK(scanupdateindex(par, itab, bset) == 0);
if (itab.m_type == ITab::OrderedIndex) {
BSet bset(tab, itab, par.m_rows);
bset.calc(par);
CHK(scanupdateindex(par, itab, bset) == 0);
} else {
CHK(hashindexupdate(par, itab) == 0);
}
}
return 0;
}
......@@ -3151,8 +3793,12 @@ readverifyfull(Par par)
unsigned i = par.m_no - 1;
if (i < tab.m_itabs && tab.m_itab[i] != 0) {
const ITab& itab = *tab.m_itab[i];
BSet bset(tab, itab, par.m_rows);
CHK(scanreadindex(par, itab, bset, false) == 0);
if (itab.m_type == ITab::OrderedIndex) {
BSet bset(tab, itab, par.m_rows);
CHK(scanreadindex(par, itab, bset, false) == 0);
} else {
CHK(hashindexread(par, itab) == 0);
}
}
}
return 0;
......@@ -3162,6 +3808,11 @@ static int
readverifyindex(Par par)
{
par.m_verify = true;
unsigned sel = urandom(10);
if (sel < 9) {
par.m_ordered = true;
par.m_descending = (sel < 5);
}
CHK(scanreadindex(par) == 0);
return 0;
}
......@@ -3169,26 +3820,56 @@ readverifyindex(Par par)
static int
pkops(Par par)
{
const Tab& tab = par.tab();
par.m_randomkey = true;
for (unsigned i = 0; i < par.m_subsubloop; i++) {
unsigned j = 0;
while (j < tab.m_itabs) {
if (tab.m_itab[j] != 0) {
const ITab& itab = *tab.m_itab[j];
if (itab.m_type == ITab::UniqueHashIndex && urandom(5) == 0)
break;
}
j++;
}
unsigned sel = urandom(10);
if (par.m_slno % 2 == 0) {
// favor insert
if (sel < 8) {
CHK(pkinsert(par) == 0);
} else if (sel < 9) {
CHK(pkupdate(par) == 0);
if (j == tab.m_itabs)
CHK(pkupdate(par) == 0);
else {
const ITab& itab = *tab.m_itab[j];
CHK(hashindexupdate(par, itab) == 0);
}
} else {
CHK(pkdelete(par) == 0);
if (j == tab.m_itabs)
CHK(pkdelete(par) == 0);
else {
const ITab& itab = *tab.m_itab[j];
CHK(hashindexdelete(par, itab) == 0);
}
}
} else {
// favor delete
if (sel < 1) {
CHK(pkinsert(par) == 0);
} else if (sel < 2) {
CHK(pkupdate(par) == 0);
if (j == tab.m_itabs)
CHK(pkupdate(par) == 0);
else {
const ITab& itab = *tab.m_itab[j];
CHK(hashindexupdate(par, itab) == 0);
}
} else {
CHK(pkdelete(par) == 0);
if (j == tab.m_itabs)
CHK(pkdelete(par) == 0);
else {
const ITab& itab = *tab.m_itab[j];
CHK(hashindexdelete(par, itab) == 0);
}
}
}
}
......@@ -3208,6 +3889,10 @@ pkupdatescanread(Par par)
CHK(scanreadtable(par) == 0);
} else {
par.m_verify = false;
if (sel < 8) {
par.m_ordered = true;
par.m_descending = (sel < 7);
}
CHK(scanreadindex(par) == 0);
}
return 0;
......@@ -3227,6 +3912,10 @@ mixedoperations(Par par)
} else if (sel < 6) {
CHK(scanupdatetable(par) == 0);
} else {
if (sel < 8) {
par.m_ordered = true;
par.m_descending = (sel < 7);
}
CHK(scanupdateindex(par) == 0);
}
return 0;
......@@ -3720,7 +4409,7 @@ printtables()
{
Par par(g_opt);
makebuiltintables(par);
ndbout << "builtin tables (index x0 is on table pk):" << endl;
ndbout << "builtin tables (x0 on pk, x=ordered z=hash):" << endl;
for (unsigned j = 0; j < tabcount; j++) {
if (tablist[j] == 0)
continue;
......@@ -3744,6 +4433,7 @@ runtest(Par par)
LL1("random seed: " << seed);
srandom((unsigned)seed);
} else if (par.m_seed != 0)
LL1("random seed: " << par.m_seed);
srandom(par.m_seed);
// cs
assert(par.m_csname != 0);
......@@ -3953,7 +4643,8 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535)
if (strcmp(arg, "-threads") == 0) {
if (++argv, --argc > 0) {
g_opt.m_threads = atoi(argv[0]);
continue;
if (1 <= g_opt.m_threads)
continue;
}
}
if (strcmp(arg, "-v") == 0) {
......@@ -3970,7 +4661,7 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535)
printhelp();
goto wrongargs;
}
ndbout << "testOIBasic: unknown option " << arg;
ndbout << "testOIBasic: bad or unknown option " << arg;
goto usage;
}
{
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment