Commit 1fa12cdf authored by Sergei Petrunia's avatar Sergei Petrunia

MDEV-9676: RANGE-type frames for window functions

Support RANGE ... CURRENT ROW as frame's first and second bound.
parent b579a626
......@@ -460,3 +460,111 @@ part_id pk a CNT
1 8 1 3
1 9 1 2
drop table t0, t2;
#
# RANGE-type bounds
#
create table t3 (
pk int,
val int
);
insert into t3 values
(0, 1),
(1, 1),
(2, 1),
(3, 2),
(4, 2),
(5, 2),
(6, 2);
select
pk,
val,
count(val) over (order by val
range between current row and
current row)
as CNT
from t3;
pk val CNT
0 1 3
1 1 3
2 1 3
3 2 4
4 2 4
5 2 4
6 2 4
insert into t3 values
(7, 3),
(8, 3);
select
pk,
val,
count(val) over (order by val
range between current row and
current row)
as CNT
from t3;
pk val CNT
0 1 3
1 1 3
2 1 3
3 2 4
4 2 4
5 2 4
6 2 4
7 3 2
8 3 2
drop table t3;
# Now, check with PARTITION BY
create table t4 (
part_id int,
pk int,
val int
);
insert into t4 values
(1234, 100, 1),
(1234, 101, 1),
(1234, 102, 1),
(1234, 103, 2),
(1234, 104, 2),
(1234, 105, 2),
(1234, 106, 2),
(1234, 107, 3),
(1234, 108, 3),
(5678, 200, 1),
(5678, 201, 1),
(5678, 202, 1),
(5678, 203, 2),
(5678, 204, 2),
(5678, 205, 2),
(5678, 206, 2),
(5678, 207, 3),
(5678, 208, 3);
select
part_id,
pk,
val,
count(val) over (partition by part_id
order by val
range between current row and
current row)
as CNT
from t4;
part_id pk val CNT
1234 100 1 3
1234 101 1 3
1234 102 1 3
1234 103 2 4
1234 104 2 4
1234 105 2 4
1234 106 2 4
1234 107 3 2
1234 108 3 2
5678 200 1 3
5678 201 1 3
5678 202 1 3
5678 203 2 4
5678 204 2 4
5678 205 2 4
5678 206 2 4
5678 207 3 2
5678 208 3 2
drop table t4;
......@@ -303,3 +303,86 @@ from t2;
drop table t0, t2;
--echo #
--echo # RANGE-type bounds
--echo #
create table t3 (
pk int,
val int
);
insert into t3 values
(0, 1),
(1, 1),
(2, 1),
(3, 2),
(4, 2),
(5, 2),
(6, 2);
select
pk,
val,
count(val) over (order by val
range between current row and
current row)
as CNT
from t3;
insert into t3 values
(7, 3),
(8, 3);
select
pk,
val,
count(val) over (order by val
range between current row and
current row)
as CNT
from t3;
drop table t3;
--echo # Now, check with PARTITION BY
create table t4 (
part_id int,
pk int,
val int
);
insert into t4 values
(1234, 100, 1),
(1234, 101, 1),
(1234, 102, 1),
(1234, 103, 2),
(1234, 104, 2),
(1234, 105, 2),
(1234, 106, 2),
(1234, 107, 3),
(1234, 108, 3),
(5678, 200, 1),
(5678, 201, 1),
(5678, 202, 1),
(5678, 203, 2),
(5678, 204, 2),
(5678, 205, 2),
(5678, 206, 2),
(5678, 207, 3),
(5678, 208, 3);
select
part_id,
pk,
val,
count(val) over (partition by part_id
order by val
range between current row and
current row)
as CNT
from t4;
drop table t4;
......@@ -4778,17 +4778,10 @@ class Item_copy_decimal : public Item_copy
- cmp() method that compares the saved value with the current value of the
source item, and if they were not equal saves item's value into the saved
value.
*/
/*
Cached_item_XXX objects are not exactly caches. They do the following:
Each Cached_item_XXX object has
- its source item
- saved value of the source item
- cmp() method that compares the saved value with the current value of the
source item, and if they were not equal saves item's value into the saved
value.
TODO: add here:
- a way to save the new value w/o comparison
- a way to do less/equal/greater comparison
*/
class Cached_item :public Sql_alloc
......@@ -4796,7 +4789,18 @@ class Cached_item :public Sql_alloc
public:
bool null_value;
Cached_item() :null_value(0) {}
/*
Compare the cached value with the source value. If not equal, copy
the source value to the cache.
@return
true - Not equal
false - Equal
*/
virtual bool cmp(void)=0;
/* Compare the cached value with the source value, without copying */
virtual int cmp_read_only()=0;
virtual ~Cached_item(); /*line -e1509 */
};
......@@ -4808,6 +4812,7 @@ class Cached_item_str :public Cached_item
public:
Cached_item_str(THD *thd, Item *arg);
bool cmp(void);
int cmp_read_only();
~Cached_item_str(); // Deallocate String:s
};
......@@ -4819,6 +4824,7 @@ class Cached_item_real :public Cached_item
public:
Cached_item_real(Item *item_par) :item(item_par),value(0.0) {}
bool cmp(void);
int cmp_read_only();
};
class Cached_item_int :public Cached_item
......@@ -4828,6 +4834,7 @@ class Cached_item_int :public Cached_item
public:
Cached_item_int(Item *item_par) :item(item_par),value(0) {}
bool cmp(void);
int cmp_read_only();
};
......@@ -4838,6 +4845,7 @@ class Cached_item_decimal :public Cached_item
public:
Cached_item_decimal(Item *item_par);
bool cmp(void);
int cmp_read_only();
};
class Cached_item_field :public Cached_item
......@@ -4854,6 +4862,7 @@ class Cached_item_field :public Cached_item
buff= (uchar*) thd_calloc(thd, length= field->pack_length());
}
bool cmp(void);
int cmp_read_only();
};
class Item_default_value : public Item_field
......
......@@ -98,6 +98,25 @@ bool Cached_item_str::cmp(void)
return tmp;
}
int Cached_item_str::cmp_read_only()
{
String *res= item->val_str(&tmp_value);
if (null_value)
{
if (item->null_value)
return 0;
else
return -1;
}
if (item->null_value)
return 1;
return sortcmp(&value, res, item->collation.collation);
}
Cached_item_str::~Cached_item_str()
{
item=0; // Safety
......@@ -115,6 +134,23 @@ bool Cached_item_real::cmp(void)
return FALSE;
}
int Cached_item_real::cmp_read_only()
{
double nr= item->val_real();
if (null_value)
{
if (item->null_value)
return 0;
else
return -1;
}
if (item->null_value)
return 1;
return (nr == value)? 0 : ((nr < value)? 1: -1);
}
bool Cached_item_int::cmp(void)
{
longlong nr=item->val_int();
......@@ -128,6 +164,22 @@ bool Cached_item_int::cmp(void)
}
int Cached_item_int::cmp_read_only()
{
longlong nr= item->val_int();
if (null_value)
{
if (item->null_value)
return 0;
else
return -1;
}
if (item->null_value)
return 1;
return (nr == value)? 0 : ((nr < value)? 1: -1);
}
bool Cached_item_field::cmp(void)
{
bool tmp= FALSE; // Value is identical
......@@ -148,6 +200,22 @@ bool Cached_item_field::cmp(void)
}
int Cached_item_field::cmp_read_only()
{
if (null_value)
{
if (field->is_null())
return 0;
else
return -1;
}
if (field->is_null())
return 1;
return field->cmp(buff);
}
Cached_item_decimal::Cached_item_decimal(Item *it)
:item(it)
{
......@@ -174,3 +242,20 @@ bool Cached_item_decimal::cmp()
return FALSE;
}
int Cached_item_decimal::cmp_read_only()
{
my_decimal tmp;
my_decimal *ptmp= item->val_decimal(&tmp);
if (null_value)
{
if (item->null_value)
return 0;
else
return -1;
}
if (item->null_value)
return 1;
return my_decimal_cmp(&value, ptmp);
}
......@@ -221,6 +221,15 @@ class Rowid_seq_cursor
}
protected:
bool at_eof() { return (cache_pos == cache_end); }
uchar *get_last_rowid()
{
if (cache_pos == cache_start)
return NULL;
else
return cache_pos - ref_length;
}
uchar *get_curr_rowid() { return cache_pos; }
};
......@@ -259,6 +268,18 @@ class Table_read_cursor : public Rowid_seq_cursor
return res;
}
bool restore_last_row()
{
uchar *p;
if ((p= get_last_rowid()))
{
int rc= read_record->table->file->ha_rnd_pos(read_record->record, p);
if (!rc)
return true;
}
return false;
}
// todo: should move_to() also read row here?
};
......@@ -294,6 +315,19 @@ class Group_bound_tracker
return true;
return false;
}
int compare_with_cache()
{
List_iterator<Cached_item> li(group_fields);
Cached_item *ptr;
int res;
while ((ptr= li++))
{
if ((res= ptr->cmp_read_only()))
return res;
}
return 0;
}
};
......@@ -316,7 +350,8 @@ class Frame_cursor : public Sql_alloc
{
public:
virtual void init(THD *thd, READ_RECORD *info,
SQL_I_List<ORDER> *partition_list)
SQL_I_List<ORDER> *partition_list,
SQL_I_List<ORDER> *order_list)
{}
/*
......@@ -335,18 +370,178 @@ class Frame_cursor : public Sql_alloc
- The callee may move tbl->file and tbl->record[0] to point to some other
row.
*/
virtual void next_partition(bool first, Item_sum* item)=0;
virtual void pre_next_partition(longlong rownum, Item_sum* item){};
virtual void next_partition(longlong rownum, Item_sum* item)=0;
/*
The current row has moved one row forward.
Move this frame bound accordingly, and update the value of aggregate
function as necessary.
*/
virtual void pre_next_row(Item_sum* item){};
virtual void next_row(Item_sum* item)=0;
virtual ~Frame_cursor(){}
};
//
// RANGE-type frames
//
/*
RANGE BETWEEN ... AND CURRENT ROW
This is a bottom endpoint of RANGE-CURRENT ROW frame.
It moves ahead of the current_row. It is located just in front of the first
peer of the currrent_row.
*/
class Frame_range_current_row_bottom: public Frame_cursor
{
Table_read_cursor cursor;
Group_bound_tracker peer_tracker;
bool dont_move;
public:
void init(THD *thd, READ_RECORD *info,
SQL_I_List<ORDER> *partition_list,
SQL_I_List<ORDER> *order_list)
{
cursor.init(info);
peer_tracker.init(thd, order_list);
}
void pre_next_partition(longlong rownum, Item_sum* item)
{
// Save the value of the current_row
peer_tracker.check_if_next_group();
if (rownum != 0)
item->add(); // current row is in
}
void next_partition(longlong rownum, Item_sum* item)
{
walk_till_non_peer(item);
}
void pre_next_row(Item_sum* item)
{
// Check if our cursor is pointing at a peer of the current row.
// If not, move forward until that becomes true
dont_move= !peer_tracker.check_if_next_group();
if (!dont_move)
item->add();
}
// New condition: this now assumes that table's current
// row is pointing to the current_row's position
void next_row(Item_sum* item)
{
// Check if our cursor is pointing at a peer of the current row.
// If not, move forward until that becomes true
if (dont_move)
{
/*
Our current is not a peer of the current row.
No need to move the bound.
*/
return;
}
walk_till_non_peer(item);
}
private:
void walk_till_non_peer(Item_sum* item)
{
/*
Walk forward until we've met first row that's not a peer of the current
row
*/
while (!cursor.get_next())
{
if (peer_tracker.compare_with_cache())
break;
item->add();
}
}
};
/*
RANGE BETWEEN CURRENT ROW AND ...
This is a top endpoint of RANGE-CURRENT ROW frame.
It moves behind the current_row. It is located right after the first peer of
the current_row.
*/
class Frame_range_current_row_top : public Frame_cursor
{
Group_bound_tracker bound_tracker;
Table_read_cursor cursor;
Group_bound_tracker peer_tracker;
bool move;
bool at_partition_start;
public:
void init(THD *thd, READ_RECORD *info,
SQL_I_List<ORDER> *partition_list,
SQL_I_List<ORDER> *order_list)
{
bound_tracker.init(thd, partition_list);
cursor.init(info);
peer_tracker.init(thd, order_list);
}
void pre_next_partition(longlong rownum, Item_sum* item)
{
// fetch the value from the first row
peer_tracker.check_if_next_group();
}
void next_partition(longlong rownum, Item_sum* item)
{
at_partition_start= true;
cursor.move_to(rownum+1);
}
void pre_next_row(Item_sum* item)
{
// Check if our current row is pointing to a peer of the current row.
// If not, move forward until that becomes true.
move= peer_tracker.check_if_next_group();
}
void next_row(Item_sum* item)
{
bool was_at_partition_start= at_partition_start;
at_partition_start= false;
if (move)
{
if (!was_at_partition_start &&
cursor.restore_last_row())
{
item->remove();
}
do
{
if (cursor.get_next())
return;
if (!peer_tracker.compare_with_cache())
return;
item->remove();
}
while (1);
}
}
};
//////////////////////////////////////////////////////////////////////////////////
/*
UNBOUNDED PRECEDING frame bound
......@@ -354,7 +549,7 @@ class Frame_cursor : public Sql_alloc
class Frame_unbounded_preceding : public Frame_cursor
{
public:
void next_partition(bool first, Item_sum* item)
void next_partition(longlong rownum, Item_sum* item)
{
/*
UNBOUNDED PRECEDING frame end just stays on the first row.
......@@ -378,15 +573,16 @@ class Frame_unbounded_following : public Frame_cursor
Group_bound_tracker bound_tracker;
public:
void init(THD *thd, READ_RECORD *info, SQL_I_List<ORDER> *partition_list)
void init(THD *thd, READ_RECORD *info, SQL_I_List<ORDER> *partition_list,
SQL_I_List<ORDER> *order_list)
{
cursor.init(info);
bound_tracker.init(thd, partition_list);
}
void next_partition(bool first, Item_sum* item)
void next_partition(longlong rownum, Item_sum* item)
{
if (first)
if (!rownum)
{
/* Read the first row */
if (cursor.get_next())
......@@ -407,7 +603,7 @@ class Frame_unbounded_following : public Frame_cursor
void next_row(Item_sum* item)
{
/* Do nothing, UNBOUNDED FOLLOWING frame end doesn't */
/* Do nothing, UNBOUNDED FOLLOWING frame end doesn't move */
}
};
......@@ -436,7 +632,8 @@ class Frame_n_rows : public Frame_cursor
is_top_bound(is_top_bound_arg), n_rows(n_rows_arg), is_preceding(is_preceding_arg)
{}
void init(THD *thd, READ_RECORD *info, SQL_I_List<ORDER> *partition_list)
void init(THD *thd, READ_RECORD *info, SQL_I_List<ORDER> *partition_list,
SQL_I_List<ORDER> *order_list)
{
cursor.init(info);
cursor_eof= false;
......@@ -444,14 +641,14 @@ class Frame_n_rows : public Frame_cursor
bound_tracker.init(thd, partition_list);
}
void next_partition(bool first, Item_sum* item)
void next_partition(longlong rownum, Item_sum* item)
{
cursor_eof= false;
at_partition_start= true;
at_partition_end= false;
if (is_preceding)
{
if (!first)
if (rownum != 0)
{
/*
The cursor in "ROWS n PRECEDING" lags behind by n_rows rows.
......@@ -474,9 +671,10 @@ class Frame_n_rows : public Frame_cursor
*/
n_rows_to_skip= 0;
if (!first && (!is_top_bound || n_rows))
if ((rownum != 0) && (!is_top_bound || n_rows))
{
// We are positioned at the first row in the partition:
// We are positioned at the first row in the partition anyway
//cursor.restore_cur_row();
if (is_top_bound) // this is frame top endpoint
item->remove();
else
......@@ -485,13 +683,18 @@ class Frame_n_rows : public Frame_cursor
/*
Note: i_end=-1 when this is a top-endpoint "CURRENT ROW" which is
implemented as "ROWS 0 FOLLOWING".
*/
longlong i_end= n_rows + (first?1:0)- is_top_bound;
*/
longlong i_end= n_rows + ((rownum==0)?1:0)- is_top_bound;
for (longlong i= 0; i < i_end; i++)
{
if (next_row_intern(item))
break;
}
if (i_end == -1)
{
if (!cursor.get_next())
bound_tracker.check_if_next_group();
}
}
}
......@@ -547,29 +750,49 @@ class Frame_current_row : public Frame_n_rows
};
Frame_cursor *get_frame_cursor(Window_frame_bound *bound, bool is_top_bound)
Frame_cursor *get_frame_cursor(Window_frame *frame, bool is_top_bound)
{
Window_frame_bound *bound= is_top_bound? frame->top_bound :
frame->bottom_bound;
if (bound->precedence_type == Window_frame_bound::PRECEDING ||
bound->precedence_type == Window_frame_bound::FOLLOWING)
bound->precedence_type == Window_frame_bound::FOLLOWING)
{
bool is_preceding= (bound->precedence_type ==
Window_frame_bound::PRECEDING);
if (bound->offset == NULL) /* this is UNBOUNDED */
{
/* The following serve both RANGE and ROWS: */
if (is_preceding)
return new Frame_unbounded_preceding;
else
return new Frame_unbounded_following;
}
longlong n_rows= bound->offset->val_int();
return new Frame_n_rows(is_top_bound, is_preceding, n_rows);
if (frame->units == Window_frame::UNITS_ROWS)
{
longlong n_rows= bound->offset->val_int();
return new Frame_n_rows(is_top_bound, is_preceding, n_rows);
}
else
{
// todo: Frame_range_n_rows here .
DBUG_ASSERT(0);
}
}
if (bound->precedence_type == Window_frame_bound::CURRENT)
{
return new Frame_current_row(is_top_bound);
if (frame->units == Window_frame::UNITS_ROWS)
return new Frame_current_row(is_top_bound);
else
{
if (is_top_bound)
return new Frame_range_current_row_top;
else
return new Frame_range_current_row_bottom;
}
}
return NULL;
}
......@@ -624,71 +847,63 @@ bool compute_window_func_with_frames(Item_window_func *item_win,
sum_func->set_aggregator(Aggregator::SIMPLE_AGGREGATOR);
Window_frame *window_frame= item_win->window_spec->window_frame;
DBUG_ASSERT(window_frame->units == Window_frame::UNITS_ROWS);
top_bound= get_frame_cursor(window_frame->top_bound, true);
bottom_bound= get_frame_cursor(window_frame->bottom_bound, false);
top_bound= get_frame_cursor(window_frame, true);
bottom_bound= get_frame_cursor(window_frame, false);
top_bound->init(thd, info, &item_win->window_spec->partition_list);
bottom_bound->init(thd, info, &item_win->window_spec->partition_list);
top_bound->init(thd, info, &item_win->window_spec->partition_list,
&item_win->window_spec->order_list);
bottom_bound->init(thd, info, &item_win->window_spec->partition_list,
&item_win->window_spec->order_list);
bool is_error= false;
bool first_row= true;
longlong rownum= 0;
uchar *rowid_buf= (uchar*) my_malloc(tbl->file->ref_length, MYF(0));
while (true)
{
if (first_row)
{
/* Start the first partition */
sum_func->clear();
bottom_bound->next_partition(true, sum_func);
top_bound->next_partition(true, sum_func);
}
else
{
/* These can write into tbl->record[0] */
bottom_bound->next_row(sum_func);
top_bound->next_row(sum_func);
}
/* Move the current_row */
if ((err=info->read_record(info)))
{
/* End of file */
break;
break; /* End of file */
}
store_record(tbl,record[1]);
bool partition_changed= (item_win->check_partition_bound() > -1)? true:
false;
if (!first_row && partition_changed)
tbl->file->position(tbl->record[0]);
memcpy(rowid_buf, tbl->file->ref, tbl->file->ref_length);
/* Adjust partition bounds */
if (partition_changed || (rownum == 0))
{
/* Start the first partition */
sum_func->clear();
tbl->file->position(tbl->record[0]);
memcpy(rowid_buf, tbl->file->ref, tbl->file->ref_length);
/*
Ok, the current row is the first row in the new partition.
bottom_bound->pre_next_partition(rownum, sum_func);
top_bound->pre_next_partition(rownum, sum_func);
/*
We move bottom_bound first, because we want rows to be added into the
aggregate before top_bound attempts to remove them.
*/
bottom_bound->next_partition(false, sum_func);
/*
The problem is, the above call may have made tbl->record[0] to point to
some other record.
*/
tbl->file->ha_rnd_pos(tbl->record[0], rowid_buf);
top_bound->next_partition(false, sum_func);
bottom_bound->next_partition(rownum, sum_func);
top_bound->next_partition(rownum, sum_func);
}
else
{
bottom_bound->pre_next_row(sum_func);
top_bound->pre_next_row(sum_func);
/*
The same problem again. The above call may have moved table's current
record. We need to make the current row current, so that ha_update_row
call below updates the right row.
*/
tbl->file->ha_rnd_pos(tbl->record[0], rowid_buf);
/* These can write into tbl->record[0] */
bottom_bound->next_row(sum_func);
top_bound->next_row(sum_func);
}
first_row= false;
/* Read the current row and update it */
rownum++;
/*
The bounds may have made tbl->record[0] to point to some record other
than current_row. This applies to tbl->file's internal state, too.
Fix this by reading the current row again.
*/
tbl->file->ha_rnd_pos(tbl->record[0], rowid_buf);
store_record(tbl,record[1]);
item_win->save_in_field(item_win->result_field, true);
err= tbl->file->ha_update_row(tbl->record[1], tbl->record[0]);
if (err && err != HA_ERR_RECORD_IS_THE_SAME)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment