Commit 7ef712ba authored by pekka@mysql.com's avatar pekka@mysql.com

ndb - TUP scan filter support for LIKE/NOT_LIKE

parent 14c9ff5c
...@@ -53,7 +53,9 @@ public: ...@@ -53,7 +53,9 @@ public:
COND_GE = 2, ///< upper bound COND_GE = 2, ///< upper bound
COND_GT = 3, ///< upper bound, strict COND_GT = 3, ///< upper bound, strict
COND_EQ = 4, ///< equality COND_EQ = 4, ///< equality
COND_NE = 5 ///< not equal COND_NE = 5, ///< not equal
COND_LIKE = 6, ///< like
COND_NOT_LIKE = 7 ///< not like
}; };
/** /**
......
...@@ -25,20 +25,6 @@ typedef struct charset_info_st CHARSET_INFO; ...@@ -25,20 +25,6 @@ typedef struct charset_info_st CHARSET_INFO;
class NdbSqlUtil { class NdbSqlUtil {
public: public:
/**
* Compare strings, optionally with padded semantics. Returns
* negative (less), zero (equal), or positive (greater).
*/
static int char_compare(const char* s1, unsigned n1,
const char* s2, unsigned n2, bool padded);
/**
* Like operator, optionally with padded semantics. Returns true or
* false.
*/
static bool char_like(const char* s1, unsigned n1,
const char* s2, unsigned n2, bool padded);
/** /**
* Compare attribute values. Returns -1, 0, +1 for less, equal, * Compare attribute values. Returns -1, 0, +1 for less, equal,
* greater, respectively. Parameters are pointers to values and their * greater, respectively. Parameters are pointers to values and their
...@@ -48,7 +34,7 @@ public: ...@@ -48,7 +34,7 @@ public:
* the partial value is not enough to determine the result, CmpUnknown * the partial value is not enough to determine the result, CmpUnknown
* will be returned. A shorter second value is not necessarily * will be returned. A shorter second value is not necessarily
* partial. Partial values are allowed only for types where prefix * partial. Partial values are allowed only for types where prefix
* comparison is possible (basically, binary types). * comparison is possible (basically, binary strings).
* *
* First parameter is a pointer to type specific extra info. Char * First parameter is a pointer to type specific extra info. Char
* types receive CHARSET_INFO in it. * types receive CHARSET_INFO in it.
...@@ -58,6 +44,18 @@ public: ...@@ -58,6 +44,18 @@ public:
*/ */
typedef int Cmp(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full); typedef int Cmp(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full);
/**
* Prototype for "like" comparison. Defined for the 3 char string
* types. Second argument must have same type-specific format.
* Returns >0 on match, 0 on no match, <0 on bad data.
*
* Uses default special chars ( \ % _ ).
*
* TODO convert special chars to the cs so that ucs2 etc works
* TODO allow user-defined escape ( \ )
*/
typedef int Like(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2);
enum CmpResult { enum CmpResult {
CmpLess = -1, CmpLess = -1,
CmpEqual = 0, CmpEqual = 0,
...@@ -101,6 +99,7 @@ public: ...@@ -101,6 +99,7 @@ public:
}; };
Enum m_typeId; // redundant Enum m_typeId; // redundant
Cmp* m_cmp; // comparison method Cmp* m_cmp; // comparison method
Like* m_like; // "like" comparison method
}; };
/** /**
...@@ -110,7 +109,8 @@ public: ...@@ -110,7 +109,8 @@ public:
/** /**
* Get the normalized type used in hashing and key comparisons. * Get the normalized type used in hashing and key comparisons.
* Maps all string types to Binary. * Maps all string types to Binary. This includes Var* strings
* because strxfrm result is padded to fixed (maximum) length.
*/ */
static const Type& getTypeBinary(Uint32 typeId); static const Type& getTypeBinary(Uint32 typeId);
...@@ -176,6 +176,10 @@ private: ...@@ -176,6 +176,10 @@ private:
static Cmp cmpOlddecimalunsigned; static Cmp cmpOlddecimalunsigned;
static Cmp cmpDecimal; static Cmp cmpDecimal;
static Cmp cmpDecimalunsigned; static Cmp cmpDecimalunsigned;
//
static Like likeChar;
static Like likeVarchar;
static Like likeLongvarchar;
}; };
#endif #endif
...@@ -16,60 +16,7 @@ ...@@ -16,60 +16,7 @@
#include <NdbSqlUtil.hpp> #include <NdbSqlUtil.hpp>
#include <NdbOut.hpp> #include <NdbOut.hpp>
#include <my_sys.h>
int
NdbSqlUtil::char_compare(const char* s1, unsigned n1,
const char* s2, unsigned n2, bool padded)
{
int c1 = 0;
int c2 = 0;
unsigned i = 0;
while (i < n1 || i < n2) {
c1 = i < n1 ? s1[i] : padded ? 0x20 : 0;
c2 = i < n2 ? s2[i] : padded ? 0x20 : 0;
if (c1 != c2)
break;
i++;
}
return c1 - c2;
}
bool
NdbSqlUtil::char_like(const char* s1, unsigned n1,
const char* s2, unsigned n2, bool padded)
{
int c1 = 0;
int c2 = 0;
unsigned i1 = 0;
unsigned i2 = 0;
while (i1 < n1 || i2 < n2) {
c1 = i1 < n1 ? s1[i1] : padded ? 0x20 : 0;
c2 = i2 < n2 ? s2[i2] : padded ? 0x20 : 0;
if (c2 == '%') {
while (i2 + 1 < n2 && s2[i2 + 1] == '%') {
i2++;
}
unsigned m = 0;
while (m <= n1 - i1) {
if (char_like(s1 + i1 + m, n1 -i1 - m,
s2 + i2 + 1, n2 - i2 - 1, padded))
return true;
m++;
}
return false;
}
if (c2 == '_') {
if (c1 == 0)
return false;
} else {
if (c1 != c2)
return false;
}
i1++;
i2++;
}
return i1 == n2 && i2 == n2;
}
/* /*
* Data types. The entries must be in the numerical order. * Data types. The entries must be in the numerical order.
...@@ -79,127 +26,158 @@ const NdbSqlUtil::Type ...@@ -79,127 +26,158 @@ const NdbSqlUtil::Type
NdbSqlUtil::m_typeList[] = { NdbSqlUtil::m_typeList[] = {
{ // 0 { // 0
Type::Undefined, Type::Undefined,
NULL,
NULL NULL
}, },
{ // 1 { // 1
Type::Tinyint, Type::Tinyint,
cmpTinyint cmpTinyint,
NULL
}, },
{ // 2 { // 2
Type::Tinyunsigned, Type::Tinyunsigned,
cmpTinyunsigned cmpTinyunsigned,
NULL
}, },
{ // 3 { // 3
Type::Smallint, Type::Smallint,
cmpSmallint cmpSmallint,
NULL
}, },
{ // 4 { // 4
Type::Smallunsigned, Type::Smallunsigned,
cmpSmallunsigned cmpSmallunsigned,
NULL
}, },
{ // 5 { // 5
Type::Mediumint, Type::Mediumint,
cmpMediumint cmpMediumint,
NULL
}, },
{ // 6 { // 6
Type::Mediumunsigned, Type::Mediumunsigned,
cmpMediumunsigned cmpMediumunsigned,
NULL
}, },
{ // 7 { // 7
Type::Int, Type::Int,
cmpInt cmpInt,
NULL
}, },
{ // 8 { // 8
Type::Unsigned, Type::Unsigned,
cmpUnsigned cmpUnsigned,
NULL
}, },
{ // 9 { // 9
Type::Bigint, Type::Bigint,
cmpBigint cmpBigint,
NULL
}, },
{ // 10 { // 10
Type::Bigunsigned, Type::Bigunsigned,
cmpBigunsigned cmpBigunsigned,
NULL
}, },
{ // 11 { // 11
Type::Float, Type::Float,
cmpFloat cmpFloat,
NULL
}, },
{ // 12 { // 12
Type::Double, Type::Double,
cmpDouble cmpDouble,
NULL
}, },
{ // 13 { // 13
Type::Olddecimal, Type::Olddecimal,
cmpOlddecimal cmpOlddecimal,
NULL
}, },
{ // 14 { // 14
Type::Char, Type::Char,
cmpChar cmpChar,
likeChar
}, },
{ // 15 { // 15
Type::Varchar, Type::Varchar,
cmpVarchar cmpVarchar,
likeVarchar
}, },
{ // 16 { // 16
Type::Binary, Type::Binary,
cmpBinary cmpBinary,
NULL
}, },
{ // 17 { // 17
Type::Varbinary, Type::Varbinary,
cmpVarbinary cmpVarbinary,
NULL
}, },
{ // 18 { // 18
Type::Datetime, Type::Datetime,
cmpDatetime cmpDatetime,
NULL
}, },
{ // 19 { // 19
Type::Date, Type::Date,
cmpDate cmpDate,
NULL
}, },
{ // 20 { // 20
Type::Blob, Type::Blob,
NULL // cmpBlob NULL,
NULL
}, },
{ // 21 { // 21
Type::Text, Type::Text,
NULL // cmpText NULL,
NULL
}, },
{ // 22 { // 22
Type::Bit, Type::Bit,
NULL // cmpBit NULL,
NULL
}, },
{ // 23 { // 23
Type::Longvarchar, Type::Longvarchar,
cmpLongvarchar cmpLongvarchar,
likeLongvarchar
}, },
{ // 24 { // 24
Type::Longvarbinary, Type::Longvarbinary,
cmpLongvarbinary cmpLongvarbinary,
NULL
}, },
{ // 25 { // 25
Type::Time, Type::Time,
cmpTime cmpTime,
NULL
}, },
{ // 26 { // 26
Type::Year, Type::Year,
cmpYear cmpYear,
NULL
}, },
{ // 27 { // 27
Type::Timestamp, Type::Timestamp,
cmpTimestamp cmpTimestamp,
NULL
}, },
{ // 28 { // 28
Type::Olddecimalunsigned, Type::Olddecimalunsigned,
cmpOlddecimalunsigned cmpOlddecimalunsigned,
NULL
}, },
{ // 29 { // 29
Type::Decimal, Type::Decimal,
cmpDecimal cmpDecimal,
NULL
}, },
{ // 30 { // 30
Type::Decimalunsigned, Type::Decimalunsigned,
cmpDecimalunsigned cmpDecimalunsigned,
NULL
} }
}; };
...@@ -824,6 +802,58 @@ NdbSqlUtil::cmpTimestamp(const void* info, const void* p1, unsigned n1, const vo ...@@ -824,6 +802,58 @@ NdbSqlUtil::cmpTimestamp(const void* info, const void* p1, unsigned n1, const vo
return CmpUnknown; return CmpUnknown;
} }
// like
int
NdbSqlUtil::likeChar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
const char* v1 = (const char*)p1;
const char* v2 = (const char*)p2;
CHARSET_INFO* cs = (CHARSET_INFO*)(info);
int k = (cs->coll->wildcmp)(cs, v1, v1 + n1, v2, v2 + n2, wild_prefix, wild_one, wild_many);
return k;
}
int
NdbSqlUtil::likeVarchar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
const unsigned lb = 1;
if (n1 >= lb && n2 >= lb) {
const uchar* v1 = (const uchar*)p1;
const uchar* v2 = (const uchar*)p2;
unsigned m1 = *v1;
unsigned m2 = *v2;
if (lb + m1 <= n1 && lb + m2 <= n2) {
const char* w1 = (const char*)v1 + lb;
const char* w2 = (const char*)v2 + lb;
CHARSET_INFO* cs = (CHARSET_INFO*)(info);
int k = (cs->coll->wildcmp)(cs, w1, w1 + m1, w2, w2 + m2, wild_prefix, wild_one, wild_many);
return k;
}
}
return -1;
}
int
NdbSqlUtil::likeLongvarchar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
const unsigned lb = 2;
if (n1 >= lb && n2 >= lb) {
const uchar* v1 = (const uchar*)p1;
const uchar* v2 = (const uchar*)p2;
unsigned m1 = uint2korr(v1);
unsigned m2 = uint2korr(v2);
if (lb + m1 <= n1 && lb + m2 <= n2) {
const char* w1 = (const char*)v1 + lb;
const char* w2 = (const char*)v2 + lb;
CHARSET_INFO* cs = (CHARSET_INFO*)(info);
int k = (cs->coll->wildcmp)(cs, w1, w1 + m1, w2, w2 + m2, wild_prefix, wild_one, wild_many);
return k;
}
}
return -1;
}
// check charset // check charset
bool bool
......
...@@ -1863,7 +1863,8 @@ int Dbtup::interpreterNextLab(Signal* signal, ...@@ -1863,7 +1863,8 @@ int Dbtup::interpreterNextLab(Signal* signal,
{ {
res = r1_null && r2_null ? 0 : r1_null ? -1 : 1; res = r1_null && r2_null ? 0 : r1_null ? -1 : 1;
} }
else else if (cond != Interpreter::LIKE &&
cond != Interpreter::NOT_LIKE)
{ {
/* --------------------------------------------------------- */ /* --------------------------------------------------------- */
// If length of argument rounded to nearest word is // If length of argument rounded to nearest word is
...@@ -1872,6 +1873,10 @@ int Dbtup::interpreterNextLab(Signal* signal, ...@@ -1872,6 +1873,10 @@ int Dbtup::interpreterNextLab(Signal* signal,
if ((((argLen + 3) >> 2) << 2) == attrLen) argLen= attrLen; if ((((argLen + 3) >> 2) << 2) == attrLen) argLen= attrLen;
res = (*sqlType.m_cmp)(cs, s1, attrLen, s2, argLen, true); res = (*sqlType.m_cmp)(cs, s1, attrLen, s2, argLen, true);
} }
else
{
res = (*sqlType.m_like)(cs, s1, attrLen, s2, argLen);
}
switch ((Interpreter::BinaryCondition)cond) { switch ((Interpreter::BinaryCondition)cond) {
case Interpreter::EQ: case Interpreter::EQ:
...@@ -1894,10 +1899,10 @@ int Dbtup::interpreterNextLab(Signal* signal, ...@@ -1894,10 +1899,10 @@ int Dbtup::interpreterNextLab(Signal* signal,
res = (res <= 0); res = (res <= 0);
break; break;
case Interpreter::LIKE: case Interpreter::LIKE:
res = NdbSqlUtil::char_like(s1, attrLen, s2, argLen, false); res = (res > 0);
break; break;
case Interpreter::NOT_LIKE: case Interpreter::NOT_LIKE:
res = ! NdbSqlUtil::char_like(s1, attrLen, s2, argLen, false); res = (res == 0);
break; break;
// XXX handle invalid value // XXX handle invalid value
} }
......
...@@ -426,6 +426,10 @@ NdbScanFilter::cmp(BinaryCondition cond, int ColId, ...@@ -426,6 +426,10 @@ NdbScanFilter::cmp(BinaryCondition cond, int ColId,
return m_impl.cond_col_const(Interpreter::EQ, ColId, val, len); return m_impl.cond_col_const(Interpreter::EQ, ColId, val, len);
case COND_NE: case COND_NE:
return m_impl.cond_col_const(Interpreter::NE, ColId, val, len); return m_impl.cond_col_const(Interpreter::NE, ColId, val, len);
case COND_LIKE:
return m_impl.cond_col_const(Interpreter::LIKE, ColId, val, len);
case COND_NOT_LIKE:
return m_impl.cond_col_const(Interpreter::NOT_LIKE, ColId, val, len);
} }
return -1; return -1;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment