Commit 9bd194b1 authored by Sergei Petrunia

MDEV-9848: Window functions: reuse sorting and/or scanning

- Rename Window_funcs_computation to Window_funcs_computation_step
- Introduce Window_func_sort which invokes filesort and then
  invokes computation of all window functions that use this ordering.
- Expose Window functions' sort operations in EXPLAIN|ANALYZE FORMAT=JSON
parent e30bd913
......@@ -1412,6 +1412,11 @@ EXPLAIN
"query_block": {
"select_id": 1,
"window_functions_computation": {
"sorts": {
"filesort": {
"sort_key": "t0.a"
}
},
"temporary_table": {
"table": {
"table_name": "t0",
......@@ -1438,6 +1443,11 @@ EXPLAIN
"filesort": {
"sort_key": "t1.a",
"window_functions_computation": {
"sorts": {
"filesort": {
"sort_key": "sum(t1.b)"
}
},
"temporary_table": {
"table": {
"table_name": "t1",
......@@ -1462,6 +1472,11 @@ EXPLAIN
"query_block": {
"select_id": 1,
"window_functions_computation": {
"sorts": {
"filesort": {
"sort_key": "sum(t1.b)"
}
},
"temporary_table": {
"table": {
"table_name": "t1",
......@@ -1491,6 +1506,11 @@ EXPLAIN
"filesort": {
"sort_key": "t1.b",
"window_functions_computation": {
"sorts": {
"filesort": {
"sort_key": "t1.b"
}
},
"temporary_table": {
"table": {
"table_name": "t1",
......@@ -1580,3 +1600,97 @@ NULL NULL 24 1.0000 NULL
NULL NULL 38 1.0000 NULL
NULL NULL 42 1.0000 NULL
drop table t1;
#
# MDEV-9848: Window functions: reuse sorting and/or scanning
#
create table t1 (a int, b int, c int);
insert into t1 values
(1,3,1),
(2,2,1),
(3,1,1);
# Check using counters
flush status;
select
rank() over (partition by c order by a),
rank() over (partition by c order by b)
from t1;
rank() over (partition by c order by a) rank() over (partition by c order by b)
1 3
2 2
3 1
show status like '%sort%';
Variable_name Value
Sort_merge_passes 0
Sort_priority_queue_sorts 0
Sort_range 0
Sort_rows 6
Sort_scan 2
flush status;
select
rank() over (partition by c order by a),
rank() over (partition by c order by a)
from t1;
rank() over (partition by c order by a) rank() over (partition by c order by a)
1 1
2 2
3 3
show status like '%sort%';
Variable_name Value
Sort_merge_passes 0
Sort_priority_queue_sorts 0
Sort_range 0
Sort_rows 3
Sort_scan 1
explain format=json
select
rank() over (partition by c order by a),
rank() over (partition by c order by a)
from t1;
EXPLAIN
{
"query_block": {
"select_id": 1,
"window_functions_computation": {
"sorts": {
"filesort": {
"sort_key": "t1.c, t1.a"
}
},
"temporary_table": {
"table": {
"table_name": "t1",
"access_type": "ALL",
"rows": 3,
"filtered": 100
}
}
}
}
}
explain format=json
select
rank() over (order by a),
row_number() over (order by a)
from t1;
EXPLAIN
{
"query_block": {
"select_id": 1,
"window_functions_computation": {
"sorts": {
"filesort": {
"sort_key": "t1.a"
}
},
"temporary_table": {
"table": {
"table_name": "t1",
"access_type": "ALL",
"rows": 3,
"filtered": 100
}
}
}
}
}
drop table t1;
......@@ -1014,3 +1014,52 @@ window w1 as (partition by a order by b,pk),
drop table t1;
--echo #
--echo # MDEV-9848: Window functions: reuse sorting and/or scanning
--echo #
create table t1 (a int, b int, c int);
insert into t1 values
(1,3,1),
(2,2,1),
(3,1,1);
--echo # Check using counters
flush status;
select
rank() over (partition by c order by a),
rank() over (partition by c order by b)
from t1;
show status like '%sort%';
flush status;
select
rank() over (partition by c order by a),
rank() over (partition by c order by a)
from t1;
show status like '%sort%';
# Check using EXPLAIN FORMAT=JSON
explain format=json
select
rank() over (partition by c order by a),
rank() over (partition by c order by a)
from t1;
explain format=json
select
rank() over (order by a),
row_number() over (order by a)
from t1;
--disable_parsing
explain format=json
select
rank() over (partition by c order by a),
count(*) over (partition by c)
from t1;
--enable_parsing
drop table t1;
......@@ -885,8 +885,12 @@ void Explain_select::print_explain_json(Explain_query *query,
writer->add_member("duplicate_removal").start_object();
break;
case AGGR_OP_WINDOW_FUNCS:
{
//TODO: make print_json_members virtual?
writer->add_member("window_functions_computation").start_object();
((Explain_aggr_window_funcs*)node)->print_json_members(writer, is_analyze);
break;
}
default:
DBUG_ASSERT(0);
}
......@@ -905,15 +909,21 @@ void Explain_select::print_explain_json(Explain_query *query,
writer->end_object();
}
void Explain_aggr_filesort::init(THD *thd, Filesort *filesort)
/*
  Construct the EXPLAIN node describing one filesort operation.

  @param mem_root    memory root handed to sort_items.push_back()
  @param is_analyze  passed to the tracker's constructor
  @param filesort    sort being described; the first item of every element
                     of its ORDER list is appended to sort_items, and
                     filesort->tracker is pointed at this node's tracker
*/
Explain_aggr_filesort::Explain_aggr_filesort(MEM_ROOT *mem_root,
bool is_analyze,
Filesort *filesort)
: tracker(is_analyze)
{
child= NULL;
for (ORDER *ord= filesort->order; ord; ord= ord->next)
{
/*
  NOTE(review): no `thd` appears in this constructor's parameter list; the
  next line is presumably the pre-change init() statement that this view
  shows next to its replacement - confirm.
*/
sort_items.push_back(ord->item[0], thd->mem_root);
sort_items.push_back(ord->item[0], mem_root);
}
/* Make the Filesort point at the tracker owned by this node */
filesort->tracker= &tracker;
}
void Explain_aggr_filesort::print_json_members(Json_writer *writer,
bool is_analyze)
{
......@@ -941,6 +951,23 @@ void Explain_aggr_filesort::print_json_members(Json_writer *writer,
tracker.print_json_members(writer);
}
/*
  Write the "sorts" member of a window function computation node.

  Opens a "sorts" object and, for every element of this->sorts, emits a
  nested "filesort" object whose contents are produced by that element's
  own print_json_members().

  @param writer      JSON writer receiving the output
  @param is_analyze  forwarded unchanged to each filesort node

  NOTE(review): each sort is added under the same member name "filesort"
  inside one object, so the name repeats when there is more than one
  sort - confirm consumers of this output tolerate that.
*/
void Explain_aggr_window_funcs::print_json_members(Json_writer *writer,
                                                   bool is_analyze)
{
  List_iterator<Explain_aggr_filesort> sort_it(sorts);

  writer->add_member("sorts").start_object();
  for (Explain_aggr_filesort *sort_node= sort_it++;
       sort_node;
       sort_node= sort_it++)
  {
    writer->add_member("filesort").start_object();
    sort_node->print_json_members(writer, is_analyze);
    writer->end_object(); // filesort
  }
  writer->end_object(); // sorts
}
void Explain_basic_join::print_explain_json(Explain_query *query,
Json_writer *writer,
bool is_analyze)
......
......@@ -279,19 +279,16 @@ public:
Explain_aggr_node *child;
};
/*
  EXPLAIN node for a filesort operation: holds the list of sort items and a
  Filesort_tracker.

  NOTE(review): this view shows the class header twice and lists both the
  one-argument constructor with init() and the three-argument constructor.
  The callers visible in this change only use the three-argument form, so
  the one-argument constructor and init() are presumably the pre-change
  interface - confirm.
*/
class Explain_aggr_filesort : public Explain_aggr_node
class Explain_aggr_filesort : public Explain_aggr_node
{
/* Items to sort by; filled from the Filesort's ORDER list */
List<Item> sort_items;
public:
enum_explain_aggr_node_type get_type() { return AGGR_OP_FILESORT; }
/* Constructed with the is_analyze flag; Filesort::tracker is set to point here */
Filesort_tracker tracker;
Explain_aggr_filesort(bool is_analyze) : tracker(is_analyze)
{
child= NULL;
}
/* Builds the node from `filesort`, using mem_root for the sort_items list */
Explain_aggr_filesort(MEM_ROOT *mem_root, bool is_analyze,
Filesort *filesort);
void init(THD* thd, Filesort *filesort);
void print_json_members(Json_writer *writer, bool is_analyze);
};
......@@ -309,8 +306,12 @@ public:
/*
  EXPLAIN node for the window function computation step. It carries one
  Explain_aggr_filesort per sort operation and prints them as the "sorts"
  member.
*/
class Explain_aggr_window_funcs : public Explain_aggr_node
{
/* One entry per sort; appended by Window_funcs_computation_step (a friend) */
List<Explain_aggr_filesort> sorts;
public:
enum_explain_aggr_node_type get_type() { return AGGR_OP_WINDOW_FUNCS; }
/* Writes the "sorts" object with a nested "filesort" object per entry */
void print_json_members(Json_writer *writer, bool is_analyze);
/* Needs access to `sorts` in order to fill it in save_explain_plan() */
friend class Window_funcs_computation_step;
};
/////////////////////////////////////////////////////////////////////////////
......
......@@ -451,6 +451,11 @@ public:
el= &current->next;
return current->info;
}
/*
  Get what calling next() would return, without moving the iterator.

  Reads the info pointer of the node that `el` currently refers to; neither
  `el` nor `current` is modified.

  NOTE(review): Window_funcs_computation_step::setup() loops while peek() is
  non-NULL, so this presumably relies on the node past the last element
  carrying a NULL info pointer - confirm.
*/
inline void *peek()
{
return (*el)->info;
}
inline void *next_fast(void)
{
list_node *tmp;
......@@ -503,6 +508,10 @@ public:
{
return el == &list->last_ref()->next;
}
/*
  Return true when `current` is the end_of_list node.
  NOTE(review): presumably this means the most recent next() call ran past
  the final element - confirm against next().
*/
inline bool at_end()
{
return current == &end_of_list;
}
friend class error_list_iterator;
};
......@@ -550,6 +559,7 @@ public:
List_iterator() : base_list_iterator() {}
inline void init(List<T> &a) { base_list_iterator::init(a); }
inline T* operator++(int) { return (T*) base_list_iterator::next(); }
/* Typed wrapper: returns base_list_iterator::peek() cast to T* */
inline T* peek() { return (T*) base_list_iterator::peek(); }
inline T *replace(T *a) { return (T*) base_list_iterator::replace(a); }
inline T *replace(List<T> &a) { return (T*) base_list_iterator::replace(a); }
inline void rewind(void) { base_list_iterator::rewind(); }
......
......@@ -2661,7 +2661,7 @@ bool JOIN::make_aggr_tables_info()
curr_tab= join_tab + top_join_tab_count + aggr_tables - 1;
if (select_lex->window_funcs.elements)
{
curr_tab->window_funcs_step= new Window_funcs_computation;
curr_tab->window_funcs_step= new Window_funcs_computation_step;
if (curr_tab->window_funcs_step->setup(thd, &select_lex->window_funcs,
curr_tab))
DBUG_RETURN(true);
......@@ -23766,8 +23766,9 @@ void JOIN_TAB::save_explain_data(Explain_table_access *eta,
if (filesort)
{
eta->pre_join_sort= new Explain_aggr_filesort(thd->lex->analyze_stmt);
eta->pre_join_sort->init(thd, filesort);
eta->pre_join_sort= new Explain_aggr_filesort(thd->mem_root,
thd->lex->analyze_stmt,
filesort);
}
tracker= &eta->tracker;
......@@ -24183,6 +24184,8 @@ void save_agg_explain_data(JOIN *join, Explain_select *xpl_sel)
JOIN_TAB *join_tab=join->join_tab + join->top_join_tab_count;
Explain_aggr_node *prev_node;
Explain_aggr_node *node= xpl_sel->aggr_tree;
bool is_analyze= join->thd->lex->analyze_stmt;
THD *thd= join->thd;
for (uint i= 0; i < join->aggr_tables; i++, join_tab++)
{
......@@ -24193,9 +24196,15 @@ void save_agg_explain_data(JOIN *join, Explain_select *xpl_sel)
if (join_tab->window_funcs_step)
{
prev_node=node;
node= new Explain_aggr_window_funcs;
node->child= prev_node;
Explain_aggr_node *new_node=
join_tab->window_funcs_step->save_explain_plan(thd->mem_root,
is_analyze);
if (new_node)
{
prev_node=node;
node= new_node;
node->child= prev_node;
}
}
/* The below matches execution in join_init_read_record() */
......@@ -24208,10 +24217,8 @@ void save_agg_explain_data(JOIN *join, Explain_select *xpl_sel)
if (join_tab->filesort)
{
bool is_analyze= join->thd->lex->analyze_stmt;
Explain_aggr_filesort *eaf = new Explain_aggr_filesort(is_analyze);
eaf->init(join->thd, join_tab->filesort);
Explain_aggr_filesort *eaf =
new Explain_aggr_filesort(thd->mem_root, is_analyze, join_tab->filesort);
prev_node= node;
node= eaf;
node->child= prev_node;
......
......@@ -428,7 +428,7 @@ typedef struct st_join_table {
Non-NULL value means this join_tab must do window function computation
before reading.
*/
Window_funcs_computation* window_funcs_step;
Window_funcs_computation_step* window_funcs_step;
/**
List of topmost expressions in the select list. The *next* JOIN TAB
......
......@@ -440,7 +440,19 @@ int compare_window_funcs_by_window_specs(Item_window_func *win_func1,
typedef int (*Item_window_func_cmp)(Item_window_func *f1,
Item_window_func *f2,
void *arg);
/*
@brief
Sort window functions so that those that can be computed together are
adjacent.
@detail
Sort window functions by their
- required sorting order,
- partition list,
- window frame compatibility.
The changes between the groups are marked by setting item_window_func->marker.
*/
static
void order_window_funcs_by_window_specs(List<Item_window_func> *win_func_list)
......@@ -479,7 +491,9 @@ void order_window_funcs_by_window_specs(List<Item_window_func> *win_func_list)
FRAME_CHANGE_FLAG;
}
else if (win_spec_prev->partition_list != win_spec_curr->partition_list)
{
curr->marker|= PARTITION_CHANGE_FLAG | FRAME_CHANGE_FLAG;
}
}
else if (win_spec_prev->window_frame != win_spec_curr->window_frame)
curr->marker|= FRAME_CHANGE_FLAG;
......@@ -1720,14 +1734,6 @@ static ORDER* concat_order_lists(MEM_ROOT *mem_root, ORDER *list1, ORDER *list2)
bool Window_func_runner::setup(THD *thd)
{
Window_spec *spec = win_func->window_spec;
ORDER* sort_order= concat_order_lists(thd->mem_root,
spec->partition_list->first,
spec->order_list->first);
filesort= new (thd->mem_root) Filesort(sort_order, HA_POS_ERROR, NULL);
filesort->tracker= new Filesort_tracker(thd->lex->analyze_stmt);
win_func->setup_partition_border_check(thd);
Item_sum::Sumfunctype type= win_func->window_func()->sum_func();
......@@ -1770,47 +1776,88 @@ bool Window_func_runner::setup(THD *thd)
/*
  Compute the value of window function for all rows.

  @param tbl              table passed to init_read_record() and to
                          compute_func
  @param filesort_result  sort result passed to init_read_record()

  @return the value returned by compute_func, or true if
          init_read_record() fails

  NOTE(review): this view interleaves the previous exec(JOIN*) body with the
  new one. Statements that use `join` or `join_tab` presumably belong to the
  previous version, because the new signature provides neither - confirm.
*/
bool Window_func_runner::exec(JOIN *join)
bool Window_func_runner::exec(TABLE *tbl, SORT_INFO *filesort_result)
{
THD *thd= join->thd;
JOIN_TAB *join_tab= &join->join_tab[join->top_join_tab_count];
if (create_sort_index(thd, join, join_tab,
filesort))
return true;
THD *thd= current_thd;
/* Switch the function to the computation phase for the scan below */
win_func->set_phase_to_computation();
/*
Go through the sorted array and compute the window function
*/
/* Go through the sorted array and compute the window function */
READ_RECORD info;
TABLE *tbl= join_tab->table;
/*
  NOTE(review): the early return below happens after
  set_phase_to_computation() and before set_phase_to_retrieval(); confirm
  that leaving the function in the computation phase on this path is
  acceptable.
*/
if (init_read_record(&info, thd, tbl, NULL/*select*/, join_tab->filesort_result,
if (init_read_record(&info, thd, tbl, NULL/*select*/, filesort_result,
0, 1, FALSE))
return true;
bool is_error= compute_func(win_func, tbl, &info);
/* This calls filesort_free_buffers(): */
end_read_record(&info);
delete join_tab->filesort_result;
join_tab->filesort_result= NULL;
/* Switch back to the retrieval phase before releasing the read record */
win_func->set_phase_to_retrieval();
end_read_record(&info);
return is_error;
}
bool Window_funcs_computation::setup(THD *thd,
List<Item_window_func> *window_funcs,
JOIN_TAB *tab)
/*
  Sort once, then run every window function of this group over the result.

  Calls create_sort_index() with this group's filesort on the join_tab at
  index join->top_join_tab_count, then calls exec() on each runner with that
  join_tab's table and filesort_result. The loop stops at the first runner
  that returns true. After the loop the filesort_result is deleted and the
  pointer reset.

  @return true if create_sort_index() or any runner fails, false otherwise
*/
bool Window_func_sort::exec(JOIN *join)
{
/*
  NOTE(review): the next two lines use `window_funcs`, which is not declared
  in this function, and `it` is declared again further down. They are
  presumably removed lines of the former Window_funcs_computation::setup()
  shown by this view - confirm.
*/
List_iterator_fast<Item_window_func> it(*window_funcs);
Item_window_func *item_win;
THD *thd= join->thd;
JOIN_TAB *join_tab= &join->join_tab[join->top_join_tab_count];
/* On failure this returns before the filesort_result cleanup below */
if (create_sort_index(thd, join, join_tab, filesort))
return true;
TABLE *tbl= join_tab->table;
SORT_INFO *filesort_result= join_tab->filesort_result;
bool is_error= false;
List_iterator<Window_func_runner> it(runners);
Window_func_runner *runner;
while ((runner= it++))
{
/* Every runner receives the same table and the same sort result */
if ((is_error= runner->exec(tbl, filesort_result)))
break;
}
/* Reached both on success and after a failing runner */
delete join_tab->filesort_result;
join_tab->filesort_result= NULL;
return is_error;
}
/*
  Prepare one shared sort and a runner for every window function that uses it.

  The sort order is taken from the window spec of the function that
  it.peek() yields on entry: its partition list followed by its order list.
  Functions are then consumed from the iterator until the list is exhausted
  or the next function has SORTORDER_CHANGE_FLAG set in its marker.

  @param thd  thread handle; its mem_root is used for the order list and
              the Filesort object
  @param sel  stored in filesort->select
  @param it   iterator over the window functions; it.peek() must yield a
              function on entry. It is advanced past each function whose
              runner was set up successfully.

  @return false on success, true if a runner could not be created or its
          setup() failed
*/
bool Window_func_sort::setup(THD *thd, SQL_SELECT *sel,
                             List_iterator<Item_window_func> &it)
{
  Item_window_func *cur_func= it.peek();
  Window_spec *group_spec= cur_func->window_spec;

  ORDER *combined_order= concat_order_lists(thd->mem_root,
                                            group_spec->partition_list->first,
                                            group_spec->order_list->first);
  filesort= new (thd->mem_root) Filesort(combined_order, HA_POS_ERROR, NULL);

  /* Apply the same condition that the subsequent sort has. */
  filesort->select= sel;

  for (;;)
  {
    Window_func_runner *runner= new Window_func_runner(cur_func);
    if (!runner || runner->setup(thd))
      return true;

    runners.push_back(runner);
    it++;

    /* Stop at the end of the list or where the required sort order changes */
    cur_func= it.peek();
    if (!cur_func || (cur_func->marker & SORTORDER_CHANGE_FLAG))
      break;
  }
  return false;
}
bool Window_funcs_computation_step::setup(THD *thd,
List<Item_window_func> *window_funcs,
JOIN_TAB *tab)
{
order_window_funcs_by_window_specs(window_funcs);
SQL_SELECT *sel= NULL;
......@@ -1820,46 +1867,61 @@ bool Window_funcs_computation::setup(THD *thd,
DBUG_ASSERT(!sel->quick);
}
// for each window function
while ((item_win= it++))
Window_func_sort *srt;
List_iterator<Item_window_func> iter(*window_funcs);
while (iter.peek())
{
// Create a runner and call setup for it
if (!(runner= new Window_func_runner(item_win)) ||
runner->setup(thd))
if (!(srt= new Window_func_sort()) ||
srt->setup(thd, sel, iter))
{
return true;
}
/* Apply the same condition that the subsequent sort will */
runner->filesort->select= sel;
win_func_runners.push_back(runner, thd->mem_root);
win_func_sorts.push_back(srt, thd->mem_root);
}
return false;
}
bool Window_funcs_computation::exec(JOIN *join)
/*
  Execute every Window_func_sort in win_func_sorts, in list order.

  @return true as soon as one sort's exec() returns true, false if all of
          them return false

  NOTE(review): this view interleaves the previous body, which iterated
  win_func_runners with `runner`, with the new one, which iterates
  win_func_sorts with `srt`. The `runner` lines are presumably the removed
  ones - confirm.
*/
bool Window_funcs_computation_step::exec(JOIN *join)
{
List_iterator<Window_func_runner> it(win_func_runners);
Window_func_runner *runner;
/* Execute each runner */
while ((runner = it++))
List_iterator<Window_func_sort> it(win_func_sorts);
Window_func_sort *srt;
/* Execute each sort */
while ((srt = it++))
{
if (runner->exec(join))
if (srt->exec(join))
return true;
}
return false;
}
void Window_funcs_computation::cleanup()
/*
  Call cleanup() on every Window_func_sort in win_func_sorts and delete it.

  NOTE(review): the list itself is not emptied here, so it still holds the
  deleted pointers afterwards; confirm it is not iterated again after
  cleanup().

  NOTE(review): the lines using Window_func_runner / `runner` are presumably
  the removed pre-change loop header shown by this view - confirm.
*/
void Window_funcs_computation_step::cleanup()
{
List_iterator<Window_func_runner> it(win_func_runners);
Window_func_runner *runner;
while ((runner = it++))
List_iterator<Window_func_sort> it(win_func_sorts);
Window_func_sort *srt;
while ((srt = it++))
{
srt->cleanup();
delete srt;
}
}
/*
  Build the EXPLAIN node for this computation step.

  Creates an Explain_aggr_window_funcs and appends to its `sorts` list one
  Explain_aggr_filesort for every Window_func_sort in win_func_sorts.

  @param mem_root    passed to the Explain_aggr_filesort constructor and to
                     sorts.push_back()
  @param is_analyze  passed to the Explain_aggr_filesort constructor

  @return the newly created node

  NOTE(review): `xpl` is used without checking the result of `new`; confirm
  allocation failure cannot yield NULL here.
*/
Explain_aggr_window_funcs*
Window_funcs_computation_step::save_explain_plan(MEM_ROOT *mem_root,
bool is_analyze)
{
Explain_aggr_window_funcs *xpl= new Explain_aggr_window_funcs;
List_iterator<Window_func_sort> it(win_func_sorts);
Window_func_sort *srt;
while ((srt = it++))
{
/*
  NOTE(review): `runner` is not declared in this function; the next two
  lines are presumably removed lines of the former cleanup() shown by this
  view - confirm.
*/
runner->cleanup();
delete runner;
Explain_aggr_filesort *eaf=
new Explain_aggr_filesort(mem_root, is_analyze, srt->filesort);
xpl->sorts.push_back(eaf, mem_root);
}
return xpl;
}
/////////////////////////////////////////////////////////////////////////////
......
......@@ -166,12 +166,10 @@ typedef bool (*window_compute_func_t)(Item_window_func *item_win,
class Window_func_runner : public Sql_alloc
{
Item_window_func *win_func;
/* Window function can be computed over this sorting */
Filesort *filesort;
/* The function to use for computation*/
window_compute_func_t compute_func;
public:
Window_func_runner(Item_window_func *win_func_arg) :
win_func(win_func_arg)
......@@ -181,14 +179,33 @@ public:
bool setup(THD *thd);
// This sorts and runs the window function.
bool exec(JOIN *join);
bool exec(TABLE *tbl, SORT_INFO *filesort_result);
};
/*
  Represents a group of window functions that require the same sorting of
  rows and so share the filesort() call.

  setup() creates the Filesort and one Window_func_runner per function in
  the group; exec() performs the sort and then runs every runner over the
  sort result; cleanup() deletes the Filesort.
*/
class Window_func_sort : public Sql_alloc
{
/* One runner per window function computed over this sorting */
List<Window_func_runner> runners;
/* Window functions can be computed over this sorting */
/*
  NOTE(review): no initializer for this pointer is visible here; confirm
  cleanup() is never reached before setup() has assigned it.
*/
Filesort *filesort;
public:
/* Consumes functions from `it` until the required sort order changes */
bool setup(THD *thd, SQL_SELECT *sel, List_iterator<Item_window_func> &it);
/* Sorts, then runs every runner; returns true on error */
bool exec(JOIN *join);
void cleanup() { delete filesort; }
/*
  NOTE(review): Window_funcs_computation is the pre-rename class name;
  this friend line is presumably a removed line shown by this view -
  confirm.
*/
friend class Window_funcs_computation;
/* save_explain_plan() reads `filesort` */
friend class Window_funcs_computation_step;
};
struct st_join_table;
class Explain_aggr_window_funcs;
/*
This is a "window function computation phase": a single object of this class
takes care of computing all window functions in a SELECT.
......@@ -198,12 +215,14 @@ struct st_join_table;
temporary table.
*/
class Window_funcs_computation : public Sql_alloc
class Window_funcs_computation_step : public Sql_alloc
{
/*
  NOTE(review): this view shows both the old class header/member
  (Window_funcs_computation, win_func_runners) and the new ones
  (Window_funcs_computation_step, win_func_sorts); the old pair is
  presumably removed by this change - confirm.
*/
List<Window_func_runner> win_func_runners;
/* One entry per group of window functions sharing a sort */
List<Window_func_sort> win_func_sorts;
public:
/* Orders the window functions and creates the Window_func_sort objects; returns true on error */
bool setup(THD *thd, List<Item_window_func> *window_funcs, st_join_table *tab);
/* Executes every sort in win_func_sorts; returns true on error */
bool exec(JOIN *join);
/* Returns a new EXPLAIN node listing one filesort per entry of win_func_sorts */
Explain_aggr_window_funcs *save_explain_plan(MEM_ROOT *mem_root, bool is_analyze);
/* Cleans up and deletes every entry of win_func_sorts */
void cleanup();
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment