Commit 067543c4 authored by mskold@mysql.com's avatar mskold@mysql.com

Added support for LIKE and NOT LIKE in condition pushdown

parent adb94fcb
...@@ -5907,6 +5907,8 @@ void ndb_serialize_cond(const Item *item, void *arg) ...@@ -5907,6 +5907,8 @@ void ndb_serialize_cond(const Item *item, void *arg)
break; break;
} }
} }
DBUG_PRINT("info", ("Was not expecting field of type %u",
field->result_type()));
context->supported= FALSE; context->supported= FALSE;
break; break;
} }
...@@ -6009,14 +6011,16 @@ void ndb_serialize_cond(const Item *item, void *arg) ...@@ -6009,14 +6011,16 @@ void ndb_serialize_cond(const Item *item, void *arg)
DBUG_PRINT("info", ("LIKE_FUNC")); DBUG_PRINT("info", ("LIKE_FUNC"));
curr_cond->ndb_item= new Ndb_item(func_item->functype()); curr_cond->ndb_item= new Ndb_item(func_item->functype());
context->expect(Item::STRING_ITEM); context->expect(Item::STRING_ITEM);
context->supported= FALSE; // Currently not supported context->expect(Item::FIELD_ITEM);
context->expect_field_result(STRING_RESULT);
break; break;
} }
case(Item_func::NOTLIKE_FUNC): { case(Item_func::NOTLIKE_FUNC): {
DBUG_PRINT("info", ("NOTLIKE_FUNC")); DBUG_PRINT("info", ("NOTLIKE_FUNC"));
curr_cond->ndb_item= new Ndb_item(func_item->functype()); curr_cond->ndb_item= new Ndb_item(func_item->functype());
context->expect(Item::STRING_ITEM); context->expect(Item::STRING_ITEM);
context->supported= FALSE; // Currently not supported context->expect(Item::FIELD_ITEM);
context->expect_field_result(STRING_RESULT);
break; break;
} }
case(Item_func::ISNULL_FUNC): { case(Item_func::ISNULL_FUNC): {
...@@ -6039,6 +6043,13 @@ void ndb_serialize_cond(const Item *item, void *arg) ...@@ -6039,6 +6043,13 @@ void ndb_serialize_cond(const Item *item, void *arg)
context->expect_field_result(DECIMAL_RESULT); context->expect_field_result(DECIMAL_RESULT);
break; break;
} }
case(Item_func::NOT_FUNC): {
DBUG_PRINT("info", ("NOT_FUNC"));
curr_cond->ndb_item= new Ndb_item(func_item->functype());
context->expect(Item::FUNC_ITEM);
context->expect(Item::COND_ITEM);
break;
}
case(Item_func::UNKNOWN_FUNC): { case(Item_func::UNKNOWN_FUNC): {
DBUG_PRINT("info", ("UNKNOWN_FUNC %s", DBUG_PRINT("info", ("UNKNOWN_FUNC %s",
func_item->const_item()?"const":"")); func_item->const_item()?"const":""));
...@@ -6293,7 +6304,8 @@ ha_ndbcluster::serialize_cond(const COND *cond, Ndb_cond_stack *ndb_cond) ...@@ -6293,7 +6304,8 @@ ha_ndbcluster::serialize_cond(const COND *cond, Ndb_cond_stack *ndb_cond)
int int
ha_ndbcluster::build_scan_filter_predicate(Ndb_cond * &cond, ha_ndbcluster::build_scan_filter_predicate(Ndb_cond * &cond,
NdbScanFilter *filter) NdbScanFilter *filter,
bool negated)
{ {
DBUG_ENTER("build_scan_filter_predicate"); DBUG_ENTER("build_scan_filter_predicate");
switch(cond->ndb_item->type) { switch(cond->ndb_item->type) {
...@@ -6301,7 +6313,10 @@ ha_ndbcluster::build_scan_filter_predicate(Ndb_cond * &cond, ...@@ -6301,7 +6313,10 @@ ha_ndbcluster::build_scan_filter_predicate(Ndb_cond * &cond,
if (!cond->next) if (!cond->next)
break; break;
Ndb_item *a= cond->next->ndb_item; Ndb_item *a= cond->next->ndb_item;
switch(cond->ndb_item->qualification.function_type) { switch((negated) ?
Ndb_item::negate(cond->ndb_item->qualification.function_type)
: cond->ndb_item->qualification.function_type)
{
case(Item_func::EQ_FUNC): { case(Item_func::EQ_FUNC): {
if (!cond->next->next) if (!cond->next->next)
break; break;
...@@ -6511,14 +6526,13 @@ ha_ndbcluster::build_scan_filter_predicate(Ndb_cond * &cond, ...@@ -6511,14 +6526,13 @@ ha_ndbcluster::build_scan_filter_predicate(Ndb_cond * &cond,
// Save value in right format for the field type // Save value in right format for the field type
value->save_in_field(field); value->save_in_field(field);
DBUG_PRINT("info", ("Generating LIKE filter: like(%d,%s,%d)", DBUG_PRINT("info", ("Generating LIKE filter: like(%d,%s,%d)",
field->get_field_no(), field->get_val(), field->get_field_no(), value->get_val(),
field->pack_length())); value->pack_length()));
/* if (filter->cmp(NdbScanFilter::COND_LIKE,
if (filter->like(field->get_field_no(), field->get_field_no(),
field->get_val(), field->get_val(),
field->pack_length()) == -1) field->pack_length()) == -1)
DBUG_RETURN(1); DBUG_RETURN(1);
*/
cond= cond->next->next->next; cond= cond->next->next->next;
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -6539,13 +6553,13 @@ ha_ndbcluster::build_scan_filter_predicate(Ndb_cond * &cond, ...@@ -6539,13 +6553,13 @@ ha_ndbcluster::build_scan_filter_predicate(Ndb_cond * &cond,
// Save value in right format for the field type // Save value in right format for the field type
value->save_in_field(field); value->save_in_field(field);
DBUG_PRINT("info", ("Generating NOTLIKE filter: notlike(%d,%s,%d)", DBUG_PRINT("info", ("Generating NOTLIKE filter: notlike(%d,%s,%d)",
field->get_field_no(), field->get_val(), field->get_field_no(), value->get_val(),
field->pack_length())); value->pack_length()));
/* if (filter->cmp(NdbScanFilter::COND_NOT_LIKE,
if (filter->notlike(field->get_field_no(), field->get_field_no(),
field->get_val(), field->pack_length()) == -1) field->get_val(),
field->pack_length()) == -1)
DBUG_RETURN(1); DBUG_RETURN(1);
*/
cond= cond->next->next->next; cond= cond->next->next->next;
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -6581,7 +6595,8 @@ ha_ndbcluster::build_scan_filter_predicate(Ndb_cond * &cond, ...@@ -6581,7 +6595,8 @@ ha_ndbcluster::build_scan_filter_predicate(Ndb_cond * &cond,
} }
int int
ha_ndbcluster::build_scan_filter_group(Ndb_cond* &cond, NdbScanFilter *filter) ha_ndbcluster::build_scan_filter_group(Ndb_cond* &cond, NdbScanFilter *filter,
bool negated)
{ {
DBUG_ENTER("build_scan_filter_group"); DBUG_ENTER("build_scan_filter_group");
if (!cond) DBUG_RETURN(1); if (!cond) DBUG_RETURN(1);
...@@ -6589,8 +6604,9 @@ ha_ndbcluster::build_scan_filter_group(Ndb_cond* &cond, NdbScanFilter *filter) ...@@ -6589,8 +6604,9 @@ ha_ndbcluster::build_scan_filter_group(Ndb_cond* &cond, NdbScanFilter *filter)
case(NDB_FUNCTION): case(NDB_FUNCTION):
switch(cond->ndb_item->qualification.function_type) { switch(cond->ndb_item->qualification.function_type) {
case(Item_func::COND_AND_FUNC): { case(Item_func::COND_AND_FUNC): {
DBUG_PRINT("info", ("Generating AND group")); DBUG_PRINT("info", ("Generating %s group", (negated)?"NAND":"AND"));
if (filter->begin(NdbScanFilter::AND) == -1) if ((negated) ? filter->begin(NdbScanFilter::NAND)
: filter->begin(NdbScanFilter::AND) == -1)
DBUG_RETURN(1); DBUG_RETURN(1);
cond= cond->next; cond= cond->next;
do do
...@@ -6601,12 +6617,13 @@ ha_ndbcluster::build_scan_filter_group(Ndb_cond* &cond, NdbScanFilter *filter) ...@@ -6601,12 +6617,13 @@ ha_ndbcluster::build_scan_filter_group(Ndb_cond* &cond, NdbScanFilter *filter)
if (cond) cond= cond->next; if (cond) cond= cond->next;
if (filter->end() == -1) if (filter->end() == -1)
DBUG_RETURN(1); DBUG_RETURN(1);
DBUG_PRINT("info", ("End of AND group")); DBUG_PRINT("info", ("End of %s group", (negated)?"NAND":"AND"));
break; break;
} }
case(Item_func::COND_OR_FUNC): { case(Item_func::COND_OR_FUNC): {
DBUG_PRINT("info", ("Generating OR group")); DBUG_PRINT("info", ("Generating % group", (negated)?"NOR":"OR"));
if (filter->begin(NdbScanFilter::OR) == -1) if ((negated) ? filter->begin(NdbScanFilter::OR)
: filter->begin(NdbScanFilter::OR) == -1)
DBUG_RETURN(1); DBUG_RETURN(1);
cond= cond->next; cond= cond->next;
do do
...@@ -6617,11 +6634,16 @@ ha_ndbcluster::build_scan_filter_group(Ndb_cond* &cond, NdbScanFilter *filter) ...@@ -6617,11 +6634,16 @@ ha_ndbcluster::build_scan_filter_group(Ndb_cond* &cond, NdbScanFilter *filter)
if (cond) cond= cond->next; if (cond) cond= cond->next;
if (filter->end() == -1) if (filter->end() == -1)
DBUG_RETURN(1); DBUG_RETURN(1);
DBUG_PRINT("info", ("End of OR group")); DBUG_PRINT("info", ("End of %s group", (negated)?"NOR":"OR"));
break;
}
case(Item_func::NOT_FUNC): {
cond= cond->next;
build_scan_filter_group(cond, filter, true);
break; break;
} }
default: default:
if (build_scan_filter_predicate(cond, filter)) if (build_scan_filter_predicate(cond, filter, negated))
DBUG_RETURN(1); DBUG_RETURN(1);
} }
break; break;
......
...@@ -87,6 +87,27 @@ typedef union ndb_item_value { ...@@ -87,6 +87,27 @@ typedef union ndb_item_value {
NDB_ITEM_FIELD_VALUE *field_value; NDB_ITEM_FIELD_VALUE *field_value;
} NDB_ITEM_VALUE; } NDB_ITEM_VALUE;
struct negated_function_mapping
{
Item_func::Functype pos_fun;
Item_func::Functype neg_fun;
};
static const negated_function_mapping neg_map[]=
{
{Item_func::EQ_FUNC, Item_func::NE_FUNC},
{Item_func::NE_FUNC, Item_func::EQ_FUNC},
{Item_func::LT_FUNC, Item_func::GE_FUNC},
{Item_func::LE_FUNC, Item_func::GT_FUNC},
{Item_func::GT_FUNC, Item_func::LE_FUNC},
{Item_func::GE_FUNC, Item_func::LT_FUNC},
{Item_func::LIKE_FUNC, Item_func::NOTLIKE_FUNC},
{Item_func::NOTLIKE_FUNC, Item_func::LIKE_FUNC},
{Item_func::ISNULL_FUNC, Item_func::ISNOTNULL_FUNC},
{Item_func::ISNOTNULL_FUNC, Item_func::ISNULL_FUNC},
{Item_func::UNKNOWN_FUNC, Item_func::NOT_FUNC}
};
/* /*
This class is used for serialization of the Item tree for This class is used for serialization of the Item tree for
condition pushdown. It is stored in a linked list implemented condition pushdown. It is stored in a linked list implemented
...@@ -141,6 +162,10 @@ class Ndb_item { ...@@ -141,6 +162,10 @@ class Ndb_item {
uint32 pack_length() uint32 pack_length()
{ {
switch(type) { switch(type) {
case(NDB_VALUE):
if(qualification.value_type == Item::STRING_ITEM)
return value.item->str_value.length();
break;
case(NDB_FIELD): case(NDB_FIELD):
return value.field_value->field->pack_length(); return value.field_value->field->pack_length();
default: default:
...@@ -149,9 +174,27 @@ class Ndb_item { ...@@ -149,9 +174,27 @@ class Ndb_item {
return 0; return 0;
}; };
Field * get_field() { return value.field_value->field; }; Field * get_field() { return value.field_value->field; };
int get_field_no() { return value.field_value->column_no; }; int get_field_no() { return value.field_value->column_no; };
char* get_val() { return value.field_value->field->ptr; };
const char* get_val()
{
switch(type) {
case(NDB_VALUE):
if(qualification.value_type == Item::STRING_ITEM)
return value.item->str_value.ptr();
break;
case(NDB_FIELD):
return value.field_value->field->ptr;
default:
break;
}
return NULL;
};
void save_in_field(Ndb_item *field_item) void save_in_field(Ndb_item *field_item)
{ {
Field *field = field_item->value.field_value->field; Field *field = field_item->value.field_value->field;
...@@ -159,7 +202,17 @@ class Ndb_item { ...@@ -159,7 +202,17 @@ class Ndb_item {
if (item && field) if (item && field)
((Item *)item)->save_in_field(field, false); ((Item *)item)->save_in_field(field, false);
} };
static Item_func::Functype negate(Item_func::Functype fun)
{
uint i;
for (i=0;
fun != neg_map[i].pos_fun &&
neg_map[i].pos_fun != Item_func::UNKNOWN_FUNC;
i++);
return neg_map[i].neg_fun;
};
NDB_ITEM_TYPE type; NDB_ITEM_TYPE type;
NDB_ITEM_QUALIFICATION qualification; NDB_ITEM_QUALIFICATION qualification;
...@@ -478,9 +531,11 @@ class ha_ndbcluster: public handler ...@@ -478,9 +531,11 @@ class ha_ndbcluster: public handler
void cond_clear(); void cond_clear();
bool serialize_cond(const COND *cond, Ndb_cond_stack *ndb_cond); bool serialize_cond(const COND *cond, Ndb_cond_stack *ndb_cond);
int build_scan_filter_predicate(Ndb_cond* &cond, int build_scan_filter_predicate(Ndb_cond* &cond,
NdbScanFilter* filter); NdbScanFilter* filter,
bool negated= false);
int build_scan_filter_group(Ndb_cond* &cond, int build_scan_filter_group(Ndb_cond* &cond,
NdbScanFilter* filter); NdbScanFilter* filter,
bool negated= false);
int build_scan_filter(Ndb_cond* &cond, NdbScanFilter* filter); int build_scan_filter(Ndb_cond* &cond, NdbScanFilter* filter);
int generate_scan_filter(Ndb_cond_stack* cond_stack, int generate_scan_filter(Ndb_cond_stack* cond_stack,
NdbScanOperation* op); NdbScanOperation* op);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment