Commit 52fb0ec5 authored by joreland@mysql.com's avatar joreland@mysql.com

wl1804 - ndb - add support for _all_ type in NdbScanFilter + TUP

parent bd930e89
...@@ -577,21 +577,21 @@ public: ...@@ -577,21 +577,21 @@ public:
* @param Label label to jump to * @param Label label to jump to
* @return -1 if unsuccessful * @return -1 if unsuccessful
*/ */
int branch_col_eq(Uint32 ColId, const char * val, Uint32 len, int branch_col_eq(Uint32 ColId, const void * val, Uint32 len,
bool nopad, Uint32 Label); bool nopad, Uint32 Label);
int branch_col_ne(Uint32 ColId, const char * val, Uint32 len, int branch_col_ne(Uint32 ColId, const void * val, Uint32 len,
bool nopad, Uint32 Label); bool nopad, Uint32 Label);
int branch_col_lt(Uint32 ColId, const char * val, Uint32 len, int branch_col_lt(Uint32 ColId, const void * val, Uint32 len,
bool nopad, Uint32 Label); bool nopad, Uint32 Label);
int branch_col_le(Uint32 ColId, const char * val, Uint32 len, int branch_col_le(Uint32 ColId, const void * val, Uint32 len,
bool nopad, Uint32 Label); bool nopad, Uint32 Label);
int branch_col_gt(Uint32 ColId, const char * val, Uint32 len, int branch_col_gt(Uint32 ColId, const void * val, Uint32 len,
bool nopad, Uint32 Label); bool nopad, Uint32 Label);
int branch_col_ge(Uint32 ColId, const char * val, Uint32 len, int branch_col_ge(Uint32 ColId, const void * val, Uint32 len,
bool nopad, Uint32 Label); bool nopad, Uint32 Label);
int branch_col_like(Uint32 ColId, const char *, Uint32 len, int branch_col_like(Uint32 ColId, const void *, Uint32 len,
bool nopad, Uint32 Label); bool nopad, Uint32 Label);
int branch_col_notlike(Uint32 ColId, const char *, Uint32 len, int branch_col_notlike(Uint32 ColId, const void *, Uint32 len,
bool nopad, Uint32 Label); bool nopad, Uint32 Label);
/** /**
...@@ -822,7 +822,7 @@ protected: ...@@ -822,7 +822,7 @@ protected:
int read_attr(const NdbColumnImpl* anAttrObject, Uint32 RegDest); int read_attr(const NdbColumnImpl* anAttrObject, Uint32 RegDest);
int write_attr(const NdbColumnImpl* anAttrObject, Uint32 RegSource); int write_attr(const NdbColumnImpl* anAttrObject, Uint32 RegSource);
int branch_reg_reg(Uint32 type, Uint32, Uint32, Uint32); int branch_reg_reg(Uint32 type, Uint32, Uint32, Uint32);
int branch_col(Uint32 type, Uint32, const char *, Uint32, bool, Uint32 Label); int branch_col(Uint32 type, Uint32, const void *, Uint32, bool, Uint32 Label);
int branch_col_null(Uint32 type, Uint32 col, Uint32 Label); int branch_col_null(Uint32 type, Uint32 col, Uint32 Label);
// Handle ATTRINFO signals // Handle ATTRINFO signals
......
...@@ -46,6 +46,16 @@ public: ...@@ -46,6 +46,16 @@ public:
NOR = 4 ///< NOT (x1 OR x2 OR x3) NOR = 4 ///< NOT (x1 OR x2 OR x3)
}; };
enum BinaryCondition
{
LE = 0, ///< lower bound
LT = 1, ///< lower bound, strict
GE = 2, ///< upper bound
GT = 3, ///< upper bound, strict
EQ = 4, ///< equality
NE = 5
};
/** /**
* @name Grouping * @name Grouping
* @{ * @{
...@@ -75,6 +85,11 @@ public: ...@@ -75,6 +85,11 @@ public:
*/ */
int isfalse(); int isfalse();
/**
* Compare column <b>ColId</b> with <b>val</b>
*/
int cmp(BinaryCondition cond, int ColId, const void *val, Uint32 len);
/** /**
* @name Integer Comparators * @name Integer Comparators
* @{ * @{
...@@ -82,52 +97,53 @@ public: ...@@ -82,52 +97,53 @@ public:
/** Compare column value with integer for equal /** Compare column value with integer for equal
* ®return 0 if successful, -1 otherwize * ®return 0 if successful, -1 otherwize
*/ */
int eq(int ColId, Uint32 value); int eq(int ColId, Uint32 value) { return cmp(EQ, ColId, &value, 4);}
/** Compare column value with integer for not equal. /** Compare column value with integer for not equal.
* ®return 0 if successful, -1 otherwize * ®return 0 if successful, -1 otherwize
*/ */
int ne(int ColId, Uint32 value); int ne(int ColId, Uint32 value) { return cmp(NE, ColId, &value, 4);}
/** Compare column value with integer for less than. /** Compare column value with integer for less than.
* ®return 0 if successful, -1 otherwize * ®return 0 if successful, -1 otherwize
*/ */
int lt(int ColId, Uint32 value); int lt(int ColId, Uint32 value) { return cmp(LT, ColId, &value, 4);}
/** Compare column value with integer for less than or equal. /** Compare column value with integer for less than or equal.
* ®return 0 if successful, -1 otherwize * ®return 0 if successful, -1 otherwize
*/ */
int le(int ColId, Uint32 value); int le(int ColId, Uint32 value) { return cmp(LE, ColId, &value, 4);}
/** Compare column value with integer for greater than. /** Compare column value with integer for greater than.
* ®return 0 if successful, -1 otherwize * ®return 0 if successful, -1 otherwize
*/ */
int gt(int ColId, Uint32 value); int gt(int ColId, Uint32 value) { return cmp(GT, ColId, &value, 4);}
/** Compare column value with integer for greater than or equal. /** Compare column value with integer for greater than or equal.
* ®return 0 if successful, -1 otherwize * ®return 0 if successful, -1 otherwize
*/ */
int ge(int ColId, Uint32 value); int ge(int ColId, Uint32 value) { return cmp(GE, ColId, &value, 4);}
/** Compare column value with integer for equal. 64-bit. /** Compare column value with integer for equal. 64-bit.
* ®return 0 if successful, -1 otherwize * ®return 0 if successful, -1 otherwize
*/ */
int eq(int ColId, Uint64 value); int eq(int ColId, Uint64 value) { return cmp(EQ, ColId, &value, 8);}
/** Compare column value with integer for not equal. 64-bit. /** Compare column value with integer for not equal. 64-bit.
* ®return 0 if successful, -1 otherwize * ®return 0 if successful, -1 otherwize
*/ */
int ne(int ColId, Uint64 value); int ne(int ColId, Uint64 value) { return cmp(NE, ColId, &value, 8);}
/** Compare column value with integer for less than. 64-bit. /** Compare column value with integer for less than. 64-bit.
* ®return 0 if successful, -1 otherwize * ®return 0 if successful, -1 otherwize
*/ */
int lt(int ColId, Uint64 value); int lt(int ColId, Uint64 value) { return cmp(LT, ColId, &value, 8);}
/** Compare column value with integer for less than or equal. 64-bit. /** Compare column value with integer for less than or equal. 64-bit.
* ®return 0 if successful, -1 otherwize * ®return 0 if successful, -1 otherwize
*/ */
int le(int ColId, Uint64 value); int le(int ColId, Uint64 value) { return cmp(LE, ColId, &value, 8);}
/** Compare column value with integer for greater than. 64-bit. /** Compare column value with integer for greater than. 64-bit.
* ®return 0 if successful, -1 otherwize * ®return 0 if successful, -1 otherwize
*/ */
int gt(int ColId, Uint64 value); int gt(int ColId, Uint64 value) { return cmp(GT, ColId, &value, 8);}
/** Compare column value with integer for greater than or equal. 64-bit. /** Compare column value with integer for greater than or equal. 64-bit.
* ®return 0 if successful, -1 otherwize * ®return 0 if successful, -1 otherwize
*/ */
int ge(int ColId, Uint64 value); int ge(int ColId, Uint64 value) { return cmp(GE, ColId, &value, 8);}
/** @} *********************************************************************/ /** @} *********************************************************************/
/** Check if column value is NULL */ /** Check if column value is NULL */
...@@ -135,27 +151,7 @@ public: ...@@ -135,27 +151,7 @@ public:
/** Check if column value is non-NULL */ /** Check if column value is non-NULL */
int isnotnull(int ColId); int isnotnull(int ColId);
/** #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
* @name String Comparators
* @{
*/
/**
* Compare string against a Char or Varchar column.
*
* By default Char comparison blank-pads both sides to common length.
* Varchar comparison does not blank-pad.
*
* The extra <i>nopad</i> argument can be used to
* force non-padded comparison for a Char column.
* ®return 0 if successful, -1 otherwize
*/
int eq(int ColId, const char * val, Uint32 len, bool nopad=false);
int ne(int ColId, const char * val, Uint32 len, bool nopad=false);
int lt(int ColId, const char * val, Uint32 len, bool nopad=false);
int le(int ColId, const char * val, Uint32 len, bool nopad=false);
int gt(int ColId, const char * val, Uint32 len, bool nopad=false);
int ge(int ColId, const char * val, Uint32 len, bool nopad=false);
/** /**
* Like comparison operator. * Like comparison operator.
* ®return 0 if successful, -1 otherwize * ®return 0 if successful, -1 otherwize
...@@ -167,6 +163,7 @@ public: ...@@ -167,6 +163,7 @@ public:
*/ */
int notlike(int ColId, const char * val, Uint32 len, bool nopad=false); int notlike(int ColId, const char * val, Uint32 len, bool nopad=false);
/** @} *********************************************************************/ /** @} *********************************************************************/
#endif
private: private:
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
......
...@@ -1818,9 +1818,6 @@ int Dbtup::interpreterNextLab(Signal* signal, ...@@ -1818,9 +1818,6 @@ int Dbtup::interpreterNextLab(Signal* signal,
case Interpreter::BRANCH_ATTR_OP_ARG:{ case Interpreter::BRANCH_ATTR_OP_ARG:{
jam(); jam();
Uint32 cond = Interpreter::getBinaryCondition(theInstruction); Uint32 cond = Interpreter::getBinaryCondition(theInstruction);
Uint32 diff = Interpreter::getArrayLengthDiff(theInstruction);
Uint32 vchr = Interpreter::isVarchar(theInstruction);
Uint32 nopad =Interpreter::isNopad(theInstruction);
Uint32 ins2 = TcurrentProgram[TprogramCounter]; Uint32 ins2 = TcurrentProgram[TprogramCounter];
Uint32 attrId = Interpreter::getBranchCol_AttrId(ins2) << 16; Uint32 attrId = Interpreter::getBranchCol_AttrId(ins2) << 16;
Uint32 argLen = Interpreter::getBranchCol_Len(ins2); Uint32 argLen = Interpreter::getBranchCol_Len(ins2);
...@@ -1840,66 +1837,62 @@ int Dbtup::interpreterNextLab(Signal* signal, ...@@ -1840,66 +1837,62 @@ int Dbtup::interpreterNextLab(Signal* signal,
tmpHabitant = attrId; tmpHabitant = attrId;
} }
attrId >>= 16;
AttributeHeader ah(tmpArea[0]); AttributeHeader ah(tmpArea[0]);
const char* s1 = (char*)&tmpArea[1]; const char* s1 = (char*)&tmpArea[1];
const char* s2 = (char*)&TcurrentProgram[TprogramCounter+1]; const char* s2 = (char*)&TcurrentProgram[TprogramCounter+1];
Uint32 attrLen = (4 * ah.getDataSize()) - diff; Uint32 attrLen = (4 * ah.getDataSize());
if (vchr) { Uint32 TattrDescrIndex = tabptr.p->tabDescriptor +
#if NDB_VERSION_MAJOR >= 3 (attrId << ZAD_LOG_SIZE);
bool vok = false; Uint32 TattrDesc1 = tableDescriptor[TattrDescrIndex].tabDescr;
if (attrLen >= 2) { Uint32 TattrDesc2 = tableDescriptor[TattrDescrIndex+1].tabDescr;
Uint32 vlen = (s1[0] << 8) | s1[1]; // big-endian Uint32 typeId = AttributeDescriptor::getType(TattrDesc1);
s1 += 2; void * cs = 0;
attrLen -= 2; if(AttributeOffset::getCharsetFlag(TattrDesc2))
if (attrLen >= vlen) { {
attrLen = vlen; Uint32 pos = AttributeOffset::getCharsetPos(TattrDesc2);
vok = true; cs = tabptr.p->charsetArray[pos];
}
}
if (!vok) {
terrorCode = ZREGISTER_INIT_ERROR;
tupkeyErrorLab(signal);
return -1;
} }
#else const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(typeId);
Uint32 tmp;
if (attrLen >= 2) { bool r1_null = ah.isNULL();
unsigned char* ss = (unsigned char*)&s1[attrLen - 2]; bool r2_null = argLen == 0;
tmp = (ss[0] << 8) | ss[1]; int res;
if (tmp <= attrLen - 2) if(r1_null || r2_null)
attrLen = tmp; {
res = r1_null && r2_null ? 0 : r1_null ? -1 : 1;
} }
// XXX handle bad data else
#endif {
res = (*sqlType.m_cmp)(cs, s1, attrLen, s2, argLen, true);
} }
bool res = false;
switch ((Interpreter::BinaryCondition)cond) { switch ((Interpreter::BinaryCondition)cond) {
case Interpreter::EQ: case Interpreter::EQ:
res = NdbSqlUtil::char_compare(s1, attrLen, s2, argLen, !nopad) == 0; res = (res == 0);
break; break;
case Interpreter::NE: case Interpreter::NE:
res = NdbSqlUtil::char_compare(s1, attrLen, s2, argLen, !nopad) != 0; res = (res != 0);
break; break;
// note the condition is backwards // note the condition is backwards
case Interpreter::LT: case Interpreter::LT:
res = NdbSqlUtil::char_compare(s1, attrLen, s2, argLen, !nopad) > 0; res = (res > 0);
break; break;
case Interpreter::LE: case Interpreter::LE:
res = NdbSqlUtil::char_compare(s1, attrLen, s2, argLen, !nopad) >= 0; res = (res >= 0);
break; break;
case Interpreter::GT: case Interpreter::GT:
res = NdbSqlUtil::char_compare(s1, attrLen, s2, argLen, !nopad) < 0; res = (res < 0);
break; break;
case Interpreter::GE: case Interpreter::GE:
res = NdbSqlUtil::char_compare(s1, attrLen, s2, argLen, !nopad) <= 0; res = (res <= 0);
break; break;
case Interpreter::LIKE: case Interpreter::LIKE:
res = NdbSqlUtil::char_like(s1, attrLen, s2, argLen, !nopad); res = NdbSqlUtil::char_like(s1, attrLen, s2, argLen, false);
break; break;
case Interpreter::NOT_LIKE: case Interpreter::NOT_LIKE:
res = ! NdbSqlUtil::char_like(s1, attrLen, s2, argLen, !nopad); res = ! NdbSqlUtil::char_like(s1, attrLen, s2, argLen, false);
break; break;
// XXX handle invalid value // XXX handle invalid value
} }
...@@ -1910,8 +1903,10 @@ int Dbtup::interpreterNextLab(Signal* signal, ...@@ -1910,8 +1903,10 @@ int Dbtup::interpreterNextLab(Signal* signal,
#endif #endif
if (res) if (res)
TprogramCounter = brancher(theInstruction, TprogramCounter); TprogramCounter = brancher(theInstruction, TprogramCounter);
else { else
Uint32 tmp = (Interpreter::mod4(argLen) >> 2) + 1; {
Uint32 tmp = ((argLen + 3) >> 2) + 1;
ndbout_c("tmp = %d", tmp);
TprogramCounter += tmp; TprogramCounter += tmp;
} }
break; break;
......
...@@ -1010,7 +1010,7 @@ NdbOperation::insertCall(Uint32 aCall) ...@@ -1010,7 +1010,7 @@ NdbOperation::insertCall(Uint32 aCall)
int int
NdbOperation::branch_col(Uint32 type, NdbOperation::branch_col(Uint32 type,
Uint32 ColId, const char * val, Uint32 len, Uint32 ColId, const void * val, Uint32 len,
bool nopad, Uint32 Label){ bool nopad, Uint32 Label){
if (initial_interpreterCheck() == -1) if (initial_interpreterCheck() == -1)
...@@ -1018,18 +1018,23 @@ NdbOperation::branch_col(Uint32 type, ...@@ -1018,18 +1018,23 @@ NdbOperation::branch_col(Uint32 type,
Interpreter::BinaryCondition c = (Interpreter::BinaryCondition)type; Interpreter::BinaryCondition c = (Interpreter::BinaryCondition)type;
const NdbDictionary::Column * col = const NdbColumnImpl * col =
m_currentTable->getColumn(ColId); m_currentTable->getColumn(ColId);
if(col == 0){ if(col == 0){
abort(); abort();
} }
Uint32 vc = col->getType() == NdbDictionary::Column::Varchar; Uint32 sizeInBytes = col->m_attrSize * col->m_arraySize;
Uint32 colLen = col->getLength() + 2 * vc; if(len != 0 && len != sizeInBytes)
Uint32 al = (4 - (colLen & 3)) & 0x3; {
setErrorCodeAbort(4209);
return -1;
}
len = sizeInBytes;
if (insertATTRINFO(Interpreter::BranchCol(c, al, vc, nopad)) == -1) if (insertATTRINFO(Interpreter::BranchCol(c, 0, 0, false)) == -1)
return -1; return -1;
if (insertBranch(Label) == -1) if (insertBranch(Label) == -1)
...@@ -1047,7 +1052,7 @@ NdbOperation::branch_col(Uint32 type, ...@@ -1047,7 +1052,7 @@ NdbOperation::branch_col(Uint32 type,
Uint32 tmp = 0; Uint32 tmp = 0;
for (Uint32 i = 0; i < len-len2; i++) { for (Uint32 i = 0; i < len-len2; i++) {
char* p = (char*)&tmp; char* p = (char*)&tmp;
p[i] = val[len2+i]; p[i] = ((char*)val)[len2+i];
} }
insertATTRINFO(tmp); insertATTRINFO(tmp);
} }
...@@ -1057,53 +1062,53 @@ NdbOperation::branch_col(Uint32 type, ...@@ -1057,53 +1062,53 @@ NdbOperation::branch_col(Uint32 type,
} }
int int
NdbOperation::branch_col_eq(Uint32 ColId, const char * val, Uint32 len, NdbOperation::branch_col_eq(Uint32 ColId, const void * val, Uint32 len,
bool nopad, Uint32 Label){ bool nopad, Uint32 Label){
INT_DEBUG(("branch_col_eq %u %.*s(%u,%d) -> %u", ColId, len, val, len, nopad, Label)); INT_DEBUG(("branch_col_eq %u %.*s(%u,%d) -> %u", ColId, len, val, len, nopad, Label));
return branch_col(Interpreter::EQ, ColId, val, len, nopad, Label); return branch_col(Interpreter::EQ, ColId, val, len, nopad, Label);
} }
int int
NdbOperation::branch_col_ne(Uint32 ColId, const char * val, Uint32 len, NdbOperation::branch_col_ne(Uint32 ColId, const void * val, Uint32 len,
bool nopad, Uint32 Label){ bool nopad, Uint32 Label){
INT_DEBUG(("branch_col_ne %u %.*s(%u,%d) -> %u", ColId, len, val, len, nopad, Label)); INT_DEBUG(("branch_col_ne %u %.*s(%u,%d) -> %u", ColId, len, val, len, nopad, Label));
return branch_col(Interpreter::NE, ColId, val, len, nopad, Label); return branch_col(Interpreter::NE, ColId, val, len, nopad, Label);
} }
int int
NdbOperation::branch_col_lt(Uint32 ColId, const char * val, Uint32 len, NdbOperation::branch_col_lt(Uint32 ColId, const void * val, Uint32 len,
bool nopad, Uint32 Label){ bool nopad, Uint32 Label){
INT_DEBUG(("branch_col_lt %u %.*s(%u,%d) -> %u", ColId, len, val, len, nopad, Label)); INT_DEBUG(("branch_col_lt %u %.*s(%u,%d) -> %u", ColId, len, val, len, nopad, Label));
return branch_col(Interpreter::LT, ColId, val, len, nopad, Label); return branch_col(Interpreter::LT, ColId, val, len, nopad, Label);
} }
int int
NdbOperation::branch_col_le(Uint32 ColId, const char * val, Uint32 len, NdbOperation::branch_col_le(Uint32 ColId, const void * val, Uint32 len,
bool nopad, Uint32 Label){ bool nopad, Uint32 Label){
INT_DEBUG(("branch_col_le %u %.*s(%u,%d) -> %u", ColId, len, val, len, nopad, Label)); INT_DEBUG(("branch_col_le %u %.*s(%u,%d) -> %u", ColId, len, val, len, nopad, Label));
return branch_col(Interpreter::LE, ColId, val, len, nopad, Label); return branch_col(Interpreter::LE, ColId, val, len, nopad, Label);
} }
int int
NdbOperation::branch_col_gt(Uint32 ColId, const char * val, Uint32 len, NdbOperation::branch_col_gt(Uint32 ColId, const void * val, Uint32 len,
bool nopad, Uint32 Label){ bool nopad, Uint32 Label){
INT_DEBUG(("branch_col_gt %u %.*s(%u,%d) -> %u", ColId, len, val, len, nopad, Label)); INT_DEBUG(("branch_col_gt %u %.*s(%u,%d) -> %u", ColId, len, val, len, nopad, Label));
return branch_col(Interpreter::GT, ColId, val, len, nopad, Label); return branch_col(Interpreter::GT, ColId, val, len, nopad, Label);
} }
int int
NdbOperation::branch_col_ge(Uint32 ColId, const char * val, Uint32 len, NdbOperation::branch_col_ge(Uint32 ColId, const void * val, Uint32 len,
bool nopad, Uint32 Label){ bool nopad, Uint32 Label){
INT_DEBUG(("branch_col_ge %u %.*s(%u,%d) -> %u", ColId, len, val, len, nopad, Label)); INT_DEBUG(("branch_col_ge %u %.*s(%u,%d) -> %u", ColId, len, val, len, nopad, Label));
return branch_col(Interpreter::GE, ColId, val, len, nopad, Label); return branch_col(Interpreter::GE, ColId, val, len, nopad, Label);
} }
int int
NdbOperation::branch_col_like(Uint32 ColId, const char * val, Uint32 len, NdbOperation::branch_col_like(Uint32 ColId, const void * val, Uint32 len,
bool nopad, Uint32 Label){ bool nopad, Uint32 Label){
INT_DEBUG(("branch_col_like %u %.*s(%u,%d) -> %u", ColId, len, val, len, nopad, Label)); INT_DEBUG(("branch_col_like %u %.*s(%u,%d) -> %u", ColId, len, val, len, nopad, Label));
return branch_col(Interpreter::LIKE, ColId, val, len, nopad, Label); return branch_col(Interpreter::LIKE, ColId, val, len, nopad, Label);
} }
int int
NdbOperation::branch_col_notlike(Uint32 ColId, const char * val, Uint32 len, NdbOperation::branch_col_notlike(Uint32 ColId, const void * val, Uint32 len,
bool nopad, Uint32 Label){ bool nopad, Uint32 Label){
INT_DEBUG(("branch_col_notlike %u %.*s(%u,%d) -> %u", ColId,len,val,len,nopad,Label)); INT_DEBUG(("branch_col_notlike %u %.*s(%u,%d) -> %u", ColId,len,val,len,nopad,Label));
return branch_col(Interpreter::NOT_LIKE, ColId, val, len, nopad, Label); return branch_col(Interpreter::NOT_LIKE, ColId, val, len, nopad, Label);
......
...@@ -48,11 +48,8 @@ class NdbScanFilterImpl { ...@@ -48,11 +48,8 @@ class NdbScanFilterImpl {
int cond_col(Interpreter::UnaryCondition, Uint32 attrId); int cond_col(Interpreter::UnaryCondition, Uint32 attrId);
template<typename T>
int cond_col_const(Interpreter::BinaryCondition, Uint32 attrId, T value);
int cond_col_const(Interpreter::BinaryCondition, Uint32 attrId, int cond_col_const(Interpreter::BinaryCondition, Uint32 attrId,
const char * value, Uint32 len, bool nopad); const void * value, Uint32 len);
}; };
const Uint32 LabelExit = ~0; const Uint32 LabelExit = ~0;
...@@ -247,68 +244,7 @@ NdbScanFilter::isfalse(){ ...@@ -247,68 +244,7 @@ NdbScanFilter::isfalse(){
typedef int (NdbOperation:: * Branch1)(Uint32, Uint32 label); typedef int (NdbOperation:: * Branch1)(Uint32, Uint32 label);
typedef int (NdbOperation:: * Branch2)(Uint32, Uint32, Uint32 label); typedef int (NdbOperation:: * StrBranch2)(Uint32, const void*, Uint32, bool, Uint32);
typedef int (NdbOperation:: * StrBranch2)(Uint32, const char*,Uint32,bool,Uint32);
struct tab {
Branch2 m_branches[5];
};
static const tab table[] = {
/**
* EQ (AND, OR, NAND, NOR)
*/
{ { 0,
&NdbOperation::branch_ne,
&NdbOperation::branch_eq,
&NdbOperation::branch_eq,
&NdbOperation::branch_ne } }
/**
* NEQ
*/
,{ { 0,
&NdbOperation::branch_eq,
&NdbOperation::branch_ne,
&NdbOperation::branch_ne,
&NdbOperation::branch_eq } }
/**
* LT
*/
,{ { 0,
&NdbOperation::branch_le,
&NdbOperation::branch_gt,
&NdbOperation::branch_gt,
&NdbOperation::branch_le } }
/**
* LE
*/
,{ { 0,
&NdbOperation::branch_lt,
&NdbOperation::branch_ge,
&NdbOperation::branch_ge,
&NdbOperation::branch_lt } }
/**
* GT
*/
,{ { 0,
&NdbOperation::branch_ge,
&NdbOperation::branch_lt,
&NdbOperation::branch_lt,
&NdbOperation::branch_ge } }
/**
* GE
*/
,{ { 0,
&NdbOperation::branch_gt,
&NdbOperation::branch_le,
&NdbOperation::branch_le,
&NdbOperation::branch_gt } }
};
struct tab2 { struct tab2 {
Branch1 m_branches[5]; Branch1 m_branches[5];
...@@ -334,133 +270,8 @@ static const tab2 table2[] = { ...@@ -334,133 +270,8 @@ static const tab2 table2[] = {
&NdbOperation::branch_col_eq_null } } &NdbOperation::branch_col_eq_null } }
}; };
const int tab_sz = sizeof(table)/sizeof(table[0]);
const int tab2_sz = sizeof(table2)/sizeof(table2[0]); const int tab2_sz = sizeof(table2)/sizeof(table2[0]);
int
matchType(const NdbDictionary::Column * col){
return 1;
}
template<typename T> int load_const(NdbOperation* op, T value, Uint32 reg);
template<>
int
load_const(NdbOperation* op, Uint32 value, Uint32 reg){
return op->load_const_u32(reg, value);
}
template<>
int
load_const(NdbOperation* op, Uint64 value, Uint32 reg){
return op->load_const_u64(reg, value);
}
template<typename T>
int
NdbScanFilterImpl::cond_col_const(Interpreter::BinaryCondition op,
Uint32 AttrId, T value){
if(op < 0 || op >= tab_sz){
m_operation->setErrorCodeAbort(4262);
return -1;
}
if(m_current.m_group < NdbScanFilter::AND ||
m_current.m_group > NdbScanFilter::NOR){
m_operation->setErrorCodeAbort(4260);
return -1;
}
Branch2 branch = table[op].m_branches[m_current.m_group];
const NdbDictionary::Column * col =
m_operation->m_currentTable->getColumn(AttrId);
if(col == 0){
m_operation->setErrorCodeAbort(4261);
return -1;
}
if(!matchType(col)){
/**
* Code not reached
*/
return -1;
}
if(m_latestAttrib != AttrId){
m_operation->read_attr(&NdbColumnImpl::getImpl(* col), 4);
m_latestAttrib = AttrId;
}
load_const<T>(m_operation, value, 5);
(m_operation->* branch)(4, 5, m_current.m_ownLabel);
return 0;
};
int
NdbScanFilter::eq(int AttrId, Uint32 value){
return m_impl.cond_col_const(Interpreter::EQ, AttrId, value);
}
int
NdbScanFilter::ne(int AttrId, Uint32 value){
return m_impl.cond_col_const(Interpreter::NE, AttrId, value);
}
int
NdbScanFilter::lt(int AttrId, Uint32 value){
return m_impl.cond_col_const(Interpreter::LT, AttrId, value);
}
int
NdbScanFilter::le(int AttrId, Uint32 value){
return m_impl.cond_col_const(Interpreter::LE, AttrId, value);
}
int
NdbScanFilter::gt(int AttrId, Uint32 value){
return m_impl.cond_col_const(Interpreter::GT, AttrId, value);
}
int
NdbScanFilter::ge(int AttrId, Uint32 value){
return m_impl.cond_col_const(Interpreter::GE, AttrId, value);
}
int
NdbScanFilter::eq(int AttrId, Uint64 value){
return m_impl.cond_col_const(Interpreter::EQ, AttrId, value);
}
int
NdbScanFilter::ne(int AttrId, Uint64 value){
return m_impl.cond_col_const(Interpreter::NE, AttrId, value);
}
int
NdbScanFilter::lt(int AttrId, Uint64 value){
return m_impl.cond_col_const(Interpreter::LT, AttrId, value);
}
int
NdbScanFilter::le(int AttrId, Uint64 value){
return m_impl.cond_col_const(Interpreter::LE, AttrId, value);
}
int
NdbScanFilter::gt(int AttrId, Uint64 value){
return m_impl.cond_col_const(Interpreter::GT, AttrId, value);
}
int
NdbScanFilter::ge(int AttrId, Uint64 value){
return m_impl.cond_col_const(Interpreter::GE, AttrId, value);
}
int int
NdbScanFilterImpl::cond_col(Interpreter::UnaryCondition op, Uint32 AttrId){ NdbScanFilterImpl::cond_col(Interpreter::UnaryCondition op, Uint32 AttrId){
...@@ -570,11 +381,10 @@ static const tab3 table3[] = { ...@@ -570,11 +381,10 @@ static const tab3 table3[] = {
const int tab3_sz = sizeof(table3)/sizeof(table3[0]); const int tab3_sz = sizeof(table3)/sizeof(table3[0]);
int int
NdbScanFilterImpl::cond_col_const(Interpreter::BinaryCondition op, NdbScanFilterImpl::cond_col_const(Interpreter::BinaryCondition op,
Uint32 AttrId, Uint32 AttrId,
const char * value, Uint32 len, bool nopad){ const void * value, Uint32 len){
if(op < 0 || op >= tab3_sz){ if(op < 0 || op >= tab3_sz){
m_operation->setErrorCodeAbort(4260); m_operation->setErrorCodeAbort(4260);
return -1; return -1;
...@@ -595,49 +405,31 @@ NdbScanFilterImpl::cond_col_const(Interpreter::BinaryCondition op, ...@@ -595,49 +405,31 @@ NdbScanFilterImpl::cond_col_const(Interpreter::BinaryCondition op,
return -1; return -1;
} }
(m_operation->* branch)(AttrId, value, len, nopad, m_current.m_ownLabel); (m_operation->* branch)(AttrId, value, len, false, m_current.m_ownLabel);
return 0; return 0;
} }
int int
NdbScanFilter::eq(int ColId, const char * val, Uint32 len, bool nopad){ NdbScanFilter::cmp(BinaryCondition cond, int ColId,
return m_impl.cond_col_const(Interpreter::EQ, ColId, val, len, nopad); const void *val, Uint32 len)
} {
switch(cond){
int case LE:
NdbScanFilter::ne(int ColId, const char * val, Uint32 len, bool nopad){ return m_impl.cond_col_const(Interpreter::LE, ColId, val, len);
return m_impl.cond_col_const(Interpreter::NE, ColId, val, len, nopad); case LT:
} return m_impl.cond_col_const(Interpreter::LT, ColId, val, len);
case GE:
int return m_impl.cond_col_const(Interpreter::GE, ColId, val, len);
NdbScanFilter::lt(int ColId, const char * val, Uint32 len, bool nopad){ case GT:
return m_impl.cond_col_const(Interpreter::LT, ColId, val, len, nopad); return m_impl.cond_col_const(Interpreter::GT, ColId, val, len);
} case EQ:
return m_impl.cond_col_const(Interpreter::EQ, ColId, val, len);
int case NE:
NdbScanFilter::le(int ColId, const char * val, Uint32 len, bool nopad){ return m_impl.cond_col_const(Interpreter::NE, ColId, val, len);
return m_impl.cond_col_const(Interpreter::LE, ColId, val, len, nopad); }
} return -1;
int
NdbScanFilter::gt(int ColId, const char * val, Uint32 len, bool nopad){
return m_impl.cond_col_const(Interpreter::GT, ColId, val, len, nopad);
}
int
NdbScanFilter::ge(int ColId, const char * val, Uint32 len, bool nopad){
return m_impl.cond_col_const(Interpreter::GE, ColId, val, len, nopad);
}
int
NdbScanFilter::like(int ColId, const char * val, Uint32 len, bool nopad){
return m_impl.cond_col_const(Interpreter::LIKE, ColId, val, len, nopad);
} }
int
NdbScanFilter::notlike(int ColId, const char * val, Uint32 len, bool nopad){
return m_impl.cond_col_const(Interpreter::NOT_LIKE, ColId, val, len, nopad);
}
#if 0 #if 0
int int
...@@ -778,10 +570,4 @@ main(void){ ...@@ -778,10 +570,4 @@ main(void){
#endif #endif
template class Vector<NdbScanFilterImpl::State>; template class Vector<NdbScanFilterImpl::State>;
#if __SUNPRO_CC != 0x560
#ifndef _FORTEC_
template int NdbScanFilterImpl::cond_col_const(Interpreter::BinaryCondition, Uint32 attrId, Uint32);
template int NdbScanFilterImpl::cond_col_const(Interpreter::BinaryCondition, Uint32 attrId, Uint64);
#endif
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment