Commit 849eee74 authored by unknown's avatar unknown

ndb - wl#2972 (5.1) fix detached trigger opType


storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp:
  set non-update pk pre data to UNDEFINED (previously unspecified)
storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp:
  fix detached trigger opType
storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp:
  fix detached trigger opType
storage/ndb/test/ndbapi/test_event_merge.cpp:
  exact verif post/pre data
parent 3e6f20c0
......@@ -448,7 +448,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
if(!regOperPtr.p->is_first_operation())
{
/**
* Out of order commit
* Out of order commit XXX check effect on triggers
*/
fix_commit_order(regOperPtr);
}
......
......@@ -559,8 +559,9 @@ Dbtup::fireDetachedTriggers(KeyReqStruct *req_struct,
/**
* Set correct operation type and fix change mask
* Note ALLOC is set in "orig" tuple
*/
if(req_struct->m_tuple_ptr->m_header_bits & Tuple_header::ALLOC)
if(save_ptr->m_header_bits & Tuple_header::ALLOC)
{
if(save == ZDELETE)
{
......@@ -571,6 +572,14 @@ Dbtup::fireDetachedTriggers(KeyReqStruct *req_struct,
}
regOperPtr->op_struct.op_type = ZINSERT;
}
else if (save == ZINSERT)
/**
* Tuple was not created but last op is INSERT.
* This is possible only on DELETE + INSERT
*/
{
regOperPtr->op_struct.op_type = ZUPDATE;
}
ndbrequire(regOperPtr->is_first_operation());
triggerList.first(trigPtr);
......
......@@ -376,10 +376,10 @@ NdbEventOperationImpl::receive_event()
AttributeHeader(*aAttrPtr).getAttributeId());
receive_data(tAttr, aDataPtr, tDataSz);
if (is_update)
{
receive_data(tAttr1, aDataPtr, tDataSz);
else
tAttr1->setUNDEFINED(); // do not leave unspecified
tAttr1= tAttr1->next();
}
// next
aAttrPtr++;
aDataPtr+= (tDataSz + 3) >> 2;
......
......@@ -47,8 +47,8 @@
* There are 5 ways (ignoring NUL operand) to compose 2 ops:
* 5.0 bugs 5.1 bugs
* INS o DEL = NUL
* INS o UPD = INS 5.1
* DEL o INS = UPD type=INS 5.1
* INS o UPD = INS type=INS
* DEL o INS = UPD type=INS type=INS
* UPD o DEL = DEL no event
* UPD o UPD = UPD
*/
......@@ -78,6 +78,7 @@ static NdbOperation* g_op = 0;
static const char* g_tabname = "tem1";
static const char* g_evtname = "tem1ev1";
static const uint g_charlen = 5;
static const char* g_charval = "abcde";
static const char* g_csname = "latin1_swedish_ci";
static const NdbDictionary::Table* g_tab = 0;
......@@ -229,9 +230,9 @@ createtable()
chkdb((g_op = g_con->getNdbOperation(g_tabname)) != 0);
chkdb(g_op->insertTuple() == 0);
Uint32 pk1;
char pk2[g_charlen];
char pk2[g_charlen + 1];
pk1 = g_maxpk;
memset(pk2, 0x20, g_charlen);
sprintf(pk2, "%-*u", g_charlen, pk1);
chkdb(g_op->equal("pk1", (char*)&pk1) == 0);
chkdb(g_op->equal("pk2", (char*)&pk2[0]) == 0);
chkdb(g_con->execute(Commit) == 0);
......@@ -256,9 +257,9 @@ droptable()
struct Data {
Uint32 pk1;
char pk2[g_charlen];
char pk2[g_charlen + 1];
Uint32 seq;
char cc1[g_charlen];
char cc1[g_charlen + 1];
void* ptr[g_ncol];
int ind[g_ncol]; // -1 = no data, 1 = NULL, 0 = not NULL
void init() {
......@@ -276,6 +277,30 @@ struct Data {
}
};
static int
cmpdata(const Data& d1, const Data& d2)
{
uint i;
for (i = 0; i < g_ncol; i++) {
const Col& c = getcol(i);
if (d1.ind[i] != d2.ind[i])
return 1;
if (d1.ind[i] == 0 && memcmp(d1.ptr[i], d2.ptr[i], c.size) != 0)
return 1;
}
return 0;
}
static int
cmpdata(const Data (&d1)[2], const Data (&d2)[2])
{
if (cmpdata(d1[0], d2[0]) != 0)
return 1;
if (cmpdata(d1[1], d2[1]) != 0)
return 1;
return 0;
}
static NdbOut&
operator<<(NdbOut& out, const Data& d)
{
......@@ -304,7 +329,7 @@ operator<<(NdbOut& out, const Data& d)
break;
n--;
}
out << buf;
out << "'" << buf << "'";
}
break;
default:
......@@ -370,7 +395,7 @@ operator<<(NdbOut& out, Op::Type t)
static NdbOut&
operator<<(NdbOut& out, const Op& op)
{
out << "t=" << op.type;
out << op.type;
out << " " << op.data[0];
out << " [" << op.data[1] << "]";
return out;
......@@ -504,13 +529,9 @@ checkop(const Op* op, Uint32& pk1)
if (t == Op::INS) {
chkrc(d[1].ind[i] == -1);
} else if (t == Op::DEL) {
#ifdef ndb_event_cares_about_pk_pre_data
chkrc(d[1].ind[i] == -1);
#endif
} else {
#ifdef ndb_event_cares_about_pk_pre_data
chkrc(d[1].ind[i] == 0);
#endif
}
} else {
if (t == Op::INS) {
......@@ -547,43 +568,24 @@ copycol(const Col& c, const Data& d1, Data& d3)
}
static void
copykeys(const Data& d1, Data& d3)
{
uint i;
for (i = 0; i < g_ncol; i++) {
const Col& c = g_col[i];
if (c.pk)
copycol(c, d1, d3);
}
}
static void
copydata(const Data& d1, Data& d3)
copydata(const Data& d1, Data& d3, bool pk, bool nonpk)
{
uint i;
for (i = 0; i < g_ncol; i++) {
const Col& c = g_col[i];
if (c.pk && pk || ! c.pk && nonpk)
copycol(c, d1, d3);
}
}
static void
copyop(const Op* op1, Op* op3)
{
op3->type = op1->type;
copydata(op1->data[0], op3->data[0]);
copydata(op1->data[1], op3->data[1]);
Uint32 pk1_tmp;
reqrc(checkop(op3, pk1_tmp) == 0);
}
// not needed for ops
static void
compdata(const Data& d1, const Data& d2, Data& d3) // d2 overrides d1
compdata(const Data& d1, const Data& d2, Data& d3, bool pk, bool nonpk)
{
uint i;
for (i = 0; i < g_ncol; i++) {
const Col& c = g_col[i];
if (c.pk && pk || ! c.pk && nonpk) {
const Data* d = 0;
if (d1.ind[i] == -1 && d2.ind[i] == -1)
d3.ind[i] = -1;
......@@ -596,6 +598,17 @@ compdata(const Data& d1, const Data& d2, Data& d3) // d2 overrides d1
if (d != 0)
copycol(c, *d, d3);
}
}
}
static void
copyop(const Op* op1, Op* op3)
{
op3->type = op1->type;
copydata(op1->data[0], op3->data[0], true, true);
copydata(op1->data[1], op3->data[1], true, true);
Uint32 pk1_tmp;
reqrc(checkop(op3, pk1_tmp) == 0);
}
static int
......@@ -612,11 +625,14 @@ compop(const Op* op1, const Op* op2, Op* op3) // op1 o op2 = op3
}
chkrc((comp = comptype(op1->type, op2->type)) != 0);
op3->type = comp->t3;
copykeys(op2->data[0], op3->data[0]);
// sucks
copydata(op2->data[0], op3->data[0], true, false);
if (op3->type != Op::DEL)
copydata(op2->data[0], op3->data[0]);
copydata(op2->data[0], op3->data[0], false, true);
if (op3->type != Op::INS)
copydata(op1->data[1], op3->data[1]);
copydata(op1->data[1], op3->data[1], false, true);
if (op3->type == Op::UPD)
copydata(op2->data[0], op3->data[1], true, false);
Uint32 pk1_tmp;
reqrc(checkop(op3, pk1_tmp) == 0);
// not eliminating identical post-pre fields
......@@ -705,9 +721,9 @@ waitgci() // wait for event to be installed and for at least 1 GCI to pass
chkdb((g_con = g_ndb->startTransaction()) != 0);
{ // forced to exec a dummy op
Uint32 pk1;
char pk2[g_charlen];
char pk2[g_charlen + 1];
pk1 = g_maxpk;
memset(pk2, 0x20, g_charlen);
sprintf(pk2, "%-*u", g_charlen, pk1);
chkdb((g_op = g_con->getNdbOperation(g_tabname)) != 0);
chkdb(g_op->readTuple() == 0);
chkdb(g_op->equal("pk1", (char*)&pk1) == 0);
......@@ -723,6 +739,7 @@ waitgci() // wait for event to be installed and for at least 1 GCI to pass
break;
}
i = 1;
sleep(1);
}
return 0;
}
......@@ -732,11 +749,14 @@ makeop(Op* op, Uint32 pk1, Op::Type t, const Op* prev_op)
{
op->type = t;
if (t != Op::INS)
copydata(prev_op->data[0], op->data[1]);
copydata(prev_op->data[0], op->data[1], true, true);
uint i;
for (i = 0; i < g_ncol; i++) {
const Col& c = getcol(i);
Data (&d)[2] = op->data;
if (c.pk && t == Op::DEL) {
d[1].ind[i] = -1;
}
if (i == getcol("pk1").no) {
d[0].pk1 = pk1;
d[0].ind[i] = 0;
......@@ -758,7 +778,7 @@ makeop(Op* op, Uint32 pk1, Op::Type t, const Op* prev_op)
}
uint u;
u = urandom(100);
if (c.nullable && u < 20) {
if (c.nullable && u < 10) {
d[0].ind[i] = 1;
continue;
}
......@@ -776,8 +796,8 @@ makeop(Op* op, Uint32 pk1, Op::Type t, const Op* prev_op)
char* p = (char*)d[0].ptr[i];
uint j;
for (j = 0; j < g_charlen; j++) {
uint v = urandom(3);
p[j] = j < u ? "abcde"[v] : 0x20;
uint v = urandom(strlen(g_charval));
p[j] = j < u ? g_charval[v] : 0x20;
}
}
break;
......@@ -993,9 +1013,9 @@ static int
matchevent(Op* ev)
{
Op::Type t = ev->type;
Data (&d)[2] = ev->data;
Data (&d2)[2] = ev->data;
// get PK
Uint32 pk1 = d[0].pk1;
Uint32 pk1 = d2[0].pk1;
chkrc(pk1 < g_opts.maxpk);
// on error repeat and print details
uint loop = 0;
......@@ -1016,30 +1036,35 @@ matchevent(Op* ev)
op = op->next_op;
}
if (com_op->type != Op::NUL) {
if (com_op->type == t) {
const Data (&d2)[2] = com_op->data;
if (t == Op::INS && d2[0].seq == d[0].seq ||
t == Op::DEL && d2[1].seq == d[1].seq ||
t == Op::UPD && d2[0].seq == d[0].seq) {
if (cnt == g_ev_cnt[pk1]) {
if (! com_op->match) {
ll2("match pos " << cnt);
const Data (&d1)[2] = com_op->data;
if (cmpdata(d1, d2) == 0) {
bool tmpok = true;
if (com_op->type != t) {
ll2("***: wrong type " << com_op->type << " != " << t);
tmpok = false;
}
if (com_op->match) {
ll2("***: duplicate match");
tmpok = false;
}
if (cnt != g_ev_cnt[pk1]) {
ll2("***: wrong pos " << cnt << " != " << g_ev_cnt[pk1]);
tmpok = false;
}
if (tmpok) {
ok = com_op->match = true;
} else {
ll2("duplicate match");
}
} else {
ll2("match bad pos event=" << g_ev_cnt[pk1] << " op=" << cnt);
}
ll2("===: match");
}
}
cnt++;
}
com_op = com_op->next_com;
}
if (ok)
if (ok) {
ll1("matchevent: match");
return 0;
ll2("no match");
}
ll1("matchevent: ERROR: no match");
if (g_loglevel >= 2)
return -1;
loop++;
......@@ -1100,12 +1125,13 @@ runevents()
{
ll1("runevents");
NdbEventOperation* evt_op;
uint npoll = 3;
uint mspoll = 1000;
uint npoll = 7; // strangely long delay
while (npoll != 0) {
npoll--;
int ret;
ll1("poll");
ret = g_ndb->pollEvents(1000);
ret = g_ndb->pollEvents(mspoll);
if (ret <= 0)
continue;
while (1) {
......@@ -1180,7 +1206,7 @@ runtest()
chkrc(createtable() == 0);
chkrc(createevent() == 0);
uint n;
for (n = 0; n < g_opts.loop; n++) {
for (n = 0; g_opts.loop == 0 || n < g_opts.loop; n++) {
ll0("loop " << n);
setseed(n);
resetmem();
......@@ -1280,6 +1306,10 @@ main(int argc, char** argv)
return NDBT_ProgramExit(NDBT_OK);
}
}
if (g_evt_op != 0) {
(void)dropeventop();
g_evt_op = 0;
}
delete g_ndb;
delete g_ncc;
return NDBT_ProgramExit(NDBT_FAILED);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment