Commit e859c2db authored by Sergei Petrunia's avatar Sergei Petrunia

MDEV-9676: RANGE-type frames for window functions

Add support for "RANGE n PRECEDING|FOLLOWING" frame bounds.
- n is currently limited to whatever Item and Item_sum_plus/minus
  can handle (i.e. no DATETIME intervals).
- Didn't check NULL value handling yet.
parent 879731aa
...@@ -781,3 +781,171 @@ pk a b bit_or ...@@ -781,3 +781,171 @@ pk a b bit_or
11 2 1024 1024 11 2 1024 1024
12 2 2048 2048 12 2 2048 2048
drop table t2; drop table t2;
#
# Try RANGE PRECEDING|FOLLWING n
#
create table t1 (
part_id int,
pk int,
a int
);
insert into t1 values
(10, 1, 1),
(10, 2, 2),
(10, 3, 4),
(10, 4, 8),
(10, 5,26),
(10, 6,27),
(10, 7,40),
(10, 8,71),
(10, 9,72);
select
pk, a,
count(a) over (ORDER BY a
RANGE BETWEEN UNBOUNDED PRECEDING
AND 10 FOLLOWING) as cnt
from t1;
pk a cnt
1 1 4
2 2 4
3 4 4
4 8 4
5 26 6
6 27 6
7 40 7
8 71 9
9 72 9
select
pk, a,
count(a) over (ORDER BY a
RANGE BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING) as cnt
from t1;
pk a cnt
1 1 2
2 2 2
3 4 3
4 8 4
5 26 6
6 27 6
7 40 7
8 71 9
9 72 9
select
pk, a,
count(a) over (ORDER BY a
RANGE BETWEEN UNBOUNDED PRECEDING
AND 10 PRECEDING) as cnt
from t1;
pk a cnt
1 1 0
2 2 0
3 4 0
4 8 0
5 26 4
6 27 4
7 40 6
8 71 7
9 72 7
select
pk, a,
count(a) over (ORDER BY a
RANGE BETWEEN UNBOUNDED PRECEDING
AND 1 PRECEDING) as cnt
from t1;
pk a cnt
1 1 0
2 2 1
3 4 2
4 8 3
5 26 4
6 27 5
7 40 6
8 71 7
9 72 8
select
pk, a,
count(a) over (ORDER BY a
RANGE BETWEEN 1 PRECEDING
AND CURRENT ROW) as cnt
from t1;
pk a cnt
1 1 1
2 2 2
3 4 1
4 8 1
5 26 1
6 27 2
7 40 1
8 71 1
9 72 2
select
pk, a,
count(a) over (ORDER BY a
RANGE BETWEEN 1 FOLLOWING
AND 3 FOLLOWING) as cnt
from t1;
pk a cnt
1 1 2
2 2 1
3 4 0
4 8 0
5 26 1
6 27 0
7 40 0
8 71 1
9 72 0
insert into t1 select 22, pk, a from t1;
select
part_id, pk, a,
count(a) over (PARTITION BY part_id
ORDER BY a
RANGE BETWEEN UNBOUNDED PRECEDING
AND 10 FOLLOWING) as cnt
from t1;
part_id pk a cnt
10 1 1 4
10 2 2 4
10 3 4 4
10 4 8 4
10 5 26 6
10 6 27 6
10 7 40 7
10 8 71 9
10 9 72 9
22 1 1 4
22 2 2 4
22 3 4 4
22 4 8 4
22 5 26 6
22 6 27 6
22 7 40 7
22 8 71 9
22 9 72 9
select
pk, a,
count(a) over (PARTITION BY part_id
ORDER BY a
RANGE BETWEEN UNBOUNDED PRECEDING
AND 1 PRECEDING) as cnt
from t1;
pk a cnt
1 1 0
2 2 1
3 4 2
4 8 3
5 26 4
6 27 5
7 40 6
8 71 7
9 72 8
1 1 0
2 2 1
3 4 2
4 8 3
5 26 4
6 27 5
7 40 6
8 71 7
9 72 8
drop table t1;
...@@ -115,4 +115,3 @@ pk a b bit_or bit_and bit_xor ...@@ -115,4 +115,3 @@ pk a b bit_or bit_and bit_xor
8 2 16 208 0 192 8 2 16 208 0 192
drop table t1; drop table t1;
drop table t2; drop table t2;
...@@ -487,3 +487,86 @@ from t2; ...@@ -487,3 +487,86 @@ from t2;
drop table t2; drop table t2;
--echo #
--echo # Try RANGE PRECEDING|FOLLWING n
--echo #
create table t1 (
part_id int,
pk int,
a int
);
insert into t1 values
(10, 1, 1),
(10, 2, 2),
(10, 3, 4),
(10, 4, 8),
(10, 5,26),
(10, 6,27),
(10, 7,40),
(10, 8,71),
(10, 9,72);
select
pk, a,
count(a) over (ORDER BY a
RANGE BETWEEN UNBOUNDED PRECEDING
AND 10 FOLLOWING) as cnt
from t1;
select
pk, a,
count(a) over (ORDER BY a
RANGE BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING) as cnt
from t1;
select
pk, a,
count(a) over (ORDER BY a
RANGE BETWEEN UNBOUNDED PRECEDING
AND 10 PRECEDING) as cnt
from t1;
select
pk, a,
count(a) over (ORDER BY a
RANGE BETWEEN UNBOUNDED PRECEDING
AND 1 PRECEDING) as cnt
from t1;
# Try bottom bound
select
pk, a,
count(a) over (ORDER BY a
RANGE BETWEEN 1 PRECEDING
AND CURRENT ROW) as cnt
from t1;
select
pk, a,
count(a) over (ORDER BY a
RANGE BETWEEN 1 FOLLOWING
AND 3 FOLLOWING) as cnt
from t1;
# Try with partitions
insert into t1 select 22, pk, a from t1;
select
part_id, pk, a,
count(a) over (PARTITION BY part_id
ORDER BY a
RANGE BETWEEN UNBOUNDED PRECEDING
AND 10 FOLLOWING) as cnt
from t1;
select
pk, a,
count(a) over (PARTITION BY part_id
ORDER BY a
RANGE BETWEEN UNBOUNDED PRECEDING
AND 1 PRECEDING) as cnt
from t1;
drop table t1;
...@@ -4804,9 +4804,24 @@ class Cached_item :public Sql_alloc ...@@ -4804,9 +4804,24 @@ class Cached_item :public Sql_alloc
virtual ~Cached_item(); /*line -e1509 */ virtual ~Cached_item(); /*line -e1509 */
}; };
class Cached_item_str :public Cached_item class Cached_item_item : public Cached_item
{ {
protected:
Item *item; Item *item;
Cached_item_item(Item *arg) : item(arg) {}
public:
void fetch_value_from(Item *new_item)
{
Item *save= item;
item= new_item;
cmp();
item= save;
}
};
class Cached_item_str :public Cached_item_item
{
uint32 value_max_length; uint32 value_max_length;
String value,tmp_value; String value,tmp_value;
public: public:
...@@ -4817,30 +4832,27 @@ class Cached_item_str :public Cached_item ...@@ -4817,30 +4832,27 @@ class Cached_item_str :public Cached_item
}; };
class Cached_item_real :public Cached_item class Cached_item_real :public Cached_item_item
{ {
Item *item;
double value; double value;
public: public:
Cached_item_real(Item *item_par) :item(item_par),value(0.0) {} Cached_item_real(Item *item_par) :Cached_item_item(item_par),value(0.0) {}
bool cmp(void); bool cmp(void);
int cmp_read_only(); int cmp_read_only();
}; };
class Cached_item_int :public Cached_item class Cached_item_int :public Cached_item_item
{ {
Item *item;
longlong value; longlong value;
public: public:
Cached_item_int(Item *item_par) :item(item_par),value(0) {} Cached_item_int(Item *item_par) :Cached_item_item(item_par),value(0) {}
bool cmp(void); bool cmp(void);
int cmp_read_only(); int cmp_read_only();
}; };
class Cached_item_decimal :public Cached_item class Cached_item_decimal :public Cached_item_item
{ {
Item *item;
my_decimal value; my_decimal value;
public: public:
Cached_item_decimal(Item *item_par); Cached_item_decimal(Item *item_par);
......
...@@ -71,7 +71,7 @@ Cached_item::~Cached_item() {} ...@@ -71,7 +71,7 @@ Cached_item::~Cached_item() {}
*/ */
Cached_item_str::Cached_item_str(THD *thd, Item *arg) Cached_item_str::Cached_item_str(THD *thd, Item *arg)
:item(arg), :Cached_item_item(arg),
value_max_length(MY_MIN(arg->max_length, thd->variables.max_sort_length)), value_max_length(MY_MIN(arg->max_length, thd->variables.max_sort_length)),
value(value_max_length) value(value_max_length)
{} {}
...@@ -217,7 +217,7 @@ int Cached_item_field::cmp_read_only() ...@@ -217,7 +217,7 @@ int Cached_item_field::cmp_read_only()
Cached_item_decimal::Cached_item_decimal(Item *it) Cached_item_decimal::Cached_item_decimal(Item *it)
:item(it) :Cached_item_item(it)
{ {
my_decimal_set_zero(&value); my_decimal_set_zero(&value);
} }
......
...@@ -388,6 +388,206 @@ class Frame_cursor : public Sql_alloc ...@@ -388,6 +388,206 @@ class Frame_cursor : public Sql_alloc
// RANGE-type frames // RANGE-type frames
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
/*
Frame_range_n_top handles the top end of RANGE-type frame.
That is, it handles:
RANGE BETWEEN n PRECEDING AND ...
RANGE BETWEEN n FOLLOWING AND ...
Top of the frame doesn't need to check for partition end, since bottom will
reach it before.
*/
class Frame_range_n_top : public Frame_cursor
{
Table_read_cursor cursor;
Cached_item_item *range_expr;
Item *n_val;
Item *item_add;
const bool is_preceding;
public:
Frame_range_n_top(bool is_preceding_arg, Item *n_val_arg) :
n_val(n_val_arg), item_add(NULL), is_preceding(is_preceding_arg)
{}
void init(THD *thd, READ_RECORD *info,
SQL_I_List<ORDER> *partition_list,
SQL_I_List<ORDER> *order_list)
{
cursor.init(info);
DBUG_ASSERT(order_list->elements == 1);
Item *src_expr= order_list->first->item[0];
range_expr= (Cached_item_item*) new_Cached_item(thd, src_expr, FALSE);
if (is_preceding)
item_add= new (thd->mem_root) Item_func_minus(thd, src_expr, n_val);
else
item_add= new (thd->mem_root) Item_func_plus(thd, src_expr, n_val);
item_add->fix_fields(thd, &item_add);
}
void pre_next_partition(longlong rownum, Item_sum* item)
{
// Save the value of FUNC(current_row)
range_expr->fetch_value_from(item_add);
}
void next_partition(longlong rownum, Item_sum* item)
{
cursor.move_to(rownum);
walk_till_non_peer(item);
}
void pre_next_row(Item_sum* item)
{
range_expr->fetch_value_from(item_add);
}
void next_row(Item_sum* item)
{
/*
Ok, our cursor is at the first row R where
(prev_row + n) >= R
We need to check about the current row.
*/
if (cursor.restore_last_row())
{
if (range_expr->cmp_read_only() <= 0)
return;
item->remove();
}
walk_till_non_peer(item);
}
private:
void walk_till_non_peer(Item_sum* item)
{
while (!cursor.get_next())
{
if (range_expr->cmp_read_only() <= 0)
break;
item->remove();
}
}
};
/*
Frame_range_n_bottom handles bottom end of RANGE-type frame.
That is, it handles frame bounds in form:
RANGE BETWEEN ... AND n PRECEDING
RANGE BETWEEN ... AND n FOLLOWING
Bottom end moves first so it needs to check for partition end
(todo: unless it's PRECEDING and in that case it doesnt)
*/
class Frame_range_n_bottom: public Frame_cursor
{
Table_read_cursor cursor;
Cached_item_item *range_expr;
Item *n_val;
Item *item_add;
const bool is_preceding;
Group_bound_tracker bound_tracker;
bool end_of_partition;
public:
Frame_range_n_bottom(bool is_preceding_arg, Item *n_val_arg) :
n_val(n_val_arg), item_add(NULL), is_preceding(is_preceding_arg)
{}
void init(THD *thd, READ_RECORD *info,
SQL_I_List<ORDER> *partition_list,
SQL_I_List<ORDER> *order_list)
{
cursor.init(info);
DBUG_ASSERT(order_list->elements == 1);
Item *src_expr= order_list->first->item[0];
range_expr= (Cached_item_item*) new_Cached_item(thd, src_expr, FALSE);
if (is_preceding)
item_add= new (thd->mem_root) Item_func_minus(thd, src_expr, n_val);
else
item_add= new (thd->mem_root) Item_func_plus(thd, src_expr, n_val);
item_add->fix_fields(thd, &item_add);
bound_tracker.init(thd, partition_list);
}
void pre_next_partition(longlong rownum, Item_sum* item)
{
// Save the value of FUNC(current_row)
range_expr->fetch_value_from(item_add);
bound_tracker.check_if_next_group();
end_of_partition= false;
}
void next_partition(longlong rownum, Item_sum* item)
{
cursor.move_to(rownum);
walk_till_non_peer(item);
}
void pre_next_row(Item_sum* item)
{
if (end_of_partition)
return;
range_expr->fetch_value_from(item_add);
}
void next_row(Item_sum* item)
{
if (end_of_partition)
return;
/*
Ok, our cursor is at the first row R where
(prev_row + n) >= R
We need to check about the current row.
*/
if (cursor.restore_last_row())
{
if (range_expr->cmp_read_only() < 0)
return;
item->add();
}
walk_till_non_peer(item);
}
private:
void walk_till_non_peer(Item_sum* item)
{
int res;
while (!(res= cursor.get_next()))
{
if (bound_tracker.check_if_next_group())
{
end_of_partition= true;
break;
}
if (range_expr->cmp_read_only() < 0)
break;
item->add();
}
if (res)
end_of_partition= true;
}
};
/* /*
RANGE BETWEEN ... AND CURRENT ROW, bottom frame bound for CURRENT ROW RANGE BETWEEN ... AND CURRENT ROW, bottom frame bound for CURRENT ROW
... ...
...@@ -469,7 +669,6 @@ class Frame_range_current_row_bottom: public Frame_cursor ...@@ -469,7 +669,6 @@ class Frame_range_current_row_bottom: public Frame_cursor
item->add(); item->add();
} }
} }
}; };
...@@ -893,8 +1092,10 @@ Frame_cursor *get_frame_cursor(Window_frame *frame, bool is_top_bound) ...@@ -893,8 +1092,10 @@ Frame_cursor *get_frame_cursor(Window_frame *frame, bool is_top_bound)
} }
else else
{ {
// todo: Frame_range_n_rows here . if (is_top_bound)
DBUG_ASSERT(0); return new Frame_range_n_top(is_preceding, bound->offset);
else
return new Frame_range_n_bottom(is_preceding, bound->offset);
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment