Commit 4252ec52 authored by unknown's avatar unknown

ndb - TUP scan filter support for LIKE/NOT_LIKE


ndb/include/ndbapi/NdbScanFilter.hpp:
  TUP scan filter support for LIKE/NOT_LIKE
ndb/include/util/NdbSqlUtil.hpp:
  TUP scan filter support for LIKE/NOT_LIKE
ndb/src/common/util/NdbSqlUtil.cpp:
  TUP scan filter support for LIKE/NOT_LIKE
ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp:
  TUP scan filter support for LIKE/NOT_LIKE
ndb/src/ndbapi/NdbScanFilter.cpp:
  TUP scan filter support for LIKE/NOT_LIKE
parent f9a4cfd9
......@@ -53,7 +53,9 @@ public:
COND_GE = 2, ///< upper bound
COND_GT = 3, ///< upper bound, strict
COND_EQ = 4, ///< equality
COND_NE = 5 ///< not equal
COND_NE = 5, ///< not equal
COND_LIKE = 6, ///< like
COND_NOT_LIKE = 7 ///< not like
};
/**
......
......@@ -25,20 +25,6 @@ typedef struct charset_info_st CHARSET_INFO;
class NdbSqlUtil {
public:
/**
* Compare strings, optionally with padded semantics. Returns
* negative (less), zero (equal), or positive (greater).
*/
static int char_compare(const char* s1, unsigned n1,
const char* s2, unsigned n2, bool padded);
/**
* Like operator, optionally with padded semantics. Returns true or
* false.
*/
static bool char_like(const char* s1, unsigned n1,
const char* s2, unsigned n2, bool padded);
/**
* Compare attribute values. Returns -1, 0, +1 for less, equal,
* greater, respectively. Parameters are pointers to values and their
......@@ -48,7 +34,7 @@ public:
* the partial value is not enough to determine the result, CmpUnknown
* will be returned. A shorter second value is not necessarily
* partial. Partial values are allowed only for types where prefix
* comparison is possible (basically, binary types).
* comparison is possible (basically, binary strings).
*
* First parameter is a pointer to type specific extra info. Char
* types receive CHARSET_INFO in it.
......@@ -58,6 +44,18 @@ public:
*/
typedef int Cmp(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full);
/**
* Prototype for "like" comparison. Defined for the 3 char string
* types. Second argument must have same type-specific format.
* Returns >0 on match, 0 on no match, <0 on bad data.
*
* Uses default special chars ( \ % _ ).
*
* TODO convert special chars to the cs so that ucs2 etc works
* TODO allow user-defined escape ( \ )
*/
typedef int Like(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2);
enum CmpResult {
CmpLess = -1,
CmpEqual = 0,
......@@ -101,6 +99,7 @@ public:
};
Enum m_typeId; // redundant
Cmp* m_cmp; // comparison method
Like* m_like; // "like" comparison method
};
/**
......@@ -110,7 +109,8 @@ public:
/**
* Get the normalized type used in hashing and key comparisons.
* Maps all string types to Binary.
* Maps all string types to Binary. This includes Var* strings
* because strxfrm result is padded to fixed (maximum) length.
*/
static const Type& getTypeBinary(Uint32 typeId);
......@@ -176,6 +176,10 @@ private:
static Cmp cmpOlddecimalunsigned;
static Cmp cmpDecimal;
static Cmp cmpDecimalunsigned;
//
static Like likeChar;
static Like likeVarchar;
static Like likeLongvarchar;
};
#endif
......@@ -16,60 +16,7 @@
#include <NdbSqlUtil.hpp>
#include <NdbOut.hpp>
int
NdbSqlUtil::char_compare(const char* s1, unsigned n1,
const char* s2, unsigned n2, bool padded)
{
int c1 = 0;
int c2 = 0;
unsigned i = 0;
while (i < n1 || i < n2) {
c1 = i < n1 ? s1[i] : padded ? 0x20 : 0;
c2 = i < n2 ? s2[i] : padded ? 0x20 : 0;
if (c1 != c2)
break;
i++;
}
return c1 - c2;
}
bool
NdbSqlUtil::char_like(const char* s1, unsigned n1,
const char* s2, unsigned n2, bool padded)
{
int c1 = 0;
int c2 = 0;
unsigned i1 = 0;
unsigned i2 = 0;
while (i1 < n1 || i2 < n2) {
c1 = i1 < n1 ? s1[i1] : padded ? 0x20 : 0;
c2 = i2 < n2 ? s2[i2] : padded ? 0x20 : 0;
if (c2 == '%') {
while (i2 + 1 < n2 && s2[i2 + 1] == '%') {
i2++;
}
unsigned m = 0;
while (m <= n1 - i1) {
if (char_like(s1 + i1 + m, n1 -i1 - m,
s2 + i2 + 1, n2 - i2 - 1, padded))
return true;
m++;
}
return false;
}
if (c2 == '_') {
if (c1 == 0)
return false;
} else {
if (c1 != c2)
return false;
}
i1++;
i2++;
}
return i1 == n2 && i2 == n2;
}
#include <my_sys.h>
/*
* Data types. The entries must be in the numerical order.
......@@ -79,127 +26,158 @@ const NdbSqlUtil::Type
NdbSqlUtil::m_typeList[] = {
{ // 0
Type::Undefined,
NULL,
NULL
},
{ // 1
Type::Tinyint,
cmpTinyint
cmpTinyint,
NULL
},
{ // 2
Type::Tinyunsigned,
cmpTinyunsigned
cmpTinyunsigned,
NULL
},
{ // 3
Type::Smallint,
cmpSmallint
cmpSmallint,
NULL
},
{ // 4
Type::Smallunsigned,
cmpSmallunsigned
cmpSmallunsigned,
NULL
},
{ // 5
Type::Mediumint,
cmpMediumint
cmpMediumint,
NULL
},
{ // 6
Type::Mediumunsigned,
cmpMediumunsigned
cmpMediumunsigned,
NULL
},
{ // 7
Type::Int,
cmpInt
cmpInt,
NULL
},
{ // 8
Type::Unsigned,
cmpUnsigned
cmpUnsigned,
NULL
},
{ // 9
Type::Bigint,
cmpBigint
cmpBigint,
NULL
},
{ // 10
Type::Bigunsigned,
cmpBigunsigned
cmpBigunsigned,
NULL
},
{ // 11
Type::Float,
cmpFloat
cmpFloat,
NULL
},
{ // 12
Type::Double,
cmpDouble
cmpDouble,
NULL
},
{ // 13
Type::Olddecimal,
cmpOlddecimal
cmpOlddecimal,
NULL
},
{ // 14
Type::Char,
cmpChar
cmpChar,
likeChar
},
{ // 15
Type::Varchar,
cmpVarchar
cmpVarchar,
likeVarchar
},
{ // 16
Type::Binary,
cmpBinary
cmpBinary,
NULL
},
{ // 17
Type::Varbinary,
cmpVarbinary
cmpVarbinary,
NULL
},
{ // 18
Type::Datetime,
cmpDatetime
cmpDatetime,
NULL
},
{ // 19
Type::Date,
cmpDate
cmpDate,
NULL
},
{ // 20
Type::Blob,
NULL // cmpBlob
NULL,
NULL
},
{ // 21
Type::Text,
NULL // cmpText
NULL,
NULL
},
{ // 22
Type::Bit,
NULL // cmpBit
NULL,
NULL
},
{ // 23
Type::Longvarchar,
cmpLongvarchar
cmpLongvarchar,
likeLongvarchar
},
{ // 24
Type::Longvarbinary,
cmpLongvarbinary
cmpLongvarbinary,
NULL
},
{ // 25
Type::Time,
cmpTime
cmpTime,
NULL
},
{ // 26
Type::Year,
cmpYear
cmpYear,
NULL
},
{ // 27
Type::Timestamp,
cmpTimestamp
cmpTimestamp,
NULL
},
{ // 28
Type::Olddecimalunsigned,
cmpOlddecimalunsigned
cmpOlddecimalunsigned,
NULL
},
{ // 29
Type::Decimal,
cmpDecimal
cmpDecimal,
NULL
},
{ // 30
Type::Decimalunsigned,
cmpDecimalunsigned
cmpDecimalunsigned,
NULL
}
};
......@@ -824,6 +802,58 @@ NdbSqlUtil::cmpTimestamp(const void* info, const void* p1, unsigned n1, const vo
return CmpUnknown;
}
// like
int
NdbSqlUtil::likeChar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
const char* v1 = (const char*)p1;
const char* v2 = (const char*)p2;
CHARSET_INFO* cs = (CHARSET_INFO*)(info);
int k = (cs->coll->wildcmp)(cs, v1, v1 + n1, v2, v2 + n2, wild_prefix, wild_one, wild_many);
return k;
}
int
NdbSqlUtil::likeVarchar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
const unsigned lb = 1;
if (n1 >= lb && n2 >= lb) {
const uchar* v1 = (const uchar*)p1;
const uchar* v2 = (const uchar*)p2;
unsigned m1 = *v1;
unsigned m2 = *v2;
if (lb + m1 <= n1 && lb + m2 <= n2) {
const char* w1 = (const char*)v1 + lb;
const char* w2 = (const char*)v2 + lb;
CHARSET_INFO* cs = (CHARSET_INFO*)(info);
int k = (cs->coll->wildcmp)(cs, w1, w1 + m1, w2, w2 + m2, wild_prefix, wild_one, wild_many);
return k;
}
}
return -1;
}
int
NdbSqlUtil::likeLongvarchar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
const unsigned lb = 2;
if (n1 >= lb && n2 >= lb) {
const uchar* v1 = (const uchar*)p1;
const uchar* v2 = (const uchar*)p2;
unsigned m1 = uint2korr(v1);
unsigned m2 = uint2korr(v2);
if (lb + m1 <= n1 && lb + m2 <= n2) {
const char* w1 = (const char*)v1 + lb;
const char* w2 = (const char*)v2 + lb;
CHARSET_INFO* cs = (CHARSET_INFO*)(info);
int k = (cs->coll->wildcmp)(cs, w1, w1 + m1, w2, w2 + m2, wild_prefix, wild_one, wild_many);
return k;
}
}
return -1;
}
// check charset
bool
......
......@@ -1863,7 +1863,8 @@ int Dbtup::interpreterNextLab(Signal* signal,
{
res = r1_null && r2_null ? 0 : r1_null ? -1 : 1;
}
else
else if (cond != Interpreter::LIKE &&
cond != Interpreter::NOT_LIKE)
{
/* --------------------------------------------------------- */
// If length of argument rounded to nearest word is
......@@ -1872,7 +1873,11 @@ int Dbtup::interpreterNextLab(Signal* signal,
if ((((argLen + 3) >> 2) << 2) == attrLen) argLen= attrLen;
res = (*sqlType.m_cmp)(cs, s1, attrLen, s2, argLen, true);
}
else
{
res = (*sqlType.m_like)(cs, s1, attrLen, s2, argLen);
}
switch ((Interpreter::BinaryCondition)cond) {
case Interpreter::EQ:
res = (res == 0);
......@@ -1894,10 +1899,10 @@ int Dbtup::interpreterNextLab(Signal* signal,
res = (res <= 0);
break;
case Interpreter::LIKE:
res = NdbSqlUtil::char_like(s1, attrLen, s2, argLen, false);
res = (res > 0);
break;
case Interpreter::NOT_LIKE:
res = ! NdbSqlUtil::char_like(s1, attrLen, s2, argLen, false);
res = (res == 0);
break;
// XXX handle invalid value
}
......
......@@ -426,6 +426,10 @@ NdbScanFilter::cmp(BinaryCondition cond, int ColId,
return m_impl.cond_col_const(Interpreter::EQ, ColId, val, len);
case COND_NE:
return m_impl.cond_col_const(Interpreter::NE, ColId, val, len);
case COND_LIKE:
return m_impl.cond_col_const(Interpreter::LIKE, ColId, val, len);
case COND_NOT_LIKE:
return m_impl.cond_col_const(Interpreter::NOT_LIKE, ColId, val, len);
}
return -1;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment