Commit f52bf920 authored by Varun Gupta

MDEV-21263: Allow packed values of non-sorted fields in the sort buffer

This task deals with packing the non-sorted fields (also called addon fields).
This leads to more efficient use of the memory allocated for the sort buffer.
The changes brought by this feature are:
  1) Sort buffers now hold records of variable length.
  2) Each record in the sort buffer is stored as
     <sort_key1><sort_key2>....<addon_length><null_bytes><field1><field2>....
     addon_length holds the extra bytes needed to store the variable
     lengths of the addon fields, which differ from record to record
     (see the sketch below).
  3) rr_unpack_from_buffer and rr_from_tempfile now take the variable
     record length into account.
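
For illustration, a minimal sketch of how a reader sizes one record in this
layout. The 4-byte length prefix that counts itself plus the null bytes and
the packed values is an assumption modelled on the
Addon_fields::read_addon_length()/size_of_length_field helpers used in the
diff below; all names here are hypothetical.

    #include <cstdint>
    #include <cstring>
    #include <cstddef>

    // Hypothetical stand-in for Addon_fields::size_of_length_field.
    static const size_t SIZE_OF_LENGTH_FIELD= 4;

    // Total size of one packed record: the fixed-size sort key followed by
    // <addon_length><null_bytes><field values>, where addon_length counts
    // everything after the sort key (including itself).
    static size_t packed_record_size(const unsigned char *rec, size_t sort_length)
    {
      uint32_t addon_length;
      memcpy(&addon_length, rec + sort_length, SIZE_OF_LENGTH_FIELD);
      return sort_length + addon_length;
    }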

Ported WL#1509 "Pack values of non-sorted fields in the sort buffer" from
MySQL, by Tor Didriksen.
parent ded128aa
set @save_rand_seed1= @@RAND_SEED1;
set @save_rand_seed2= @@RAND_SEED2;
set @@RAND_SEED1=810763568, @@RAND_SEED2=600681772;
create table t1(a int);
insert into t1 select seq from seq_1_to_10000 order by rand();
#
# parameters:
# mean mean for the column to be considered
# max_val max_value for the column to be considered
#
# This function generates a sample of a normal distribution:
# it returns a point of the normal distribution
# with the given mean.
#
CREATE FUNCTION
generate_normal_distribution_sample(mean DOUBLE, max_val DOUBLE)RETURNS DOUBLE
BEGIN
DECLARE z DOUBLE DEFAULT 0;
SET z= (rand() + rand() + rand() + rand() + rand() + rand())/6;
SET z= 2*(max_val-mean)*z;
SET z= z + mean - (max_val-mean);
return z;
END|
#
# parameters:
# len length of the random string to be generated
#
# This function generates a random string of the length passed
# as an argument, with characters in the range [A,Z]
#
CREATE FUNCTION generate_random_string(len INT) RETURNS varchar(128)
BEGIN
DECLARE str VARCHAR(256) DEFAULT '';
DECLARE x INT DEFAULT 0;
WHILE (len > 0) DO
SET x =round(rand()*25);
SET str= CONCAT(str, CHAR(65 + x));
SET len= len-1;
END WHILE;
RETURN str;
END|
#
# parameters:
# mean mean for the column to be considered
# min_val min_value for the column to be considered
# max_val max_value for the column to be considered
#
# This function generates a normal distribution sample in the range
# [min_val, max_val]
#
CREATE FUNCTION
clipped_normal_distribution(mean DOUBLE, min_val DOUBLE, max_val DOUBLE)
RETURNS INT
BEGIN
DECLARE r DOUBLE DEFAULT 0;
WHILE 1=1 DO
set r= generate_normal_distribution_sample(mean, max_val);
IF (r >= min_val AND r <= max_val) THEN
RETURN round(r);
end if;
END WHILE;
RETURN 0;
END|
create table t2 (id INT NOT NULL, a INT, b int);
insert into t2
select a, clipped_normal_distribution(12, 0, 64),
clipped_normal_distribution(32, 0, 128)
from t1;
CREATE TABLE t3(
id INT NOT NULL,
names VARCHAR(64),
address VARCHAR(128),
PRIMARY KEY (id)
);
#
# table t3 stores random strings generated from the lengths stored in
# table t2
#
insert into t3
select id, generate_random_string(a), generate_random_string(b) from t2;
#
# All records fit in memory
#
set sort_buffer_size=262144*10;
flush status;
select id DIV 100 as x,
MD5(group_concat(substring(names,1,3), substring(address,1,3)
order by substring(names,1,3), substring(address,1,3)))
FROM t3
GROUP BY x;
x MD5(group_concat(substring(names,1,3), substring(address,1,3)
order by substring(names,1,3), substring(address,1,3)))
0 c2ecc41f0e37144931dbd51c286d3169
1 65104765025a5550d2070c320c896660
2 d2d10899abcc3be7de55092b260afbfa
3 a44e5be670968b49b0cb1b5665f53cc5
4 c906c31cc6f40c63f1901d257680d71f
5 07ded386a3d789b64462eb9a9525aee1
6 760ff88f2409ccb3319c57c3a38fe7d2
7 320d65f718acf0876c0dbda053129d24
8 73ace6baf48996f86b741105957ce46b
9 cf80b0efda4108abd584ba47fea0158d
10 6f8db75fbae31d381472c16d91c1f52b
11 762315d05abf1d8589eb15238d03d0f3
12 2948a913b6dfea8499605ac342c897a1
13 789c300f6576c27d7f9ed5694b0d8fba
14 8c83fad606cb84de677520dc6fb895f2
15 37f499f1e50d6fb5ecb09e78d1e2e692
16 2a953b23c198cf922a58d5ea5e12db0e
17 ebc1145c9f4324194fc8d85d6e29ed1b
18 2662c807f21177eb6fcdaf4688641819
19 a423cee02d23022f9e2ffbd8a3627a7c
20 0418584d7634ffde4f6988d93f5e2a0f
21 f832993560565217e6dd30f8300c1c11
22 b2ffce93f4fc478e0c1ca96fd4accee3
23 85c7299ac9207a88b6fd9bb69fbf43aa
24 ab2b9e611e3a8c2f470b05552cb82a4d
25 2cb134bd2ecf3d07b702ac1f3f171a9c
26 2e60abcf5605e65075b7e7587f2294a5
27 705872095781fd7a87882606bb8ab423
28 78a7e8ab78c35dae4ddf4c4f593c8cf4
29 943c1fbf466257bb7a4d7af6b6a19165
30 691c2ce1c6d20fdcfbf65b132c80b7a0
31 4854dd9d208e0b45cb2e5b6d0fefe6c8
32 f43993c3d59c03c5cf4c8e2a1ab95533
33 4c5d5e658aba5bd8aa770a86bff05200
34 f531b17fba7abce233f95941b42aad58
35 f44e1f990acfcd7902c5743409412563
36 a7d39877de8502cce6ffbc4809749eba
37 49062ade86448805b6a222707bf45ad0
38 c94368aa13843cc892b75eb71683aaba
39 483247b82751113caab96ea183a9ca3b
40 f6cf9046d05c13ae88ba7a5676b81f89
41 1ca6c62cd730db36d49daca2d1b0ec7c
42 2b519acd4877e780e7200659d01d43af
43 eb59acd9a1bf374468b4986325ec2b83
44 20137caed121517632937703d1011c18
45 72437c79e41164c2b759702cbb36b179
46 3c5479be06f2262e8867e967299bbb3b
47 2084e6be3e7b18019bd874cded1c8bd6
48 efc7de387fa2399a35a59318052a06f4
49 8a734e334febc6f2ca45db6b846be5d4
50 f53fafc0e8d431be62de75e0f16ee745
51 53a0f5f1b88776c84fe22e2c3254444c
52 9b53bc61795233850e5d958a9ba415f5
53 a703edddf3cedbca2d331c96199cf663
54 0e05181a4a223f43e36742021ac012ad
55 04dad2a66800104ed8c7fad8c05135a6
56 913795d9c8c3ddb865b81039a662ba0f
57 fd84229b981c0dcf9f2a79e9d94cf154
58 9ae667ec6ab7c2eb0bf5a06807042e59
59 2e445d3921c94fe2faf400708ea83e8b
60 dcd5f407001d00f8224bb1449254b3a3
61 1aaacf09ce7209ed23fa0875be09a133
62 e5face2b007eeaaa837895a6e32c63f0
63 43a38c4b0243555e9cf655ce60c8403a
64 6b3ca212cd7cf131f4bd8bdd9085618b
65 ffda0a086d4259b9c48bd26391f7ebaf
66 1f392bbb0662c3e1cb97073a5e119570
67 33c8041b5a8341f1b117e40b2f8bd0ee
68 b203f57d48d584ff5479f76ce76e6eba
69 72a6172c2b14abbf64ab74a27f03fc89
70 12a05415f69f217712ac65a0858ddfdc
71 8bdd033aa314600e78585dc42a88c28b
72 b2b70633781346cfa203e243319f6409
73 cb158a725e4f3f1ef6f2053751db07d0
74 82f3963cfebe83618ea21af5fd38d164
75 cfb9e6e451c6855e1a7dce78d7b4cc5a
76 eab0e37d16bbc78b21e234767f5056eb
77 c76407fe3c50d02fba700da0011167cc
78 1339da94058abc3fcfcf61a81e1597b7
79 3c58d27f69d6e927bd0cb1b16bdb30ba
80 1e500d97a85b3dd3a0b4048fe6ad54ae
81 d03d3e0bc34d1aec0569567d0ca25365
82 55d7ddafa551c8b7e0f6aec128ca6442
83 8ee668de06a360b26af96902edd1c12f
84 3b8914e6a79a67037057781cf25b6825
85 37039d22b6e3fb8e26c29eb459ca81ae
86 707da7bd7982b4ba685981bf32b20103
87 c3bf51c8c12289d200c2aa0532b3b6c3
88 5a20abf3322051f8cbc5c0d1d96724e1
89 1e3e3e730c8447afb79331f6740d379d
90 0414da13cd7ac8cc53ed97f9c12f52a8
91 b862c11cc516d750ccac6e10436c6746
92 0e7febc44370fd57a370d3fbdd9bf16c
93 85488f13dd47b028a9db94e136eae19d
94 f9605bb472f52d1775e669b86691e848
95 33b4d6bc8f327e1a48328d5f5071d7e7
96 917e41d300faaedfb5d1a67eafa85ef4
97 b7764a86130d50379833b9606460c2d2
98 f1b5d819e4441898a6278b99859a1774
99 1a4bcfaa64467f4bc03f0f00b27cf063
100 cbd5cef546acc0bfa33117d7c1ae58a8
show status like '%sort%';
Variable_name Value
Sort_merge_passes 0
Sort_priority_queue_sorts 0
Sort_range 0
Sort_rows 10000
Sort_scan 1
set sort_buffer_size=default;
#
# Test for merge_many_buff
#
set sort_buffer_size=32768;
flush status;
select id DIV 100 as x,
MD5(group_concat(substring(names,1,3), substring(address,1,3)
order by substring(names,1,3), substring(address,1,3)))
FROM t3
GROUP BY x;
x MD5(group_concat(substring(names,1,3), substring(address,1,3)
order by substring(names,1,3), substring(address,1,3)))
0 c2ecc41f0e37144931dbd51c286d3169
1 65104765025a5550d2070c320c896660
2 d2d10899abcc3be7de55092b260afbfa
3 a44e5be670968b49b0cb1b5665f53cc5
4 c906c31cc6f40c63f1901d257680d71f
5 07ded386a3d789b64462eb9a9525aee1
6 760ff88f2409ccb3319c57c3a38fe7d2
7 320d65f718acf0876c0dbda053129d24
8 73ace6baf48996f86b741105957ce46b
9 cf80b0efda4108abd584ba47fea0158d
10 6f8db75fbae31d381472c16d91c1f52b
11 762315d05abf1d8589eb15238d03d0f3
12 2948a913b6dfea8499605ac342c897a1
13 789c300f6576c27d7f9ed5694b0d8fba
14 8c83fad606cb84de677520dc6fb895f2
15 37f499f1e50d6fb5ecb09e78d1e2e692
16 2a953b23c198cf922a58d5ea5e12db0e
17 ebc1145c9f4324194fc8d85d6e29ed1b
18 2662c807f21177eb6fcdaf4688641819
19 a423cee02d23022f9e2ffbd8a3627a7c
20 0418584d7634ffde4f6988d93f5e2a0f
21 f832993560565217e6dd30f8300c1c11
22 b2ffce93f4fc478e0c1ca96fd4accee3
23 85c7299ac9207a88b6fd9bb69fbf43aa
24 ab2b9e611e3a8c2f470b05552cb82a4d
25 2cb134bd2ecf3d07b702ac1f3f171a9c
26 2e60abcf5605e65075b7e7587f2294a5
27 705872095781fd7a87882606bb8ab423
28 78a7e8ab78c35dae4ddf4c4f593c8cf4
29 943c1fbf466257bb7a4d7af6b6a19165
30 691c2ce1c6d20fdcfbf65b132c80b7a0
31 4854dd9d208e0b45cb2e5b6d0fefe6c8
32 f43993c3d59c03c5cf4c8e2a1ab95533
33 4c5d5e658aba5bd8aa770a86bff05200
34 f531b17fba7abce233f95941b42aad58
35 f44e1f990acfcd7902c5743409412563
36 a7d39877de8502cce6ffbc4809749eba
37 49062ade86448805b6a222707bf45ad0
38 c94368aa13843cc892b75eb71683aaba
39 483247b82751113caab96ea183a9ca3b
40 f6cf9046d05c13ae88ba7a5676b81f89
41 1ca6c62cd730db36d49daca2d1b0ec7c
42 2b519acd4877e780e7200659d01d43af
43 eb59acd9a1bf374468b4986325ec2b83
44 20137caed121517632937703d1011c18
45 72437c79e41164c2b759702cbb36b179
46 3c5479be06f2262e8867e967299bbb3b
47 2084e6be3e7b18019bd874cded1c8bd6
48 efc7de387fa2399a35a59318052a06f4
49 8a734e334febc6f2ca45db6b846be5d4
50 f53fafc0e8d431be62de75e0f16ee745
51 53a0f5f1b88776c84fe22e2c3254444c
52 9b53bc61795233850e5d958a9ba415f5
53 a703edddf3cedbca2d331c96199cf663
54 0e05181a4a223f43e36742021ac012ad
55 04dad2a66800104ed8c7fad8c05135a6
56 913795d9c8c3ddb865b81039a662ba0f
57 fd84229b981c0dcf9f2a79e9d94cf154
58 9ae667ec6ab7c2eb0bf5a06807042e59
59 2e445d3921c94fe2faf400708ea83e8b
60 dcd5f407001d00f8224bb1449254b3a3
61 1aaacf09ce7209ed23fa0875be09a133
62 e5face2b007eeaaa837895a6e32c63f0
63 43a38c4b0243555e9cf655ce60c8403a
64 6b3ca212cd7cf131f4bd8bdd9085618b
65 ffda0a086d4259b9c48bd26391f7ebaf
66 1f392bbb0662c3e1cb97073a5e119570
67 33c8041b5a8341f1b117e40b2f8bd0ee
68 b203f57d48d584ff5479f76ce76e6eba
69 72a6172c2b14abbf64ab74a27f03fc89
70 12a05415f69f217712ac65a0858ddfdc
71 8bdd033aa314600e78585dc42a88c28b
72 b2b70633781346cfa203e243319f6409
73 cb158a725e4f3f1ef6f2053751db07d0
74 82f3963cfebe83618ea21af5fd38d164
75 cfb9e6e451c6855e1a7dce78d7b4cc5a
76 eab0e37d16bbc78b21e234767f5056eb
77 c76407fe3c50d02fba700da0011167cc
78 1339da94058abc3fcfcf61a81e1597b7
79 3c58d27f69d6e927bd0cb1b16bdb30ba
80 1e500d97a85b3dd3a0b4048fe6ad54ae
81 d03d3e0bc34d1aec0569567d0ca25365
82 55d7ddafa551c8b7e0f6aec128ca6442
83 8ee668de06a360b26af96902edd1c12f
84 3b8914e6a79a67037057781cf25b6825
85 37039d22b6e3fb8e26c29eb459ca81ae
86 707da7bd7982b4ba685981bf32b20103
87 c3bf51c8c12289d200c2aa0532b3b6c3
88 5a20abf3322051f8cbc5c0d1d96724e1
89 1e3e3e730c8447afb79331f6740d379d
90 0414da13cd7ac8cc53ed97f9c12f52a8
91 b862c11cc516d750ccac6e10436c6746
92 0e7febc44370fd57a370d3fbdd9bf16c
93 85488f13dd47b028a9db94e136eae19d
94 f9605bb472f52d1775e669b86691e848
95 33b4d6bc8f327e1a48328d5f5071d7e7
96 917e41d300faaedfb5d1a67eafa85ef4
97 b7764a86130d50379833b9606460c2d2
98 f1b5d819e4441898a6278b99859a1774
99 1a4bcfaa64467f4bc03f0f00b27cf063
100 cbd5cef546acc0bfa33117d7c1ae58a8
show status like '%sort%';
Variable_name Value
Sort_merge_passes 4
Sort_priority_queue_sorts 0
Sort_range 0
Sort_rows 10000
Sort_scan 1
set sort_buffer_size=default;
set @@RAND_SEED1= @save_rand_seed1;
set @@RAND_SEED2= @save_rand_seed2;
drop function generate_normal_distribution_sample;
drop function generate_random_string;
drop function clipped_normal_distribution;
drop table t1, t2, t3;
--source include/big_test.inc
--source include/have_sequence.inc
--source include/have_64bit.inc
set @save_rand_seed1= @@RAND_SEED1;
set @save_rand_seed2= @@RAND_SEED2;
set @@RAND_SEED1=810763568, @@RAND_SEED2=600681772;
create table t1(a int);
insert into t1 select seq from seq_1_to_10000 order by rand();
delimiter |;
--echo #
--echo # parameters:
--echo # mean mean for the column to be considered
--echo # max_val max_value for the column to be considered
--echo #
--echo # This function generates a sample of a normal distribution:
--echo # it returns a point of the normal distribution
--echo # with the given mean.
--echo #
CREATE FUNCTION
generate_normal_distribution_sample(mean DOUBLE, max_val DOUBLE)RETURNS DOUBLE
BEGIN
DECLARE z DOUBLE DEFAULT 0;
SET z= (rand() + rand() + rand() + rand() + rand() + rand())/6;
SET z= 2*(max_val-mean)*z;
SET z= z + mean - (max_val-mean);
return z;
END|
--echo #
--echo # parameters:
--echo # len length of the random string to be generated
--echo #
--echo # This function generates a random string of the length passed
--echo # as an argument, with characters in the range [A,Z]
--echo #
CREATE FUNCTION generate_random_string(len INT) RETURNS varchar(128)
BEGIN
DECLARE str VARCHAR(256) DEFAULT '';
DECLARE x INT DEFAULT 0;
WHILE (len > 0) DO
SET x =round(rand()*25);
SET str= CONCAT(str, CHAR(65 + x));
SET len= len-1;
END WHILE;
RETURN str;
END|
--echo #
--echo # parameters:
--echo # mean mean for the column to be considered
--echo # min_val min_value for the column to be considered
--echo # max_val max_value for the column to be considered
--echo #
--echo # This function generates a normal distribution sample in the range
--echo # [min_val, max_val]
--echo #
CREATE FUNCTION
clipped_normal_distribution(mean DOUBLE, min_val DOUBLE, max_val DOUBLE)
RETURNS INT
BEGIN
DECLARE r DOUBLE DEFAULT 0;
WHILE 1=1 DO
set r= generate_normal_distribution_sample(mean, max_val);
IF (r >= min_val AND r <= max_val) THEN
RETURN round(r);
end if;
END WHILE;
RETURN 0;
END|
delimiter ;|
create table t2 (id INT NOT NULL, a INT, b int);
insert into t2
select a, clipped_normal_distribution(12, 0, 64),
clipped_normal_distribution(32, 0, 128)
from t1;
CREATE TABLE t3(
id INT NOT NULL,
names VARCHAR(64),
address VARCHAR(128),
PRIMARY KEY (id)
);
--echo #
--echo # table t3 stores random strings generated from the lengths stored in
--echo # table t2
--echo #
insert into t3
select id, generate_random_string(a), generate_random_string(b) from t2;
let $query= select id DIV 100 as x,
MD5(group_concat(substring(names,1,3), substring(address,1,3)
order by substring(names,1,3), substring(address,1,3)))
FROM t3
GROUP BY x;
--echo #
--echo # All records fit in memory
--echo #
set sort_buffer_size=262144*10;
flush status;
eval $query;
show status like '%sort%';
set sort_buffer_size=default;
--echo #
--echo # Test for merge_many_buff
--echo #
set sort_buffer_size=32768;
flush status;
eval $query;
show status like '%sort%';
set sort_buffer_size=default;
set @@RAND_SEED1= @save_rand_seed1;
set @@RAND_SEED2= @save_rand_seed2;
drop function generate_normal_distribution_sample;
drop function generate_random_string;
drop function clipped_normal_distribution;
drop table t1, t2, t3;
......@@ -57,7 +57,7 @@ class Bounded_queue
@param to Where to put the key.
@param from The input data.
*/
typedef void (*keymaker_function)(Sort_param *param,
typedef uint (*keymaker_function)(Sort_param *param,
Key_type *to,
Element_type *from);
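
The callback's return type changes from void to uint because with packed
addon fields a record's size is only known after the key has been built, so
the producer reports how many bytes it wrote. A simplified sketch of the new
contract (raw byte pointers instead of the template parameters; names
hypothetical):

    struct Sort_param;   // opaque for this sketch

    // New contract: the callback returns the number of bytes it wrote,
    // since records may now vary in size from row to row.
    typedef unsigned (*keymaker_function)(Sort_param *param,
                                          unsigned char *to,
                                          unsigned char *from);

    // A bounded priority queue keeps fixed-size slots, so push() can
    // simply discard the length -- exactly what the (void) cast below does.
    static void push_sketch(keymaker_function keymaker, Sort_param *param,
                            unsigned char *slot, unsigned char *row)
    {
      (void) keymaker(param, slot, row);
    }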
......@@ -181,7 +181,7 @@ void Bounded_queue<Element_type, Key_type>::push(Element_type *element)
{
// Replace top element with new key, and re-order the queue.
Key_type **pq_top= reinterpret_cast<Key_type **>(queue_top(&m_queue));
(*m_keymaker)(m_sort_param, *pq_top, element);
(void)(*m_keymaker)(m_sort_param, *pq_top, element);
queue_replace_top(&m_queue);
} else {
// Insert new key into the queue.
......
......@@ -1527,6 +1527,7 @@ class Field: public Value_source
{ return length;}
virtual uint max_packed_col_length(uint max_length)
{ return max_length;}
virtual bool is_packable() { return false; }
uint offset(const uchar *record) const
{
......@@ -2139,6 +2140,7 @@ class Field_longstr :public Field_str
bool can_optimize_range(const Item_bool_func *cond,
const Item *item,
bool is_eq_func) const;
bool is_packable() { return true; }
};
/* base class for float and double and decimal (old one) */
......
......@@ -48,17 +48,17 @@ static ha_rows find_all_keys(THD *thd, Sort_param *param, SQL_SELECT *select,
ha_rows *found_rows);
static bool write_keys(Sort_param *param, SORT_INFO *fs_info,
uint count, IO_CACHE *buffer_file, IO_CACHE *tempfile);
static void make_sortkey(Sort_param *param, uchar *to, uchar *ref_pos);
static uint make_sortkey(Sort_param *param, uchar *to, uchar *ref_pos);
static void register_used_fields(Sort_param *param);
static bool save_index(Sort_param *param, uint count,
SORT_INFO *table_sort);
static uint suffix_length(ulong string_length);
static uint sortlength(THD *thd, SORT_FIELD *sortorder, uint s_length,
bool *multi_byte_charset);
static SORT_ADDON_FIELD *get_addon_fields(TABLE *table, uint sortlength,
LEX_STRING *addon_buf);
static void unpack_addon_fields(struct st_sort_addon_field *addon_field,
uchar *buff, uchar *buff_end);
bool *multi_byte_charset);
static Addon_fields *get_addon_fields(TABLE *table, uint sortlength,
uint *addon_length,
uint *m_packable_length);
static bool check_if_pq_applicable(Sort_param *param, SORT_INFO *info,
TABLE *table,
ha_rows records, size_t memory_available);
......@@ -66,7 +66,7 @@ static bool check_if_pq_applicable(Sort_param *param, SORT_INFO *info,
void Sort_param::init_for_filesort(uint sortlen, TABLE *table,
ha_rows maxrows, bool sort_positions)
{
DBUG_ASSERT(addon_field == 0 && addon_buf.length == 0);
DBUG_ASSERT(addon_fields == NULL);
sort_length= sortlen;
ref_length= table->file->ref_length;
......@@ -77,12 +77,13 @@ void Sort_param::init_for_filesort(uint sortlen, TABLE *table,
Get the descriptors of all fields whose values are appended
to sorted fields and get its total length in addon_buf.length
*/
addon_field= get_addon_fields(table, sort_length, &addon_buf);
addon_fields= get_addon_fields(table, sort_length, &addon_length,
&m_packable_length);
}
if (addon_field)
if (using_addon_fields())
{
DBUG_ASSERT(addon_buf.length < UINT_MAX32);
res_length= (uint)addon_buf.length;
DBUG_ASSERT(addon_length < UINT_MAX32);
res_length= addon_length;
}
else
{
......@@ -93,11 +94,43 @@ void Sort_param::init_for_filesort(uint sortlen, TABLE *table,
*/
sort_length+= ref_length;
}
rec_length= sort_length + (uint)addon_buf.length;
rec_length= sort_length + addon_length;
max_rows= maxrows;
}
void Sort_param::try_to_pack_addons(ulong max_length_for_sort_data)
{
if (!using_addon_fields() || // no addons, or
using_packed_addons()) // already packed
return;
if (!Addon_fields::can_pack_addon_fields(res_length))
return;
const uint sz= Addon_fields::size_of_length_field;
if (rec_length + sz > max_length_for_sort_data)
return;
// Heuristic: skip packing if potential savings are less than 10 bytes.
if (m_packable_length < (10 + sz))
return;
SORT_ADDON_FIELD *addonf= addon_fields->begin();
for (;addonf != addon_fields->end(); ++addonf)
{
addonf->offset+= sz;
addonf->null_offset+= sz;
}
addon_fields->set_using_packed_addons(true);
m_using_packed_addons= true;
addon_length+= sz;
res_length+= sz;
rec_length+= sz;
}
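
For readability, the bail-out conditions above restated as one standalone
predicate; the 10-byte threshold and the extra length field come from the
code, the can_pack_addon_fields(res_length) range check is omitted, and all
names are simplified:

    // Sketch: mirrors Sort_param::try_to_pack_addons().  Returns true when
    // switching to packed addon fields is worthwhile.
    static bool should_pack_addons(bool have_addons,
                                   bool already_packed,
                                   unsigned rec_length,
                                   unsigned packable_length,
                                   unsigned long max_length_for_sort_data)
    {
      const unsigned sz= 4;   // assumed size of the length field
      if (!have_addons || already_packed)
        return false;
      if (rec_length + sz > max_length_for_sort_data)
        return false;
      // Heuristic from the patch: skip packing unless it can save at
      // least 10 bytes beyond the cost of the added length prefix.
      return packable_length >= 10 + sz;
    }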
/**
Sort a table.
Creates a set of pointers that can be used to read the rows
......@@ -134,7 +167,7 @@ SORT_INFO *filesort(THD *thd, TABLE *table, Filesort *filesort,
DBUG_ASSERT(thd->variables.sortbuff_size <= SIZE_T_MAX);
size_t memory_available= (size_t)thd->variables.sortbuff_size;
uint maxbuffer;
BUFFPEK *buffpek;
Merge_chunk *buffpek;
ha_rows num_rows= HA_POS_ERROR;
IO_CACHE tempfile, buffpek_pointers, *outfile;
Sort_param param;
......@@ -164,13 +197,16 @@ SORT_INFO *filesort(THD *thd, TABLE *table, Filesort *filesort,
if (subselect && subselect->filesort_buffer.is_allocated())
{
/* Reuse cache from last call */
// Reuse cache from last call
sort->filesort_buffer= subselect->filesort_buffer;
sort->buffpek= subselect->sortbuffer;
subselect->filesort_buffer.reset();
subselect->sortbuffer.str=0;
}
DBUG_ASSERT(sort->sorted_result_in_fsbuf == FALSE ||
sort->record_pointers == NULL);
outfile= &sort->io_cache;
my_b_clear(&tempfile);
......@@ -183,9 +219,8 @@ SORT_INFO *filesort(THD *thd, TABLE *table, Filesort *filesort,
&multi_byte_charset),
table, max_rows, filesort->sort_positions);
sort->addon_buf= param.addon_buf;
sort->addon_field= param.addon_field;
sort->unpack= unpack_addon_fields;
sort->addon_fields= param.addon_fields;
if (multi_byte_charset &&
!(param.tmp_buffer= (char*) my_malloc(param.sort_length,
MYF(MY_WME | MY_THREAD_SPECIFIC))))
......@@ -208,7 +243,15 @@ SORT_INFO *filesort(THD *thd, TABLE *table, Filesort *filesort,
thd->query_plan_flags|= QPLAN_FILESORT_PRIORITY_QUEUE;
status_var_increment(thd->status_var.filesort_pq_sorts_);
tracker->incr_pq_used();
param.using_pq= true;
const size_t compare_length= param.sort_length;
/*
For PQ queries (with limit) we know exactly how many pointers/records
we have in the buffer, so to simplify things, we initialize
all pointers here. (We cannot pack fields anyway, so there is no
point in doing lazy initialization.)
*/
sort->init_record_pointers();
if (pq.init(param.max_rows,
true, // max_at_top
NULL, // compare_function
......@@ -223,21 +266,23 @@ SORT_INFO *filesort(THD *thd, TABLE *table, Filesort *filesort,
DBUG_ASSERT(thd->is_error());
goto err;
}
// For PQ queries (with limit) we initialize all pointers.
sort->init_record_pointers();
}
else
{
DBUG_PRINT("info", ("filesort PQ is not applicable"));
param.try_to_pack_addons(thd->variables.max_length_for_sort_data);
param.using_pq= false;
size_t min_sort_memory= MY_MAX(MIN_SORT_MEMORY,
param.sort_length*MERGEBUFF2);
set_if_bigger(min_sort_memory, sizeof(BUFFPEK*)*MERGEBUFF2);
set_if_bigger(min_sort_memory, sizeof(Merge_chunk*)*MERGEBUFF2);
while (memory_available >= min_sort_memory)
{
ulonglong keys= memory_available / (param.rec_length + sizeof(char*));
param.max_keys_per_buffer= (uint) MY_MIN(num_rows, keys);
if (sort->alloc_sort_buffer(param.max_keys_per_buffer, param.rec_length))
sort->alloc_sort_buffer(param.max_keys_per_buffer, param.rec_length);
if (sort->sort_buffer_size() > 0)
break;
size_t old_memory_available= memory_available;
memory_available= memory_available/4*3;
......@@ -258,7 +303,9 @@ SORT_INFO *filesort(THD *thd, TABLE *table, Filesort *filesort,
goto err;
param.sort_form= table;
param.end=(param.local_sortorder=filesort->sortorder)+s_length;
param.local_sortorder=
Bounds_checked_array<SORT_FIELD>(filesort->sortorder, s_length);
num_rows= find_all_keys(thd, &param, select,
sort,
&buffpek_pointers,
......@@ -287,12 +334,20 @@ SORT_INFO *filesort(THD *thd, TABLE *table, Filesort *filesort,
my_free(sort->buffpek.str);
sort->buffpek.str= 0;
}
if (param.using_addon_fields())
{
DBUG_ASSERT(sort->addon_fields);
if (!sort->addon_fields->allocate_addon_buf(param.addon_length))
goto err;
}
if (!(sort->buffpek.str=
(char *) read_buffpek_from_file(&buffpek_pointers, maxbuffer,
(uchar*) sort->buffpek.str)))
goto err;
sort->buffpek.length= maxbuffer;
buffpek= (BUFFPEK *) sort->buffpek.str;
buffpek= (Merge_chunk *) sort->buffpek.str;
close_cached_file(&buffpek_pointers);
/* Open cached file if it isn't open */
if (! my_b_inited(outfile) &&
......@@ -306,25 +361,25 @@ SORT_INFO *filesort(THD *thd, TABLE *table, Filesort *filesort,
Use also the space previously used by string pointers in sort_buffer
for temporary key storage.
*/
param.max_keys_per_buffer=((param.max_keys_per_buffer *
(param.rec_length + sizeof(char*))) /
param.rec_length - 1);
param.max_keys_per_buffer= static_cast<uint>(sort->sort_buffer_size()) /
param.rec_length;
set_if_bigger(param.max_keys_per_buffer, 1);
maxbuffer--; // Offset from 0
if (merge_many_buff(&param,
(uchar*) sort->get_sort_keys(),
if (merge_many_buff(&param, sort->get_raw_buf(),
buffpek,&maxbuffer,
&tempfile))
&tempfile))
goto err;
if (flush_io_cache(&tempfile) ||
reinit_io_cache(&tempfile,READ_CACHE,0L,0,0))
goto err;
if (merge_index(&param,
(uchar*) sort->get_sort_keys(),
sort->get_raw_buf(),
buffpek,
maxbuffer,
&tempfile,
outfile))
outfile))
goto err;
}
......@@ -339,7 +394,8 @@ SORT_INFO *filesort(THD *thd, TABLE *table, Filesort *filesort,
my_free(param.tmp_buffer);
if (!subselect || !subselect->is_uncacheable())
{
sort->free_sort_buffer();
if (!param.using_addon_fields())
sort->free_sort_buffer();
my_free(sort->buffpek.str);
}
else
......@@ -347,7 +403,7 @@ SORT_INFO *filesort(THD *thd, TABLE *table, Filesort *filesort,
/* Remember sort buffers for next subquery call */
subselect->filesort_buffer= sort->filesort_buffer;
subselect->sortbuffer= sort->buffpek;
sort->filesort_buffer.reset(); // Don't free this
sort->filesort_buffer.reset(); // Don't free this
}
sort->buffpek.str= 0;
......@@ -361,7 +417,7 @@ SORT_INFO *filesort(THD *thd, TABLE *table, Filesort *filesort,
my_off_t save_pos=outfile->pos_in_file;
/* For following reads */
if (reinit_io_cache(outfile,READ_CACHE,0L,0,0))
error=1;
error=1;
outfile->end_of_file=save_pos;
}
}
......@@ -490,10 +546,10 @@ uint Filesort::make_sortorder(THD *thd, JOIN *join, table_map first_table_bit)
static uchar *read_buffpek_from_file(IO_CACHE *buffpek_pointers, uint count,
uchar *buf)
{
size_t length= sizeof(BUFFPEK)*count;
size_t length= sizeof(Merge_chunk)*count;
uchar *tmp= buf;
DBUG_ENTER("read_buffpek_from_file");
if (count > UINT_MAX/sizeof(BUFFPEK))
if (count > UINT_MAX/sizeof(Merge_chunk))
return 0; /* sizeof(Merge_chunk)*count will overflow */
if (!tmp)
tmp= (uchar *)my_malloc(length, MYF(MY_WME | MY_THREAD_SPECIFIC));
......@@ -702,7 +758,8 @@ static ha_rows find_all_keys(THD *thd, Sort_param *param, SQL_SELECT *select,
handler *file;
MY_BITMAP *save_read_set, *save_write_set;
Item *sort_cond;
ha_rows retval;
ha_rows num_records= 0;
const bool packed_addon_fields= param->using_packed_addons();
DBUG_ENTER("find_all_keys");
DBUG_PRINT("info",("using: %s",
(select ? select->quick ? "ranges" : "where":
......@@ -810,23 +867,27 @@ static ha_rows find_all_keys(THD *thd, Sort_param *param, SQL_SELECT *select,
if (write_record)
{
++(*found_rows);
if (pq)
{
pq->push(ref_pos);
idx= pq->num_elements();
}
else
{
if (idx == param->max_keys_per_buffer)
if (fs_info->isfull())
{
if (write_keys(param, fs_info, idx, buffpek_pointers, tempfile))
goto err;
idx= 0;
indexpos++;
idx= 0;
indexpos++;
}
make_sortkey(param, fs_info->get_record_buffer(idx++), ref_pos);
if (idx == 0)
fs_info->init_next_record_pointer();
uchar *start_of_rec= fs_info->get_next_record_pointer();
const uint rec_sz= make_sortkey(param, start_of_rec, ref_pos);
if (packed_addon_fields && rec_sz != param->rec_length)
fs_info->adjust_next_record_pointer(rec_sz);
idx++;
}
num_records++;
}
/* It does not make sense to read more keys in case of a fatal error */
......@@ -862,11 +923,14 @@ static ha_rows find_all_keys(THD *thd, Sort_param *param, SQL_SELECT *select,
if (indexpos && idx &&
write_keys(param, fs_info, idx, buffpek_pointers, tempfile))
DBUG_RETURN(HA_POS_ERROR); /* purecov: inspected */
retval= (my_b_inited(tempfile) ?
(ha_rows) (my_b_tell(tempfile)/param->rec_length) :
idx);
DBUG_PRINT("info", ("find_all_keys return %llu", (ulonglong) retval));
DBUG_RETURN(retval);
(*found_rows)= num_records;
if (pq)
num_records= pq->num_elements();
DBUG_PRINT("info", ("find_all_keys return %llu", (ulonglong) num_records));
DBUG_RETURN(num_records);
err:
sort_form->column_bitmaps_set(save_read_set, save_write_set);
......@@ -901,36 +965,48 @@ write_keys(Sort_param *param, SORT_INFO *fs_info, uint count,
IO_CACHE *buffpek_pointers, IO_CACHE *tempfile)
{
size_t rec_length;
uchar **end;
BUFFPEK buffpek;
Merge_chunk buffpek;
DBUG_ENTER("write_keys");
rec_length= param->rec_length;
uchar **sort_keys= fs_info->get_sort_keys();
fs_info->sort_buffer(param, count);
if (!my_b_inited(tempfile) &&
open_cached_file(tempfile, mysql_tmpdir, TEMP_PREFIX, DISK_BUFFER_SIZE,
MYF(MY_WME)))
goto err; /* purecov: inspected */
DBUG_RETURN(1); /* purecov: inspected */
/* check we won't have more buffpeks than we can possibly keep in memory */
if (my_b_tell(buffpek_pointers) + sizeof(BUFFPEK) > (ulonglong)UINT_MAX)
goto err;
if (my_b_tell(buffpek_pointers) + sizeof(Merge_chunk) > (ulonglong)UINT_MAX)
DBUG_RETURN(1);
bzero(&buffpek, sizeof(buffpek));
buffpek.file_pos= my_b_tell(tempfile);
buffpek.set_file_position(my_b_tell(tempfile));
if ((ha_rows) count > param->max_rows)
count=(uint) param->max_rows; /* purecov: inspected */
buffpek.count=(ha_rows) count;
for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
if (my_b_write(tempfile, (uchar*) *sort_keys, (uint) rec_length))
goto err;
buffpek.set_rowcount(static_cast<ha_rows>(count));
const bool packed_addon_fields= param->using_packed_addons();
for (uint ix= 0; ix < count; ++ix)
{
uchar *record= fs_info->get_sorted_record(ix);
if (packed_addon_fields)
{
rec_length= param->sort_length +
Addon_fields::read_addon_length(record + param->sort_length);
}
else
rec_length= param->rec_length;
if (my_b_write(tempfile, record, rec_length))
DBUG_RETURN(1); /* purecov: inspected */
}
if (my_b_write(buffpek_pointers, (uchar*) &buffpek, sizeof(buffpek)))
goto err;
DBUG_RETURN(1);
DBUG_RETURN(0);
err:
DBUG_RETURN(1);
} /* write_keys */
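
A condensed sketch of the write loop above: when addons are packed, every
record is sized individually by reading its length prefix; otherwise the
fixed rec_length is used. FILE* stands in for IO_CACHE, and the 4-byte
self-counting prefix is an assumption as before:

    #include <cstdint>
    #include <cstdio>
    #include <cstring>
    #include <cstddef>

    static uint32_t read_addon_length(const unsigned char *p)
    {
      uint32_t len;                       // assumed 4-byte prefix
      memcpy(&len, p, sizeof(len));
      return len;
    }

    static bool write_sorted_records(FILE *out, unsigned char **records,
                                     size_t count, size_t sort_length,
                                     size_t fixed_rec_length,
                                     bool packed_addon_fields)
    {
      for (size_t i= 0; i < count; i++)
      {
        const unsigned char *rec= records[i];
        size_t rec_length= packed_addon_fields
          ? sort_length + read_addon_length(rec + sort_length)
          : fixed_rec_length;
        if (fwrite(rec, 1, rec_length, out) != rec_length)
          return true;                    // error convention as my_b_write()
      }
      return false;
    }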
......@@ -1168,14 +1244,15 @@ Type_handler_real_result::make_sort_key(uchar *to, Item *item,
/** Make a sort-key from record. */
static void make_sortkey(Sort_param *param, uchar *to, uchar *ref_pos)
static uint make_sortkey(Sort_param *param, uchar *to, uchar *ref_pos)
{
Field *field;
SORT_FIELD *sort_field;
uint length;
uchar *orig_to= to;
for (sort_field=param->local_sortorder ;
sort_field != param->end ;
for (sort_field=param->local_sortorder.begin() ;
sort_field != param->local_sortorder.end() ;
sort_field++)
{
bool maybe_null=0;
......@@ -1202,15 +1279,15 @@ static void make_sortkey(Sort_param *param, uchar *to, uchar *ref_pos)
length=sort_field->length;
while (length--)
{
*to = (uchar) (~ *to);
to++;
*to = (uchar) (~ *to);
to++;
}
}
else
to+= sort_field->length;
}
if (param->addon_field)
if (param->using_addon_fields())
{
/*
Save field values appended to sorted fields.
......@@ -1218,41 +1295,44 @@ static void make_sortkey(Sort_param *param, uchar *to, uchar *ref_pos)
In this implementation we use fixed layout for field values -
the same for all records.
*/
SORT_ADDON_FIELD *addonf= param->addon_field;
SORT_ADDON_FIELD *addonf= param->addon_fields->begin();
uchar *nulls= to;
uchar *p_len= to;
DBUG_ASSERT(addonf != 0);
const bool packed_addon_fields= param->addon_fields->using_packed_addons();
uint32 res_len= addonf->offset;
memset(nulls, 0, addonf->offset);
to+= addonf->offset;
for ( ; (field= addonf->field) ; addonf++)
for ( ; addonf != param->addon_fields->end() ; addonf++)
{
Field *field= addonf->field;
if (addonf->null_bit && field->is_null())
{
nulls[addonf->null_offset]|= addonf->null_bit;
#ifdef HAVE_valgrind
bzero(to, addonf->length);
#endif
if (!packed_addon_fields)
to+= addonf->length;
}
else
{
#ifdef HAVE_valgrind
uchar *end= field->pack(to, field->ptr);
uint length= (uint) ((to + addonf->length) - end);
DBUG_ASSERT((int) length >= 0);
if (length)
bzero(end, length);
#else
(void) field->pack(to, field->ptr);
#endif
int sz= static_cast<int>(end - to);
res_len += sz;
if (packed_addon_fields)
to+= sz;
else
to+= addonf->length;
}
to+= addonf->length;
}
if (packed_addon_fields)
Addon_fields::store_addon_length(p_len, res_len);
}
else
{
/* Save filepos last */
memcpy((uchar*) to, ref_pos, (size_t) param->ref_length);
to+= param->ref_length;
}
return;
return static_cast<uint>(to - orig_to);
}
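
To make the packed branch concrete, a toy serializer for one non-NULL string
addon. Field::pack() is modelled by a 1-byte length prefix (the real encoding
depends on the field type and may be longer), and res_len grows by the bytes
actually used rather than by the field's maximum length:

    #include <cstdint>
    #include <cstring>
    #include <string>

    // Toy model of Field::pack() for a short string (< 256 bytes): a
    // 1-byte length prefix followed by the value.  Returns one past the end.
    static unsigned char *pack_varchar(unsigned char *to, const std::string &v)
    {
      *to++= (unsigned char) v.size();
      memcpy(to, v.data(), v.size());
      return to + v.size();
    }

    // Packed branch of the addon loop: advance 'to' and res_len by the
    // actual packed size; the unpacked branch would instead advance by
    // the fixed addonf->length.
    static unsigned char *append_packed_addon(unsigned char *to,
                                              const std::string &value,
                                              uint32_t *res_len)
    {
      unsigned char *end= pack_varchar(to, value);
      *res_len+= (uint32_t)(end - to);
      return end;
    }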
......@@ -1265,8 +1345,8 @@ static void register_used_fields(Sort_param *param)
SORT_FIELD *sort_field;
TABLE *table=param->sort_form;
for (sort_field= param->local_sortorder ;
sort_field != param->end ;
for (sort_field= param->local_sortorder.begin() ;
sort_field != param->local_sortorder.end() ;
sort_field++)
{
Field *field;
......@@ -1281,12 +1361,14 @@ static void register_used_fields(Sort_param *param)
}
}
if (param->addon_field)
if (param->using_addon_fields())
{
SORT_ADDON_FIELD *addonf= param->addon_field;
Field *field;
for ( ; (field= addonf->field) ; addonf++)
SORT_ADDON_FIELD *addonf= param->addon_fields->begin();
for ( ; (addonf != param->addon_fields->end()) ; addonf++)
{
Field *field= addonf->field;
field->register_field_in_read_map();
}
}
else
{
......@@ -1305,16 +1387,24 @@ static bool save_index(Sort_param *param, uint count,
DBUG_ASSERT(table_sort->record_pointers == 0);
table_sort->sort_buffer(param, count);
if (param->using_addon_fields())
{
table_sort->sorted_result_in_fsbuf= TRUE;
table_sort->set_sort_length(param->sort_length);
DBUG_RETURN(0);
}
res_length= param->res_length;
offset= param->rec_length-res_length;
if (!(to= table_sort->record_pointers=
(uchar*) my_malloc(res_length*count,
MYF(MY_WME | MY_THREAD_SPECIFIC))))
DBUG_RETURN(1); /* purecov: inspected */
uchar **sort_keys= table_sort->get_sort_keys();
for (uchar **end= sort_keys+count ; sort_keys != end ; sort_keys++)
for (uint ix= 0; ix < count; ++ix)
{
memcpy(to, *sort_keys+offset, res_length);
uchar *record= table_sort->get_sorted_record(ix);
memcpy(to, record + offset, res_length);
to+= res_length;
}
DBUG_RETURN(0);
......@@ -1385,8 +1475,9 @@ static bool check_if_pq_applicable(Sort_param *param,
// The whole source set fits into memory.
if (param->max_rows < num_rows/PQ_slowness )
{
DBUG_RETURN(filesort_info->alloc_sort_buffer(param->max_keys_per_buffer,
param->rec_length) != NULL);
filesort_info->alloc_sort_buffer(param->max_keys_per_buffer,
param->rec_length);
DBUG_RETURN(filesort_info->sort_buffer_size() != 0);
}
else
{
......@@ -1398,12 +1489,13 @@ static bool check_if_pq_applicable(Sort_param *param,
// Do we have space for LIMIT rows in memory?
if (param->max_keys_per_buffer < num_available_keys)
{
DBUG_RETURN(filesort_info->alloc_sort_buffer(param->max_keys_per_buffer,
param->rec_length) != NULL);
filesort_info->alloc_sort_buffer(param->max_keys_per_buffer,
param->rec_length);
DBUG_RETURN(filesort_info->sort_buffer_size() != 0);
}
// Try to strip off addon fields.
if (param->addon_field)
if (param->addon_fields)
{
const size_t row_length=
param->sort_length + param->ref_length + sizeof(char*);
......@@ -1435,14 +1527,15 @@ static bool check_if_pq_applicable(Sort_param *param,
if (sort_merge_cost < pq_cost)
DBUG_RETURN(false);
if (filesort_info->alloc_sort_buffer(param->max_keys_per_buffer,
param->sort_length +
param->ref_length))
filesort_info->alloc_sort_buffer(param->max_keys_per_buffer,
param->sort_length + param->ref_length);
if (filesort_info->sort_buffer_size() > 0)
{
/* Make attached data to be references instead of fields. */
my_free(filesort_info->addon_field);
filesort_info->addon_field= NULL;
param->addon_field= NULL;
my_free(filesort_info->addon_fields);
filesort_info->addon_fields= NULL;
param->addon_fields= NULL;
param->res_length= param->ref_length;
param->sort_length+= param->ref_length;
......@@ -1458,12 +1551,12 @@ static bool check_if_pq_applicable(Sort_param *param,
/** Merge buffers to make < MERGEBUFF2 buffers. */
int merge_many_buff(Sort_param *param, uchar *sort_buffer,
BUFFPEK *buffpek, uint *maxbuffer, IO_CACHE *t_file)
int merge_many_buff(Sort_param *param, Sort_buffer sort_buffer,
Merge_chunk *buffpek, uint *maxbuffer, IO_CACHE *t_file)
{
uint i;
IO_CACHE t_file2,*from_file,*to_file,*temp;
BUFFPEK *lastbuff;
Merge_chunk *lastbuff;
DBUG_ENTER("merge_many_buff");
if (*maxbuffer < MERGEBUFF2)
......@@ -1483,11 +1576,11 @@ int merge_many_buff(Sort_param *param, uchar *sort_buffer,
lastbuff=buffpek;
for (i=0 ; i <= *maxbuffer-MERGEBUFF*3/2 ; i+=MERGEBUFF)
{
if (merge_buffers(param,from_file,to_file,sort_buffer,lastbuff++,
if (merge_buffers(param,from_file,to_file,sort_buffer, lastbuff++,
buffpek+i,buffpek+i+MERGEBUFF-1,0))
goto cleanup;
}
if (merge_buffers(param,from_file,to_file,sort_buffer,lastbuff++,
if (merge_buffers(param,from_file,to_file,sort_buffer, lastbuff++,
buffpek+i,buffpek+ *maxbuffer,0))
break; /* purecov: inspected */
if (flush_io_cache(to_file))
......@@ -1513,24 +1606,68 @@ int merge_many_buff(Sort_param *param, uchar *sort_buffer,
(ulong)-1 if something goes wrong
*/
ulong read_to_buffer(IO_CACHE *fromfile, BUFFPEK *buffpek,
uint rec_length)
ulong read_to_buffer(IO_CACHE *fromfile, Merge_chunk *buffpek,
Sort_param *param)
{
ulong count;
ulong length= 0;
ha_rows count;
uint rec_length= param->rec_length;
if ((count= (ulong) MY_MIN((ha_rows) buffpek->max_keys,buffpek->count)))
if ((count= MY_MIN(buffpek->max_keys(),buffpek->rowcount())))
{
length= rec_length*count;
if (unlikely(my_b_pread(fromfile, (uchar*) buffpek->base, length,
buffpek->file_pos)))
size_t bytes_to_read;
if (param->using_packed_addons())
{
count= buffpek->rowcount();
bytes_to_read= MY_MIN(buffpek->buffer_size(),
static_cast<size_t>(fromfile->end_of_file -
buffpek->file_position()));
}
else
bytes_to_read= rec_length * static_cast<size_t>(count);
if (unlikely(my_b_pread(fromfile, buffpek->buffer_start(),
bytes_to_read, buffpek->file_position())))
return ((ulong) -1);
buffpek->key=buffpek->base;
buffpek->file_pos+= length; /* New filepos */
buffpek->count-= count;
buffpek->mem_count= count;
size_t num_bytes_read;
if (param->using_packed_addons())
{
/*
The last record read is most likely not complete here.
We need to loop through all the records, reading the length fields,
and then "chop off" the final incomplete record.
*/
uchar *record= buffpek->buffer_start();
uint ix= 0;
for (; ix < count; ++ix)
{
if (record + param->sort_length + Addon_fields::size_of_length_field >
buffpek->buffer_end())
break; // Incomplete record.
uchar *plen= record + param->sort_length;
uint res_length= Addon_fields::read_addon_length(plen);
if (plen + res_length > buffpek->buffer_end())
break; // Incomplete record.
DBUG_ASSERT(res_length > 0);
record+= param->sort_length;
record+= res_length;
}
DBUG_ASSERT(ix > 0);
count= ix;
num_bytes_read= record - buffpek->buffer_start();
DBUG_PRINT("info", ("read %llu bytes of complete records",
static_cast<ulonglong>(bytes_to_read)));
}
else
num_bytes_read= bytes_to_read;
buffpek->init_current_key();
buffpek->advance_file_position(num_bytes_read); /* New filepos */
buffpek->decrement_rowcount(count);
buffpek->set_mem_count(count);
return (ulong) num_bytes_read;
}
return (length);
return 0;
} /* read_to_buffer */
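
Since the read stops at an arbitrary byte boundary, the chunk's tail usually
holds a partial record; the loop above therefore walks the records via their
length prefixes and stops before the first incomplete one. The same scan as a
standalone helper (4-byte self-counting prefix assumed, as before):

    #include <cstdint>
    #include <cstring>
    #include <cstddef>

    struct ChopResult { size_t records; size_t bytes; };

    // Scan [buf, buf_end) and stop before the first incomplete record,
    // mirroring the loop in read_to_buffer().
    static ChopResult chop_incomplete_tail(const unsigned char *buf,
                                           const unsigned char *buf_end,
                                           size_t sort_length)
    {
      ChopResult r= {0, 0};
      const unsigned char *rec= buf;
      while (rec + sort_length + 4 <= buf_end)
      {
        uint32_t res_len;
        memcpy(&res_len, rec + sort_length, 4);
        if (rec + sort_length + res_len > buf_end)
          break;                            // incomplete record
        rec+= sort_length + res_len;
        r.records++;
      }
      r.bytes= (size_t)(rec - buf);
      return r;
    }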
......@@ -1545,25 +1682,15 @@ ulong read_to_buffer(IO_CACHE *fromfile, BUFFPEK *buffpek,
@param[in] key_length key length
*/
void reuse_freed_buff(QUEUE *queue, BUFFPEK *reuse, uint key_length)
void reuse_freed_buff(QUEUE *queue, Merge_chunk *reuse, uint key_length)
{
uchar *reuse_end= reuse->base + reuse->max_keys * key_length;
for (uint i= queue_first_element(queue);
i <= queue_last_element(queue);
i++)
{
BUFFPEK *bp= (BUFFPEK *) queue_element(queue, i);
if (bp->base + bp->max_keys * key_length == reuse->base)
{
bp->max_keys+= reuse->max_keys;
Merge_chunk *bp= (Merge_chunk *) queue_element(queue, i);
if (reuse->merge_freed_buff(bp))
return;
}
else if (bp->base == reuse_end)
{
bp->base= reuse->base;
bp->max_keys+= reuse->max_keys;
return;
}
}
DBUG_ASSERT(0);
}
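
The open-coded adjacency checks that previously lived here move into
Merge_chunk::merge_freed_buff(). A sketch of what that member presumably
does, reconstructed from the removed code (members simplified):

    #include <cstddef>

    // Two chunks' key areas are adjacent if one ends where the other
    // begins; the freed chunk's capacity is then donated to its neighbour.
    struct Chunk
    {
      unsigned char *base;     // start of this chunk's key area
      size_t max_keys;         // capacity in keys
      size_t key_length;       // bytes per key

      unsigned char *end() const { return base + max_keys * key_length; }

      // 'this' is the freed chunk; returns true if 'other' absorbed it.
      bool merge_freed_buff(Chunk *other) const
      {
        if (other->end() == base)            // other sits just below us
        {
          other->max_keys+= max_keys;
          return true;
        }
        if (other->base == end())            // other sits just above us
        {
          other->base= base;
          other->max_keys+= max_keys;
          return true;
        }
        return false;
      }
    };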
......@@ -1588,8 +1715,8 @@ void reuse_freed_buff(QUEUE *queue, BUFFPEK *reuse, uint key_length)
*/
bool merge_buffers(Sort_param *param, IO_CACHE *from_file,
IO_CACHE *to_file, uchar *sort_buffer,
BUFFPEK *lastbuff, BUFFPEK *Fb, BUFFPEK *Tb,
IO_CACHE *to_file, Sort_buffer sort_buffer,
Merge_chunk *lastbuff, Merge_chunk *Fb, Merge_chunk *Tb,
int flag)
{
bool error= 0;
......@@ -1599,7 +1726,7 @@ bool merge_buffers(Sort_param *param, IO_CACHE *from_file,
ha_rows max_rows,org_max_rows;
my_off_t to_start_filepos;
uchar *strpos;
BUFFPEK *buffpek;
Merge_chunk *buffpek;
QUEUE queue;
qsort2_cmp cmp;
void *first_cmp_arg;
......@@ -1625,7 +1752,7 @@ bool merge_buffers(Sort_param *param, IO_CACHE *from_file,
uint wr_offset= flag ? offset : 0;
maxcount= (ulong) (param->max_keys_per_buffer/((uint) (Tb-Fb) +1));
to_start_filepos= my_b_tell(to_file);
strpos= sort_buffer;
strpos= sort_buffer.array();
org_max_rows=max_rows= param->max_rows;
set_if_bigger(maxcount, 1);
......@@ -1640,19 +1767,23 @@ bool merge_buffers(Sort_param *param, IO_CACHE *from_file,
cmp= get_ptr_compare(sort_length);
first_cmp_arg= (void*) &sort_length;
}
if (unlikely(init_queue(&queue, (uint) (Tb-Fb)+1, offsetof(BUFFPEK,key), 0,
if (unlikely(init_queue(&queue, (uint) (Tb-Fb)+1,
offsetof(Merge_chunk,m_current_key), 0,
(queue_compare) cmp, first_cmp_arg, 0, 0)))
DBUG_RETURN(1); /* purecov: inspected */
for (buffpek= Fb ; buffpek <= Tb ; buffpek++)
{
buffpek->base= strpos;
buffpek->max_keys= maxcount;
bytes_read= read_to_buffer(from_file, buffpek, rec_length);
buffpek->set_buffer(strpos,
strpos + (sort_buffer.size()/((uint) (Tb-Fb) +1)));
buffpek->set_max_keys(maxcount);
bytes_read= read_to_buffer(from_file, buffpek, param);
if (unlikely(bytes_read == (ulong) -1))
goto err; /* purecov: inspected */
strpos+= bytes_read;
buffpek->max_keys= buffpek->mem_count; // If less data in buffers than expected
buffpek->set_buffer_end(strpos);
// If less data in buffers than expected
buffpek->set_max_keys(buffpek->mem_count());
queue_insert(&queue, (uchar*) buffpek);
}
......@@ -1663,16 +1794,17 @@ bool merge_buffers(Sort_param *param, IO_CACHE *from_file,
Copy the first argument to unique_buff for unique removal.
Store it also in 'to_file'.
*/
buffpek= (BUFFPEK*) queue_top(&queue);
memcpy(unique_buff, buffpek->key, rec_length);
buffpek= (Merge_chunk*) queue_top(&queue);
memcpy(unique_buff, buffpek->current_key(), rec_length);
if (min_dupl_count)
memcpy(&dupl_count, unique_buff+dupl_count_ofs,
sizeof(dupl_count));
buffpek->key+= rec_length;
if (! --buffpek->mem_count)
buffpek->advance_current_key(rec_length);
buffpek->decrement_mem_count();
if (buffpek->mem_count() == 0)
{
if (unlikely(!(bytes_read= read_to_buffer(from_file, buffpek,
rec_length))))
param))))
{
(void) queue_remove_top(&queue);
reuse_freed_buff(&queue, buffpek, rec_length);
......@@ -1692,61 +1824,68 @@ bool merge_buffers(Sort_param *param, IO_CACHE *from_file,
for (;;)
{
buffpek= (BUFFPEK*) queue_top(&queue);
src= buffpek->key;
buffpek= (Merge_chunk*) queue_top(&queue);
src= buffpek->current_key();
if (cmp) // Remove duplicates
{
if (!(*cmp)(first_cmp_arg, &unique_buff,
(uchar**) &buffpek->key))
{
uchar *current_key= buffpek->current_key();
if (!(*cmp)(first_cmp_arg, &unique_buff, &current_key))
{
if (min_dupl_count)
{
{
element_count cnt;
memcpy(&cnt, (uchar *) buffpek->key+dupl_count_ofs, sizeof(cnt));
memcpy(&cnt, buffpek->current_key() + dupl_count_ofs, sizeof(cnt));
dupl_count+= cnt;
}
goto skip_duplicate;
}
if (min_dupl_count)
{
{
memcpy(unique_buff+dupl_count_ofs, &dupl_count,
sizeof(dupl_count));
}
src= unique_buff;
}
/*
Do not write into the output file if this is the final merge called
for a Unique object used for intersection and dupl_count is less
than min_dupl_count.
If the Unique object is used to intersect N sets of unique elements
then for any element:
dupl_count >= N <=> the element occurs in each of these N sets.
*/
if (!check_dupl_count || dupl_count >= min_dupl_count)
{
if (my_b_write(to_file, src+wr_offset, wr_len))
goto err; /* purecov: inspected */
src= unique_buff;
}
if (cmp)
{
memcpy(unique_buff, (uchar*) buffpek->key, rec_length);
if (min_dupl_count)
memcpy(&dupl_count, unique_buff+dupl_count_ofs,
sizeof(dupl_count));
}
if (!--max_rows)
{
/* Nothing more to do */
goto end; /* purecov: inspected */
}
param->get_rec_and_res_len(buffpek->current_key(),
&rec_length, &res_length);
const uint bytes_to_write= (flag == 0) ? rec_length : res_length;
/*
Do not write into the output file if this is the final merge called
for a Unique object used for intersection and dupl_count is less
than min_dupl_count.
If the Unique object is used to intersect N sets of unique elements
then for any element:
dupl_count >= N <=> the element occurs in each of these N sets.
*/
if (!check_dupl_count || dupl_count >= min_dupl_count)
{
if (my_b_write(to_file, src + wr_offset, bytes_to_write))
goto err; /* purecov: inspected */
}
if (cmp)
{
memcpy(unique_buff, buffpek->current_key(), rec_length);
if (min_dupl_count)
memcpy(&dupl_count, unique_buff+dupl_count_ofs,
sizeof(dupl_count));
}
if (!--max_rows)
{
/* Nothing more to do */
goto end; /* purecov: inspected */
}
}
skip_duplicate:
buffpek->key+= rec_length;
if (! --buffpek->mem_count)
buffpek->advance_current_key(rec_length);
buffpek->decrement_mem_count();
if (buffpek->mem_count() == 0)
{
if (unlikely(!(bytes_read= read_to_buffer(from_file, buffpek,
rec_length))))
param))))
{
(void) queue_remove_top(&queue);
reuse_freed_buff(&queue, buffpek, rec_length);
......@@ -1758,9 +1897,10 @@ bool merge_buffers(Sort_param *param, IO_CACHE *from_file,
queue_replace_top(&queue); /* Top element has been replaced */
}
}
buffpek= (BUFFPEK*) queue_top(&queue);
buffpek->base= (uchar*) sort_buffer;
buffpek->max_keys= param->max_keys_per_buffer;
buffpek= (Merge_chunk*) queue_top(&queue);
buffpek->set_buffer(sort_buffer.array(),
sort_buffer.array() + sort_buffer.size());
buffpek->set_max_keys(param->max_keys_per_buffer);
/*
As we know all entries in the buffer are unique, we only have to
......@@ -1768,16 +1908,17 @@ bool merge_buffers(Sort_param *param, IO_CACHE *from_file,
*/
if (cmp)
{
if (!(*cmp)(first_cmp_arg, &unique_buff, (uchar**) &buffpek->key))
uchar *current_key= buffpek->current_key();
if (!(*cmp)(first_cmp_arg, &unique_buff, &current_key))
{
if (min_dupl_count)
{
element_count cnt;
memcpy(&cnt, (uchar *) buffpek->key+dupl_count_ofs, sizeof(cnt));
memcpy(&cnt, buffpek->current_key() + dupl_count_ofs, sizeof(cnt));
dupl_count+= cnt;
}
buffpek->key+= rec_length;
--buffpek->mem_count;
buffpek->advance_current_key(rec_length);
buffpek->decrement_mem_count();
}
if (min_dupl_count)
......@@ -1796,45 +1937,40 @@ bool merge_buffers(Sort_param *param, IO_CACHE *from_file,
do
{
if ((ha_rows) buffpek->mem_count > max_rows)
if (buffpek->mem_count() > max_rows)
{ /* Don't write too many records */
buffpek->mem_count= (uint) max_rows;
buffpek->count= 0; /* Don't read more */
buffpek->set_mem_count(max_rows);
buffpek->set_rowcount(0); /* Don't read more */
}
max_rows-= buffpek->mem_count;
if (flag == 0)
max_rows-= buffpek->mem_count();
for (uint ix= 0; ix < buffpek->mem_count(); ++ix)
{
if (my_b_write(to_file, (uchar*) buffpek->key,
(size_t)(rec_length*buffpek->mem_count)))
goto err; /* purecov: inspected */
}
else
{
uchar *end;
src= buffpek->key+offset;
for (end= src+buffpek->mem_count*rec_length ;
src != end ;
src+= rec_length)
param->get_rec_and_res_len(buffpek->current_key(),
&rec_length, &res_length);
const uint bytes_to_write= (flag == 0) ? rec_length : res_length;
if (check_dupl_count)
{
if (check_dupl_count)
{
memcpy((uchar *) &dupl_count, src+dupl_count_ofs, sizeof(dupl_count));
if (dupl_count < min_dupl_count)
continue;
}
if (my_b_write(to_file, src, wr_len))
goto err;
memcpy((uchar *) &dupl_count,
buffpek->current_key() + offset + dupl_count_ofs,
sizeof(dupl_count));
if (dupl_count < min_dupl_count)
continue;
}
if (my_b_write(to_file, buffpek->current_key() + wr_offset,
bytes_to_write))
goto err;
buffpek->advance_current_key(rec_length);
}
}
while (likely(!(error=
(bytes_read= read_to_buffer(from_file, buffpek,
rec_length)) == (ulong) -1)) &&
param)) == (ulong) -1)) &&
bytes_read != 0);
end:
lastbuff->count= MY_MIN(org_max_rows-max_rows, param->max_rows);
lastbuff->file_pos= to_start_filepos;
lastbuff->set_rowcount(MY_MIN(org_max_rows-max_rows, param->max_rows));
lastbuff->set_file_position(to_start_filepos);
cleanup:
delete_queue(&queue);
DBUG_RETURN(error);
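
merge_buffers() now obtains both lengths per record from
param->get_rec_and_res_len(); its body is not part of this diff, so the
following is only an assumption about what it computes, kept consistent with
the record layout used elsewhere:

    #include <cstdint>
    #include <cstring>

    static void get_rec_and_res_len_sketch(const unsigned char *record,
                                           unsigned sort_length,
                                           unsigned fixed_rec_length,
                                           unsigned fixed_res_length,
                                           bool packed_addon_fields,
                                           unsigned *rec_length,
                                           unsigned *res_length)
    {
      if (!packed_addon_fields)
      {
        *rec_length= fixed_rec_length;    // both are per-sort constants
        *res_length= fixed_res_length;
        return;
      }
      uint32_t addon_len;                 // assumed 4-byte prefix, as above
      memcpy(&addon_len, record + sort_length, sizeof(addon_len));
      *res_length= addon_len;             // <length><nulls><fields>
      *rec_length= sort_length + addon_len;
    }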
......@@ -1848,13 +1984,13 @@ bool merge_buffers(Sort_param *param, IO_CACHE *from_file,
/* Do a merge to output-file (save only positions) */
int merge_index(Sort_param *param, uchar *sort_buffer,
BUFFPEK *buffpek, uint maxbuffer,
IO_CACHE *tempfile, IO_CACHE *outfile)
int merge_index(Sort_param *param, Sort_buffer sort_buffer,
Merge_chunk *buffpek, uint maxbuffer,
IO_CACHE *tempfile, IO_CACHE *outfile)
{
DBUG_ENTER("merge_index");
if (merge_buffers(param,tempfile,outfile,sort_buffer,buffpek,buffpek,
buffpek+maxbuffer,1))
if (merge_buffers(param, tempfile, outfile, sort_buffer, buffpek, buffpek,
buffpek + maxbuffer, 1))
DBUG_RETURN(1); /* purecov: inspected */
DBUG_RETURN(0);
} /* merge_index */
......@@ -1977,7 +2113,7 @@ sortlength(THD *thd, SORT_FIELD *sortorder, uint s_length,
sortorder->length= (uint)cs->coll->strnxfrmlen(cs, sortorder->length);
}
if (sortorder->field->maybe_null())
length++; // Place for NULL marker
length++; // Place for NULL marker
}
else
{
......@@ -1988,21 +2124,40 @@ sortlength(THD *thd, SORT_FIELD *sortorder, uint s_length,
*multi_byte_charset= true;
}
if (sortorder->item->maybe_null)
length++; // Place for NULL marker
length++; // Place for NULL marker
}
set_if_smaller(sortorder->length, thd->variables.max_sort_length);
length+=sortorder->length;
}
sortorder->field= (Field*) 0; // end marker
sortorder->field= NULL; // end marker
DBUG_PRINT("info",("sort_length: %d",length));
return length;
}
/*
Check whether addon fields can be used or not.
@param table Table structure
@param sortlength Length of sort key
@param length [OUT] Max length of addon fields
@param fields [OUT] Number of addon fields
@param null_fields [OUT] Number of nullable addon fields
@param packable_length [OUT] Max length of addon fields that can be
packed
@retval
TRUE Addon fields can be used
FALSE Otherwise
*/
bool filesort_use_addons(TABLE *table, uint sortlength,
uint *length, uint *fields, uint *null_fields)
uint *length, uint *fields, uint *null_fields,
uint *packable_length)
{
Field **pfield, *field;
*length= *fields= *null_fields= 0;
*length= *fields= *null_fields= *packable_length= 0;
uint field_length=0;
for (pfield= table->field; (field= *pfield) ; pfield++)
{
......@@ -2010,7 +2165,12 @@ bool filesort_use_addons(TABLE *table, uint sortlength,
continue;
if (field->flags & BLOB_FLAG)
return false;
(*length)+= field->max_packed_col_length(field->pack_length());
field_length= field->max_packed_col_length(field->pack_length());
(*length)+= field_length;
if (field->maybe_null() || field->is_packable())
(*packable_length)+= field_length;
if (field->maybe_null())
(*null_fields)++;
(*fields)++;
......@@ -2035,11 +2195,11 @@ bool filesort_use_addons(TABLE *table, uint sortlength,
layouts for the values of the non-sorted fields in the buffer and
fills them.
@param thd Current thread
@param ptabfield Array of references to the table fields
@param sortlength Total length of sorted fields
@param [out] addon_buf Buffer to us for appended fields
@param table Table structure
@param sortlength Total length of sorted fields
@param addon_length [OUT] Length of addon fields
@param m_packable_length [OUT] Length of the addon fields that can be
packed
@note
The null bits for the appended values are supposed to be put together
and stored the buffer just ahead of the value of the first field.
......@@ -2050,13 +2210,13 @@ bool filesort_use_addons(TABLE *table, uint sortlength,
NULL if we do not store field values with sort data.
*/
static SORT_ADDON_FIELD *
get_addon_fields(TABLE *table, uint sortlength, LEX_STRING *addon_buf)
static Addon_fields*
get_addon_fields(TABLE *table, uint sortlength,
uint *addon_length, uint *m_packable_length)
{
Field **pfield;
Field *field;
SORT_ADDON_FIELD *addonf;
uint length, fields, null_fields;
uint length, fields, null_fields, packable_length;
MY_BITMAP *read_set= table->read_set;
DBUG_ENTER("get_addon_fields");
......@@ -2070,23 +2230,34 @@ get_addon_fields(TABLE *table, uint sortlength, LEX_STRING *addon_buf)
the values directly from sorted fields.
But beware the case when item->cmp_type() != item->result_type()
*/
addon_buf->str= 0;
addon_buf->length= 0;
// see remove_const() for HA_SLOW_RND_POS explanation
if (table->file->ha_table_flags() & HA_SLOW_RND_POS)
sortlength= 0;
if (!filesort_use_addons(table, sortlength, &length, &fields, &null_fields) ||
!my_multi_malloc(MYF(MY_WME | MY_THREAD_SPECIFIC), &addonf,
sizeof(SORT_ADDON_FIELD) * (fields+1),
&addon_buf->str, length, NullS))
void *raw_mem_addon_field, *raw_mem;
if (!filesort_use_addons(table, sortlength, &length, &fields, &null_fields,
&packable_length) ||
!(my_multi_malloc(MYF(MY_WME | MY_THREAD_SPECIFIC),
&raw_mem, sizeof(Addon_fields),
&raw_mem_addon_field,
sizeof(SORT_ADDON_FIELD) * fields,
NullS)))
DBUG_RETURN(0);
addon_buf->length= length;
Addon_fields_array
addon_array(static_cast<SORT_ADDON_FIELD*>(raw_mem_addon_field), fields);
Addon_fields *addon_fields= new (raw_mem) Addon_fields(addon_array);
DBUG_ASSERT(addon_fields);
(*addon_length)= length;
(*m_packable_length)= packable_length;
length= (null_fields+7)/8;
null_fields= 0;
SORT_ADDON_FIELD* addonf= addon_fields->begin();
for (pfield= table->field; (field= *pfield) ; pfield++)
{
if (!bitmap_is_set(read_set, field->field_index))
......@@ -2108,10 +2279,9 @@ get_addon_fields(TABLE *table, uint sortlength, LEX_STRING *addon_buf)
length+= addonf->length;
addonf++;
}
addonf->field= 0; // Put end marker
DBUG_PRINT("info",("addon_length: %d",length));
DBUG_RETURN(addonf-fields);
DBUG_RETURN(addon_fields);
}
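
The allocation above packs the Addon_fields object and its descriptor array
into one my_multi_malloc() block and ties them together with placement new,
so a single my_free() releases both. The same pattern in standalone form
(plain malloc stands in for my_multi_malloc, which additionally aligns each
piece):

    #include <cstdlib>
    #include <cstddef>
    #include <new>

    struct Descriptor { /* field, offset, null_bit, length, ... */ };

    struct AddonFieldsSketch
    {
      Descriptor *m_begin;
      size_t      m_count;
      AddonFieldsSketch(Descriptor *d, size_t n) : m_begin(d), m_count(n) {}
      Descriptor *begin() { return m_begin; }
      Descriptor *end()   { return m_begin + m_count; }
    };

    // One contiguous allocation holding the header and the descriptor
    // array, so one free() releases everything.
    static AddonFieldsSketch *alloc_addon_fields(size_t fields)
    {
      void *raw= malloc(sizeof(AddonFieldsSketch) +
                        sizeof(Descriptor) * fields);
      if (!raw)
        return nullptr;
      Descriptor *array= reinterpret_cast<Descriptor*>(
          static_cast<char*>(raw) + sizeof(AddonFieldsSketch));
      return new (raw) AddonFieldsSketch(array, fields);  // placement new
    }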
......@@ -2130,24 +2300,7 @@ get_addon_fields(TABLE *table, uint sortlength, LEX_STRING *addon_buf)
void.
*/
static void
unpack_addon_fields(struct st_sort_addon_field *addon_field, uchar *buff,
uchar *buff_end)
{
Field *field;
SORT_ADDON_FIELD *addonf= addon_field;
for ( ; (field= addonf->field) ; addonf++)
{
if (addonf->null_bit && (addonf->null_bit & buff[addonf->null_offset]))
{
field->set_null();
continue;
}
field->set_notnull();
field->unpack(field->ptr, buff + addonf->offset, buff_end, 0);
}
}
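
The fixed-layout unpacker above is replaced by the templated
SORT_INFO::unpack_addon_fields<Packed_addon_fields>() declared later in this
diff. Its body is not shown here, so the following toy version is only a
guess at the shape: packed values sit back to back and each unpack() yields
the start of the next value, while the fixed variant keeps using per-field
offsets (the 1-byte-length toy encoding matches the earlier sketch):

    #include <string>
    #include <vector>

    struct ToyField                    // stand-in for Field; not the real API
    {
      bool        is_null= false;
      std::string value;
      unsigned    null_bit= 0;         // bit in the record's null byte
      unsigned    fixed_offset= 0;     // value offset when not packed
    };

    // Unpack one 1-byte-length-prefixed string; returns one past the value.
    static const unsigned char *toy_unpack(ToyField *f, const unsigned char *from)
    {
      unsigned len= *from++;
      f->value.assign(reinterpret_cast<const char*>(from), len);
      return from + len;
    }

    template <bool Packed_addon_fields>
    void unpack_addon_fields(std::vector<ToyField> &fields,
                             const unsigned char *buff)
    {
      const unsigned char *next= buff + 1;   // this toy layout: 1 null byte
      for (ToyField &f : fields)
      {
        if (buff[0] & f.null_bit) { f.is_null= true; continue; }
        f.is_null= false;
        if (Packed_addon_fields)
          next= toy_unpack(&f, next);              // values back to back
        else
          toy_unpack(&f, buff + f.fixed_offset);   // fixed per-field offset
      }
    }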
/*
** functions to change a double or float to a sortable string
......@@ -2197,6 +2350,17 @@ void change_double_for_sort(double nr,uchar *to)
}
}
bool SORT_INFO::using_packed_addons()
{
return addon_fields != NULL && addon_fields->using_packed_addons();
}
void SORT_INFO::free_addon_buff()
{
if (addon_fields)
addon_fields->free_addon_buff();
}
/**
Free SORT_INFO
*/
......
......@@ -27,7 +27,7 @@ class Filesort_tracker;
struct SORT_FIELD;
typedef struct st_order ORDER;
class JOIN;
class Addon_fields;
/**
Sorting related info.
......@@ -87,7 +87,8 @@ class SORT_INFO
public:
SORT_INFO()
:addon_field(0), record_pointers(0)
:addon_fields(NULL), record_pointers(0),
sorted_result_in_fsbuf(FALSE)
{
buffpek.str= 0;
my_b_clear(&io_cache);
......@@ -98,9 +99,11 @@ class SORT_INFO
void free_data()
{
close_cached_file(&io_cache);
free_addon_buff();
my_free(record_pointers);
my_free(buffpek.str);
my_free(addon_field);
my_free(addon_fields);
free_sort_buffer();
}
void reset()
......@@ -108,17 +111,26 @@ class SORT_INFO
free_data();
record_pointers= 0;
buffpek.str= 0;
addon_field= 0;
addon_fields= 0;
sorted_result_in_fsbuf= false;
}
void free_addon_buff();
IO_CACHE io_cache; /* If sorted through filesort */
LEX_STRING buffpek; /* Buffer for buffpek structures */
LEX_STRING addon_buf; /* Pointer to a buffer if sorted with fields */
struct st_sort_addon_field *addon_field; /* Pointer to the fields info */
/* To unpack back */
void (*unpack)(struct st_sort_addon_field *, uchar *, uchar *);
Addon_fields *addon_fields; /* Addon field descriptors */
uchar *record_pointers; /* If sorted in memory */
/**
If the entire result of filesort fits in memory, we skip the merge phase.
We may leave the result in filesort_buffer
(indicated by sorted_result_in_fsbuf), or we may strip away
the sort keys, and copy the sorted result into a new buffer.
@see save_index()
*/
bool sorted_result_in_fsbuf;
/*
How many rows in final result.
Also how many rows in record_pointers, if used
......@@ -131,27 +143,65 @@ class SORT_INFO
void sort_buffer(Sort_param *param, uint count)
{ filesort_buffer.sort_buffer(param, count); }
/**
Accessors for Filesort_buffer (which is private).
*/
uchar *get_record_buffer(uint idx)
{ return filesort_buffer.get_record_buffer(idx); }
uchar **get_sort_keys()
{ return filesort_buffer.get_sort_keys(); }
uchar **alloc_sort_buffer(uint num_records, uint record_length)
uchar *get_sorted_record(uint ix)
{ return filesort_buffer.get_sorted_record(ix); }
uchar *alloc_sort_buffer(uint num_records, uint record_length)
{ return filesort_buffer.alloc_sort_buffer(num_records, record_length); }
void free_sort_buffer()
{ filesort_buffer.free_sort_buffer(); }
bool isfull() const
{ return filesort_buffer.isfull(); }
void init_record_pointers()
{ filesort_buffer.init_record_pointers(); }
void init_next_record_pointer()
{ filesort_buffer.init_next_record_pointer(); }
uchar *get_next_record_pointer()
{ return filesort_buffer.get_next_record_pointer(); }
void adjust_next_record_pointer(uint val)
{ filesort_buffer.adjust_next_record_pointer(val); }
Bounds_checked_array<uchar> get_raw_buf()
{ return filesort_buffer.get_raw_buf(); }
size_t sort_buffer_size() const
{ return filesort_buffer.sort_buffer_size(); }
bool is_allocated() const
{ return filesort_buffer.is_allocated(); }
void set_sort_length(uint val)
{ filesort_buffer.set_sort_length(val); }
uint get_sort_length() const
{ return filesort_buffer.get_sort_length(); }
bool has_filesort_result_in_memory() const
{
return record_pointers || sorted_result_in_fsbuf;
}
/// Are we using "addon fields"?
bool using_addon_fields() const
{
return addon_fields != NULL;
}
/// Are we using "packed addon fields"?
bool using_packed_addons();
/**
Copies (unpacks) values appended to sorted fields from a buffer back to
their regular positions specified by the Field::ptr pointers.
@param buff Buffer which to unpack the value from
*/
template<bool Packed_addon_fields>
inline void unpack_addon_fields(uchar *buff);
friend SORT_INFO *filesort(THD *thd, TABLE *table, Filesort *filesort,
Filesort_tracker* tracker, JOIN *join,
table_map first_table_bit);
......@@ -162,7 +212,8 @@ SORT_INFO *filesort(THD *thd, TABLE *table, Filesort *filesort,
table_map first_table_bit=0);
bool filesort_use_addons(TABLE *table, uint sortlength,
uint *length, uint *fields, uint *null_fields);
uint *length, uint *fields, uint *null_fields,
uint *m_packable_length);
void change_double_for_sort(double nr,uchar *to);
......
......@@ -96,82 +96,92 @@ double get_merge_many_buffs_cost_fast(ha_rows num_rows,
# Pointer to allocated buffer
*/
uchar **Filesort_buffer::alloc_sort_buffer(uint num_records,
uint record_length)
uchar *Filesort_buffer::alloc_sort_buffer(uint num_records,
uint record_length)
{
size_t buff_size;
uchar **sort_keys, **start_of_data;
DBUG_ENTER("alloc_sort_buffer");
DBUG_EXECUTE_IF("alloc_sort_buffer_fail",
DBUG_SET("+d,simulate_out_of_memory"););
buff_size= ((size_t)num_records) * (record_length + sizeof(uchar*));
set_if_bigger(buff_size, record_length * MERGEBUFF2);
buff_size= ALIGN_SIZE(num_records * (record_length + sizeof(uchar*)));
if (!m_idx_array.is_null())
/*
The minimum memory required is enough for each of the MERGEBUFF2 merge
buffers to hold at least one key.
TODO varun: move this to the place where min_sort_memory is used.
*/
set_if_bigger(buff_size, (record_length +sizeof(uchar*)) * MERGEBUFF2);
if (m_rawmem)
{
/*
Reuse old buffer if exists and is large enough
Note that we don't make the buffer smaller, as we want to be
prepared for next subquery iteration.
*/
sort_keys= m_idx_array.array();
if (buff_size > allocated_size)
if (buff_size > m_size_in_bytes)
{
/*
Better to free and alloc than realloc as we don't have to remember
the old values
*/
my_free(sort_keys);
if (!(sort_keys= (uchar**) my_malloc(buff_size,
MYF(MY_THREAD_SPECIFIC))))
my_free(m_rawmem);
if (!(m_rawmem= (uchar*) my_malloc(buff_size, MYF(MY_THREAD_SPECIFIC))))
{
reset();
m_size_in_bytes= 0;
DBUG_RETURN(0);
}
allocated_size= buff_size;
}
}
else
{
if (!(sort_keys= (uchar**) my_malloc(buff_size, MYF(MY_THREAD_SPECIFIC))))
if (!(m_rawmem= (uchar*) my_malloc(buff_size, MYF(MY_THREAD_SPECIFIC))))
{
m_size_in_bytes= 0;
DBUG_RETURN(0);
allocated_size= buff_size;
}
}
m_idx_array= Idx_array(sort_keys, num_records);
m_size_in_bytes= buff_size;
m_record_pointers= reinterpret_cast<uchar**>(m_rawmem) +
((m_size_in_bytes / sizeof(uchar*)) - 1);
m_num_records= num_records;
m_record_length= record_length;
start_of_data= m_idx_array.array() + m_idx_array.size();
m_start_of_data= reinterpret_cast<uchar*>(start_of_data);
DBUG_RETURN(m_idx_array.array());
m_idx= 0;
DBUG_RETURN(m_rawmem);
}
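The pointer-slot arithmetic above is easy to misread, so here is a minimal standalone sketch (names hypothetical, not the MariaDB source) of how the single allocation is carved up: records grow forward from m_rawmem, while record pointers grow backwards from the last pointer-sized slot.
#include <cstddef>
typedef unsigned char uchar;
// Illustrative layout helper mirroring the arithmetic above.
struct BufferLayout
{
  uchar  *rawmem;         // start of the single allocation
  size_t  size_in_bytes;  // total size of the allocation
  // Records are written starting at the front of the buffer.
  uchar *first_record() { return rawmem; }
  // The first record pointer occupies the last pointer-sized slot;
  // subsequent pointers are stored at [-1], [-2], ... from here.
  uchar **first_pointer_slot()
  {
    return reinterpret_cast<uchar **>(rawmem) +
           (size_in_bytes / sizeof(uchar *)) - 1;
  }
};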
void Filesort_buffer::free_sort_buffer()
{
my_free(m_idx_array.array());
m_idx_array.reset();
m_start_of_data= NULL;
my_free(m_rawmem);
*this= Filesort_buffer();
}
void Filesort_buffer::sort_buffer(const Sort_param *param, uint count)
{
size_t size= param->sort_length;
m_sort_keys= get_sort_keys();
if (count <= 1 || size == 0)
return;
uchar **keys= get_sort_keys();
// don't reverse for PQ, it is already done
if (!param->using_pq)
reverse_record_pointers();
uchar **buffer= NULL;
if (radixsort_is_appliccable(count, param->sort_length) &&
(buffer= (uchar**) my_malloc(count*sizeof(char*),
MYF(MY_THREAD_SPECIFIC))))
{
radixsort_for_str_ptr(keys, count, param->sort_length, buffer);
radixsort_for_str_ptr(m_sort_keys, count, param->sort_length, buffer);
my_free(buffer);
return;
}
my_qsort2(keys, count, sizeof(uchar*), get_ptr_compare(size), &size);
my_qsort2(m_sort_keys, count, sizeof(uchar*), get_ptr_compare(size), &size);
}
......@@ -46,68 +46,194 @@ double get_merge_many_buffs_cost_fast(ha_rows num_rows,
/**
A wrapper class around the buffer used by filesort().
The buffer is a contiguous chunk of memory,
where the first part is <num_records> pointers to the actual data.
The sort buffer is a contiguous chunk of memory,
containing both records to be sorted, and pointers to said records:
<start of buffer ............. still unused ............. end of buffer>
|rec 0|rec 1|rec 2| ...               |ptr to rec2|ptr to rec1|ptr to rec0|
Records will be inserted "left-to-right". Records are not necessarily
fixed-size, they can be packed and stored without any "gaps".
Record pointers will be inserted "right-to-left", as a side-effect
of inserting the actual records.
We wrap the buffer in order to be able to do lazy initialization of the
pointers: the buffer is often much larger than what we actually need.
With this allocation scheme, and lazy initialization of the pointers,
we are able to pack variable-sized records in the buffer,
and thus possibly have space for more records than we initially estimated.
The buffer must be kept available for multiple executions of the
same sort operation, so we have explicit allocate and free functions,
rather than doing alloc/free in CTOR/DTOR.
*/
class Filesort_buffer
{
public:
Filesort_buffer()
: m_idx_array(), m_start_of_data(NULL), allocated_size(0)
Filesort_buffer() :
m_next_rec_ptr(NULL), m_rawmem(NULL), m_record_pointers(NULL),
m_sort_keys(NULL),
m_num_records(0), m_record_length(0),
m_sort_length(0),
m_size_in_bytes(0), m_idx(0)
{}
~Filesort_buffer()
/** Sort me... */
void sort_buffer(const Sort_param *param, uint count);
/**
Reverses the record pointer array, to avoid recording new results for
non-deterministic mtr tests.
*/
void reverse_record_pointers()
{
my_free(m_idx_array.array());
if (m_idx < 2) // There is nothing to swap.
return;
uchar **keys= get_sort_keys();
const longlong count= m_idx - 1;
for (longlong ix= 0; ix <= count/2; ++ix)
{
uchar *tmp= keys[count - ix];
keys[count - ix] = keys[ix];
keys[ix]= tmp;
}
}
bool is_allocated()
/**
Initializes all the record pointers.
*/
void init_record_pointers()
{
return m_idx_array.array() != 0;
init_next_record_pointer();
while (m_idx < m_num_records)
(void) get_next_record_pointer();
reverse_record_pointers();
}
void reset()
/**
Prepares the buffer for the next batch of records to process.
*/
void init_next_record_pointer()
{
m_idx_array.reset();
m_idx= 0;
m_next_rec_ptr= m_rawmem;
m_sort_keys= NULL;
}
/** Sort me... */
void sort_buffer(const Sort_param *param, uint count);
/**
@returns the number of bytes currently in use for data.
*/
size_t space_used_for_data() const
{
return m_next_rec_ptr ? m_next_rec_ptr - m_rawmem : 0;
}
/// Initializes a record pointer.
uchar *get_record_buffer(uint idx)
/**
@returns the number of bytes left in the buffer.
*/
size_t spaceleft() const
{
m_idx_array[idx]= m_start_of_data + (idx * m_record_length);
return m_idx_array[idx];
DBUG_ASSERT(m_next_rec_ptr >= m_rawmem);
const size_t spaceused=
(m_next_rec_ptr - m_rawmem) +
(static_cast<size_t>(m_idx) * sizeof(uchar*));
return m_size_in_bytes - spaceused;
}
/// Initializes all the record pointers.
void init_record_pointers()
/**
Is the buffer full?
*/
bool isfull() const
{
if (m_idx < m_num_records)
return false;
return spaceleft() < (m_record_length + sizeof(uchar*));
}
/**
Where should the next record be stored?
*/
uchar *get_next_record_pointer()
{
uchar *retval= m_next_rec_ptr;
// Save the return value in the record pointer array.
m_record_pointers[-m_idx]= m_next_rec_ptr;
// Prepare for the subsequent request.
m_idx++;
m_next_rec_ptr+= m_record_length;
return retval;
}
/**
Adjusts for actual record length. get_next_record_pointer() above was
pessimistic, and assumed that the record could not be packed.
*/
void adjust_next_record_pointer(uint val)
{
for (uint ix= 0; ix < m_idx_array.size(); ++ix)
(void) get_record_buffer(ix);
m_next_rec_ptr-= (m_record_length - val);
}
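Taken together, get_next_record_pointer() and adjust_next_record_pointer() give callers a reserve-then-shrink protocol. A hedged sketch of the fill loop (have_more_rows() and make_packed_record() are hypothetical stand-ins for the caller's logic):
// Reserve the pessimistic fixed record_length, pack the record in
// place, then hand the unused tail back to the buffer.
while (have_more_rows() && !buffer.isfull())
{
  uchar *start= buffer.get_next_record_pointer(); // reserves record_length
  uint packed_length= make_packed_record(start);  // actual length <= record_length
  buffer.adjust_next_record_pointer(packed_length);
}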
/// Returns total size: pointer array + record buffers.
size_t sort_buffer_size() const
{
return allocated_size;
return m_size_in_bytes;
}
/// Allocates the buffer, but does *not* initialize pointers.
uchar **alloc_sort_buffer(uint num_records, uint record_length);
bool is_allocated() const
{
return m_rawmem;
}
/**
Allocates the buffer, but does *not* initialize pointers.
Total size = (num_records * record_length) + (num_records * sizeof(pointer))
              space for records              space for pointers to records
Caller is responsible for raising an error if allocation fails.
@param num_records Number of records.
@param record_length (maximum) size of each record.
@returns Pointer to allocated area, or NULL in case of out-of-memory.
*/
uchar *alloc_sort_buffer(uint num_records, uint record_length);
/// Frees the buffer.
void free_sort_buffer();
/// Getter, for calling routines which still use the uchar** interface.
uchar **get_sort_keys() { return m_idx_array.array(); }
void reset()
{
m_rawmem= NULL;
}
/**
Used to access the "right-to-left" array of record pointers as an ordinary
"left-to-right" array, so that we can pass it directly on to std::sort().
*/
uchar **get_sort_keys()
{
if (m_idx == 0)
return NULL;
return &m_record_pointers[1 - m_idx];
}
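To make the negative indexing concrete, a worked example with three insertions (m_idx == 3), under the layout described above:
// m_record_pointers addresses the last pointer slot of the allocation:
//   m_record_pointers[0]  -> rec 0   (stored by the first insertion)
//   m_record_pointers[-1] -> rec 1
//   m_record_pointers[-2] -> rec 2
// get_sort_keys() returns &m_record_pointers[1 - 3] == &m_record_pointers[-2],
// i.e. the contiguous array {rec 2, rec 1, rec 0}; reverse_record_pointers()
// flips it back to insertion order before sorting.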
/**
Gets sorted record number ix. @see get_sort_keys()
Only valid after buffer has been sorted!
*/
uchar *get_sorted_record(uint ix)
{
return m_sort_keys[ix];
}
/**
@returns The entire buffer, as a character array.
This is for reusing the memory for merge buffers.
*/
Bounds_checked_array<uchar> get_raw_buf()
{
return Bounds_checked_array<uchar>(m_rawmem, m_size_in_bytes);
}
/**
We need an assignment operator, see filesort().
......@@ -117,20 +243,40 @@ class Filesort_buffer
*/
Filesort_buffer &operator=(const Filesort_buffer &rhs)
{
m_idx_array= rhs.m_idx_array;
m_next_rec_ptr= rhs.m_next_rec_ptr;
m_rawmem= rhs.m_rawmem;
m_record_pointers= rhs.m_record_pointers;
m_sort_keys= rhs.m_sort_keys;
m_num_records= rhs.m_num_records;
m_record_length= rhs.m_record_length;
m_start_of_data= rhs.m_start_of_data;
allocated_size= rhs.allocated_size;
m_sort_length= rhs.m_sort_length;
m_size_in_bytes= rhs.m_size_in_bytes;
m_idx= rhs.m_idx;
return *this;
}
uint get_sort_length() const { return m_sort_length; }
void set_sort_length(uint val) { m_sort_length= val; }
private:
typedef Bounds_checked_array<uchar*> Idx_array;
uchar *m_next_rec_ptr; /// The next record will be inserted here.
uchar *m_rawmem; /// The raw memory buffer.
uchar **m_record_pointers; /// The "right-to-left" array of record pointers.
uchar **m_sort_keys; /// Caches the value of get_sort_keys()
uint m_num_records; /// Saved value from alloc_sort_buffer()
uint m_record_length; /// Saved value from alloc_sort_buffer()
uint m_sort_length; /// The length of the sort key.
size_t m_size_in_bytes; /// Size of raw buffer, in bytes.
Idx_array m_idx_array; /* Pointers to key data */
uint m_record_length;
uchar *m_start_of_data; /* Start of key data */
size_t allocated_size;
/**
This is the index in the "right-to-left" array of the next record to
be inserted into the buffer. It is signed, because we use it in signed
expressions like:
m_record_pointers[-m_idx];
It is longlong rather than int, to ensure that it covers UINT_MAX32
without any casting/warning.
*/
longlong m_idx;
};
#endif // FILESORT_UTILS_INCLUDED
......@@ -38,8 +38,8 @@
static int rr_quick(READ_RECORD *info);
int rr_sequential(READ_RECORD *info);
static int rr_from_tempfile(READ_RECORD *info);
static int rr_unpack_from_tempfile(READ_RECORD *info);
static int rr_unpack_from_buffer(READ_RECORD *info);
template<bool> static int rr_unpack_from_tempfile(READ_RECORD *info);
template<bool> static int rr_unpack_from_buffer(READ_RECORD *info);
int rr_from_pointers(READ_RECORD *info);
static int rr_from_cache(READ_RECORD *info);
static int init_rr_cache(THD *thd, READ_RECORD *info);
......@@ -187,23 +187,23 @@ bool init_read_record(READ_RECORD *info,THD *thd, TABLE *table,
bool disable_rr_cache)
{
IO_CACHE *tempfile;
SORT_ADDON_FIELD *addon_field= filesort ? filesort->addon_field : 0;
DBUG_ENTER("init_read_record");
const bool using_addon_fields= filesort && filesort->using_addon_fields();
bzero((char*) info,sizeof(*info));
info->thd=thd;
info->table=table;
info->addon_field= addon_field;
info->sort_info= filesort;
if ((table->s->tmp_table == INTERNAL_TMP_TABLE) &&
!addon_field)
!using_addon_fields)
(void) table->file->extra(HA_EXTRA_MMAP);
if (addon_field)
if (using_addon_fields)
{
info->rec_buf= (uchar*) filesort->addon_buf.str;
info->ref_length= (uint)filesort->addon_buf.length;
info->unpack= filesort->unpack;
info->rec_buf= filesort->addon_fields->get_addon_buf();
info->ref_length= filesort->addon_fields->get_addon_buf_length();
}
else
{
......@@ -223,9 +223,20 @@ bool init_read_record(READ_RECORD *info,THD *thd, TABLE *table,
if (tempfile && !(select && select->quick))
{
DBUG_PRINT("info",("using rr_from_tempfile"));
info->read_record_func=
addon_field ? rr_unpack_from_tempfile : rr_from_tempfile;
if (using_addon_fields)
{
DBUG_PRINT("info",("using rr_from_tempfile"));
if (filesort->addon_fields->using_packed_addons())
info->read_record_func= rr_unpack_from_tempfile<true>;
else
info->read_record_func= rr_unpack_from_tempfile<false>;
}
else
{
DBUG_PRINT("info",("using rr_from_tempfile"));
info->read_record_func= rr_from_tempfile;
}
info->io_cache= tempfile;
reinit_io_cache(info->io_cache,READ_CACHE,0L,0,0);
info->ref_pos=table->file->ref;
......@@ -239,7 +250,7 @@ bool init_read_record(READ_RECORD *info,THD *thd, TABLE *table,
and filesort->io_cache is read sequentially
*/
if (!disable_rr_cache &&
!addon_field &&
!using_addon_fields &&
thd->variables.read_rnd_buff_size &&
!(table->file->ha_table_flags() & HA_FAST_KEY_READ) &&
(table->db_stat & HA_READ_ONLY ||
......@@ -264,16 +275,29 @@ bool init_read_record(READ_RECORD *info,THD *thd, TABLE *table,
DBUG_PRINT("info",("using rr_quick"));
info->read_record_func= rr_quick;
}
else if (filesort && filesort->record_pointers)
else if (filesort && filesort->has_filesort_result_in_memory())
{
DBUG_PRINT("info",("using record_pointers"));
if (unlikely(table->file->ha_rnd_init_with_error(0)))
DBUG_RETURN(1);
info->cache_pos= filesort->record_pointers;
info->cache_end= (info->cache_pos+
filesort->return_rows * info->ref_length);
info->read_record_func=
addon_field ? rr_unpack_from_buffer : rr_from_pointers;
if (using_addon_fields)
{
DBUG_PRINT("info",("using rr_unpack_from_buffer"));
DBUG_ASSERT(filesort->sorted_result_in_fsbuf);
info->unpack_counter= 0;
if (filesort->using_packed_addons())
info->read_record_func= rr_unpack_from_buffer<true>;
else
info->read_record_func= rr_unpack_from_buffer<false>;
}
else
{
info->cache_end= (info->cache_pos+
filesort->return_rows * info->ref_length);
info->read_record_func= rr_from_pointers;
}
}
else if (table->file->keyread_enabled())
{
......@@ -510,7 +534,11 @@ static int rr_from_tempfile(READ_RECORD *info)
the fields values use in the result set from this buffer into their
positions in the regular record buffer.
@param info Reference to the context including record descriptors
@param info Reference to the context including record
descriptors
@param Packed_addon_fields Are the addon fields packed?
This is a compile-time constant, to
avoid if (....) tests during execution.
@retval
0 Record successfully read.
......@@ -518,12 +546,38 @@ static int rr_from_tempfile(READ_RECORD *info)
-1 There is no record to be read anymore.
*/
template<bool Packed_addon_fields>
static int rr_unpack_from_tempfile(READ_RECORD *info)
{
if (my_b_read(info->io_cache, info->rec_buf, info->ref_length))
return -1;
(*info->unpack)(info->addon_field, info->rec_buf,
info->rec_buf + info->ref_length);
uchar *destination= info->rec_buf;
#ifndef DBUG_OFF
my_off_t where= my_b_tell(info->io_cache);
#endif
if (Packed_addon_fields)
{
const uint len_sz= Addon_fields::size_of_length_field;
// First read length of the record.
if (my_b_read(info->io_cache, destination, len_sz))
return -1;
uint res_length= Addon_fields::read_addon_length(destination);
DBUG_PRINT("info", ("rr_unpack from %llu to %p sz %u",
static_cast<ulonglong>(where),
destination, res_length));
DBUG_ASSERT(res_length > len_sz);
DBUG_ASSERT(info->sort_info->using_addon_fields());
// Then read the rest of the record.
if (my_b_read(info->io_cache, destination + len_sz, res_length - len_sz))
return -1; /* purecov: inspected */
}
else
{
if (my_b_read(info->io_cache, destination, info->ref_length))
return -1;
}
info->sort_info->unpack_addon_fields<Packed_addon_fields>(destination);
return 0;
}
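For the packed branch the temp file stores each row as <2-byte length><payload> back to back; a short trace with an illustrative value:
// Suppose uint2korr() reads 38 from the two header bytes, so
// res_length == 2 + 38 == 40. The first my_b_read() consumed the
// header; the second reads the remaining 40 - 2 == 38 payload bytes
// into destination + len_sz, reassembling the full packed record
// before unpack_addon_fields() walks it.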
......@@ -560,7 +614,11 @@ int rr_from_pointers(READ_RECORD *info)
the fields values use in the result set from this buffer into their
positions in the regular record buffer.
@param info Reference to the context including record descriptors
@param info Reference to the context including record
descriptors
@param Packed_addon_fields Are the addon fields packed?
This is a compile-time constant, to
avoid if (....) tests during execution.
@retval
0 Record successfully read.
......@@ -568,13 +626,17 @@ int rr_from_pointers(READ_RECORD *info)
-1 There is no record to be read anymore.
*/
template<bool Packed_addon_fields>
static int rr_unpack_from_buffer(READ_RECORD *info)
{
if (info->cache_pos == info->cache_end)
if (info->unpack_counter == info->sort_info->return_rows)
return -1; /* End of buffer */
(*info->unpack)(info->addon_field, info->cache_pos,
info->cache_end);
info->cache_pos+= info->ref_length;
uchar *record= info->sort_info->get_sorted_record(
static_cast<uint>(info->unpack_counter));
uchar *plen= record + info->sort_info->get_sort_length();
info->sort_info->unpack_addon_fields<Packed_addon_fields>(plen);
info->unpack_counter++;
return 0;
}
/* cacheing of records from a database */
......@@ -709,3 +771,26 @@ static int rr_cmp(uchar *a,uchar *b)
return (int) a[7] - (int) b[7];
#endif
}
template<bool Packed_addon_fields>
inline void SORT_INFO::unpack_addon_fields(uchar *buff)
{
SORT_ADDON_FIELD *addonf= addon_fields->begin();
uchar *buff_end= buff + sort_buffer_size();
const uchar *start_of_record= buff + addonf->offset;
for ( ; addonf != addon_fields->end() ; addonf++)
{
Field *field= addonf->field;
if (addonf->null_bit && (addonf->null_bit & buff[addonf->null_offset]))
{
field->set_null();
continue;
}
field->set_notnull();
if (Packed_addon_fields)
start_of_record= field->unpack(field->ptr, start_of_record, buff_end, 0);
else
field->unpack(field->ptr, buff + addonf->offset, buff_end, 0);
}
}
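A brief trace of the packed branch above (field order illustrative):
// buff points at <length><null bits><field a><field b>...
// start_of_record begins at buff + addonf->offset, just past the null
// bits; each Field::unpack() consumes its own packed bytes and returns
// the start of the next field, so the fields are walked strictly left
// to right. NULL fields are skipped entirely and occupy zero bytes,
// which is why the running start_of_record pointer is needed at all.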
......@@ -58,13 +58,23 @@ struct READ_RECORD
THD *thd;
SQL_SELECT *select;
uint ref_length, reclength, rec_cache_size, error_offset;
/**
Counting records when reading result from filesort().
Used when filesort leaves the result in the filesort buffer.
*/
ha_rows unpack_counter;
uchar *ref_pos; /* pointer to form->refpos */
uchar *rec_buf; /* to read field values after filesort */
uchar *cache,*cache_pos,*cache_end,*read_positions;
struct st_sort_addon_field *addon_field; /* Pointer to the fields info */
/*
Structure storing information about sorting
*/
SORT_INFO *sort_info;
struct st_io_cache *io_cache;
bool print_error;
void (*unpack)(struct st_sort_addon_field *, uchar *, uchar *);
int read_record() { return read_record_func(this); }
uchar *record() const { return table->record[0]; }
......
......@@ -85,6 +85,10 @@ template <typename Element_type> class Bounds_checked_array
Element_type *array() const { return m_array; }
Element_type *begin() const { return array(); }
Element_type *end() const { return array() + m_size; }
bool operator==(const Bounds_checked_array<Element_type>&rhs) const
{
return m_array == rhs.m_array && m_size == rhs.m_size;
......
......@@ -13997,7 +13997,7 @@ remove_const(JOIN *join,ORDER *first_order, COND *cond,
*simple_order= head->on_expr_ref[0] == NULL;
if (*simple_order && head->table->file->ha_table_flags() & HA_SLOW_RND_POS)
{
uint u1, u2, u3;
uint u1, u2, u3, u4;
/*
normally the condition is (see filesort_use_addons())
......@@ -14008,7 +14008,7 @@ remove_const(JOIN *join,ORDER *first_order, COND *cond,
TODO proper cost estimations
*/
*simple_order= filesort_use_addons(head->table, 0, &u1, &u2, &u3);
*simple_order= filesort_use_addons(head->table, 0, &u1, &u2, &u3, &u4);
}
}
else
......
......@@ -20,8 +20,6 @@
#include <my_sys.h> /* qsort2_cmp */
#include "queues.h"
typedef struct st_buffpek BUFFPEK;
struct SORT_FIELD;
class Field;
struct TABLE;
......@@ -64,21 +62,236 @@ struct BUFFPEK_COMPARE_CONTEXT
};
/**
Descriptor for a merge chunk to be sort-merged.
A merge chunk is a sequence of pre-sorted records, written to a
temporary file. A Merge_chunk instance describes where this chunk is stored
in the file, and where it is located when it is in memory.
It is a POD because
- we read/write them from/to files.
We have accessors (getters/setters) for all struct members.
*/
struct Merge_chunk {
public:
Merge_chunk(): m_current_key(NULL),
m_file_position(0),
m_buffer_start(NULL),
m_buffer_end(NULL),
m_rowcount(0),
m_mem_count(0),
m_max_keys(0)
{}
my_off_t file_position() const { return m_file_position; }
void set_file_position(my_off_t val) { m_file_position= val; }
void advance_file_position(my_off_t val) { m_file_position+= val; }
uchar *buffer_start() { return m_buffer_start; }
const uchar *buffer_end() const { return m_buffer_end; }
void set_buffer(uchar *start, uchar *end)
{
m_buffer_start= start;
m_buffer_end= end;
}
void set_buffer_start(uchar *start)
{
m_buffer_start= start;
}
void set_buffer_end(uchar *end)
{
DBUG_ASSERT(m_buffer_end == NULL || end <= m_buffer_end);
m_buffer_end= end;
}
void init_current_key() { m_current_key= m_buffer_start; }
uchar *current_key() { return m_current_key; }
void advance_current_key(uint val) { m_current_key+= val; }
void decrement_rowcount(ha_rows val) { m_rowcount-= val; }
void set_rowcount(ha_rows val) { m_rowcount= val; }
ha_rows rowcount() const { return m_rowcount; }
ha_rows mem_count() const { return m_mem_count; }
void set_mem_count(ha_rows val) { m_mem_count= val; }
ha_rows decrement_mem_count() { return --m_mem_count; }
ha_rows max_keys() const { return m_max_keys; }
void set_max_keys(ha_rows val) { m_max_keys= val; }
size_t buffer_size() const { return m_buffer_end - m_buffer_start; }
/**
Tries to merge *this with *mc, returns true if successful.
The assumption is that *this is no longer in use,
and the space it has been allocated can be handed over to a
buffer which is adjacent to it.
*/
bool merge_freed_buff(Merge_chunk *mc) const
{
if (mc->m_buffer_end == m_buffer_start)
{
mc->m_buffer_end= m_buffer_end;
mc->m_max_keys+= m_max_keys;
return true;
}
else if (mc->m_buffer_start == m_buffer_end)
{
mc->m_buffer_start= m_buffer_start;
mc->m_max_keys+= m_max_keys;
return true;
}
return false;
}
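An illustrative trace of merge_freed_buff() (addresses hypothetical):
// A owns [0x1000, 0x2000) and B owns [0x2000, 0x3000). Once A's rows
// are consumed, A.merge_freed_buff(&B) matches the second branch
// (B.m_buffer_start == A.m_buffer_end), so B grows to [0x1000, 0x3000)
// and inherits A's m_max_keys. For non-adjacent chunks both tests fail
// and the function returns false.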
uchar *m_current_key; /// The current key for this chunk.
my_off_t m_file_position;/// Current position in the file to be sorted.
uchar *m_buffer_start; /// Start of main-memory buffer for this chunk.
uchar *m_buffer_end; /// End of main-memory buffer for this chunk.
ha_rows m_rowcount; /// Number of unread rows in this chunk.
ha_rows m_mem_count; /// Number of rows in the main-memory buffer.
ha_rows m_max_keys; /// If we have fixed-size rows:
/// max number of rows in buffer.
};
typedef Bounds_checked_array<SORT_ADDON_FIELD> Addon_fields_array;
/**
This class wraps information about usage of addon fields.
An Addon_fields object is used both during packing of data in the filesort
buffer, and later during unpacking in 'SORT_INFO::unpack_addon_fields'.
@see documentation for the Sort_addon_field struct.
@see documentation for get_addon_fields()
*/
class Addon_fields {
public:
Addon_fields(Addon_fields_array arr)
: m_field_descriptors(arr),
m_addon_buf(),
m_addon_buf_length(),
m_using_packed_addons(false)
{
DBUG_ASSERT(!arr.is_null());
}
SORT_ADDON_FIELD *begin() { return m_field_descriptors.begin(); }
SORT_ADDON_FIELD *end() { return m_field_descriptors.end(); }
/// rr_unpack_from_tempfile needs an extra buffer when unpacking.
uchar *allocate_addon_buf(uint sz)
{
m_addon_buf= (uchar *)my_malloc(sz, MYF(MY_WME | MY_THREAD_SPECIFIC));
if (m_addon_buf)
m_addon_buf_length= sz;
return m_addon_buf;
}
void free_addon_buff()
{
my_free(m_addon_buf);
m_addon_buf= NULL;
m_addon_buf_length= 0;
}
uchar *get_addon_buf() { return m_addon_buf; }
uint get_addon_buf_length() const { return m_addon_buf_length; }
void set_using_packed_addons(bool val)
{
m_using_packed_addons= val;
}
bool using_packed_addons() const
{
return m_using_packed_addons;
}
static bool can_pack_addon_fields(uint record_length)
{
return (record_length <= (0xFFFF));
}
/**
@returns Total number of bytes used for packed addon fields:
the size of the length field + size of null bits + sum of field sizes.
*/
static uint read_addon_length(uchar *p)
{
return size_of_length_field + uint2korr(p);
}
/**
Stores the number of bytes used for packed addon fields.
*/
static void store_addon_length(uchar *p, uint sz)
{
// We actually store the length of everything *after* the length field.
int2store(p, sz - size_of_length_field);
}
static const uint size_of_length_field= 2;
private:
Addon_fields_array m_field_descriptors;
uchar *m_addon_buf; ///< Buffer for unpacking addon fields.
uint m_addon_buf_length; ///< Length of the buffer.
bool m_using_packed_addons; ///< Are we packing the addon fields?
};
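A hedged round-trip of the two static length helpers (the 40-byte total is illustrative; the function is test scaffolding, not MariaDB code):
#include <cassert>
void length_field_round_trip()
{
  uchar p[Addon_fields::size_of_length_field];
  const uint total= 40;                        // 2-byte header + 38 addon bytes
  Addon_fields::store_addon_length(p, total);  // stores 40 - 2 == 38
  assert(Addon_fields::read_addon_length(p) == total);  // 2 + 38 == 40
}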
/**
There are two record formats for sorting:
|<key a><key b>...|<rowid>|
/ sort_length / ref_l /
or with "addon fields"
|<key a><key b>...|<null bits>|<field a><field b>...|
/ sort_length / addon_length /
The packed format for "addon fields"
|<key a><key b>...|<length>|<null bits>|<field a><field b>...|
/ sort_length / addon_length /
<key> Fields are fixed-size, specially encoded with
Field::make_sort_key() so we can do byte-by-byte compare.
<length> Contains the *actual* packed length (after packing) of
everything after the sort keys.
The size of the length field is 2 bytes,
which should cover most use cases: addon data <= 65535 bytes.
This is the same as max record size in MySQL.
<null bits> One bit for each nullable field, indicating whether the field
is null or not. May have size zero if no fields are nullable.
<field xx> Are stored with field->pack(), and retrieved with
field->unpack(). Addon fields within a record are stored
consecutively, with no "holes" or padding. They will have zero
size for NULL values.
*/
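For concreteness, a hypothetical packed record with an 8-byte sort key, one non-null VARCHAR holding 'abc', and one NULL INT (bit positions illustrative):
offset 0..7    sort key, fixed width, compared byte by byte
offset 8..9    <length>    int2store(...) of everything after the keys,
                           excluding the 2-byte length field itself
offset 10      <null bits> the INT's bit is set, the VARCHAR's is not
offset 11..14  VARCHAR packed by Field::pack(): 1 length byte + 'a' 'b' 'c'
               (the NULL INT occupies zero bytes)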
class Sort_param {
public:
uint rec_length; // Length of sorted records.
uint sort_length; // Length of sorted columns.
uint ref_length; // Length of record ref.
uint addon_length; // Length of addon_fields
uint res_length; // Length of records in final sorted file/buffer.
uint max_keys_per_buffer; // Max keys / buffer.
uint min_dupl_count;
ha_rows max_rows; // Select limit, or HA_POS_ERROR if unlimited.
ha_rows examined_rows; // Number of examined rows.
TABLE *sort_form; // For quicker make_sortkey.
SORT_FIELD *local_sortorder;
SORT_FIELD *end;
SORT_ADDON_FIELD *addon_field; // Descriptors for companion fields.
LEX_STRING addon_buf; // Buffer & length of added packed fields.
/**
ORDER BY list with some precalculated info for filesort.
Array is created and owned by a Filesort instance.
*/
Bounds_checked_array<SORT_FIELD> local_sortorder;
Addon_fields *addon_fields; // Descriptors for companion fields.
bool using_pq;
uchar *unique_buff;
bool not_killable;
......@@ -93,21 +306,63 @@ class Sort_param {
}
void init_for_filesort(uint sortlen, TABLE *table,
ha_rows maxrows, bool sort_positions);
/// Enables the packing of addons if possible.
void try_to_pack_addons(ulong max_length_for_sort_data);
/// Are we packing the "addon fields"?
bool using_packed_addons() const
{
DBUG_ASSERT(m_using_packed_addons ==
(addon_fields != NULL &&
addon_fields->using_packed_addons()));
return m_using_packed_addons;
}
/// Are we using "addon fields"?
bool using_addon_fields() const
{
return addon_fields != NULL;
}
/**
Getter for record length and result length.
@param record_start Pointer to record.
@param [out] recl Store record length here.
@param [out] resl Store result length here.
*/
void get_rec_and_res_len(uchar *record_start, uint *recl, uint *resl)
{
if (!using_packed_addons())
{
*recl= rec_length;
*resl= res_length;
return;
}
uchar *plen= record_start + sort_length;
*resl= Addon_fields::read_addon_length(plen);
DBUG_ASSERT(*resl <= res_length);
const uchar *record_end= plen + *resl;
*recl= static_cast<uint>(record_end - record_start);
}
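A worked pass through the packed branch (numbers illustrative): with sort_length == 12 and the stored two-byte value at record_start + 12 being 38:
// *resl = Addon_fields::read_addon_length(plen) == 2 + 38 == 40
// record_end = plen + 40
// *recl = (record_end - record_start) == 12 + 40 == 52
// rec_length and res_length keep the pessimistic fixed maxima, so the
// assertion *resl <= res_length guards against a corrupt length field.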
private:
uint m_packable_length;
bool m_using_packed_addons; ///< caches the value of using_packed_addons()
};
typedef Bounds_checked_array<uchar> Sort_buffer;
int merge_many_buff(Sort_param *param, uchar *sort_buffer,
BUFFPEK *buffpek,
uint *maxbuffer, IO_CACHE *t_file);
ulong read_to_buffer(IO_CACHE *fromfile,BUFFPEK *buffpek,
uint sort_length);
int merge_many_buff(Sort_param *param, Sort_buffer sort_buffer,
Merge_chunk *buffpek, uint *maxbuffer, IO_CACHE *t_file);
ulong read_to_buffer(IO_CACHE *fromfile, Merge_chunk *buffpek,
Sort_param *param);
bool merge_buffers(Sort_param *param,IO_CACHE *from_file,
IO_CACHE *to_file, uchar *sort_buffer,
BUFFPEK *lastbuff,BUFFPEK *Fb,
BUFFPEK *Tb,int flag);
int merge_index(Sort_param *param, uchar *sort_buffer,
BUFFPEK *buffpek, uint maxbuffer,
IO_CACHE *tempfile, IO_CACHE *outfile);
void reuse_freed_buff(QUEUE *queue, BUFFPEK *reuse, uint key_length);
IO_CACHE *to_file, Sort_buffer sort_buffer,
Merge_chunk *lastbuff, Merge_chunk *Fb,
Merge_chunk *Tb, int flag);
int merge_index(Sort_param *param, Sort_buffer sort_buffer,
Merge_chunk *buffpek, uint maxbuffer,
IO_CACHE *tempfile, IO_CACHE *outfile);
void reuse_freed_buff(QUEUE *queue, Merge_chunk *reuse, uint key_length);
#endif /* SQL_SORT_INCLUDED */
......@@ -39,7 +39,6 @@
#include "my_tree.h" // element_count
#include "uniques.h" // Unique
#include "sql_sort.h"
#include "myisamchk.h" // BUFFPEK
int unique_write_to_file(uchar* key, element_count count, Unique *unique)
{
......@@ -94,7 +93,7 @@ Unique::Unique(qsort_cmp2 comp_func, void * comp_func_fixed_arg,
init_tree(&tree, (max_in_memory_size / 16), 0, size, comp_func,
NULL, comp_func_fixed_arg, MYF(MY_THREAD_SPECIFIC));
/* If the following fails the next add will also fail */
my_init_dynamic_array(&file_ptrs, sizeof(BUFFPEK), 16, 16,
my_init_dynamic_array(&file_ptrs, sizeof(Merge_chunk), 16, 16,
MYF(MY_THREAD_SPECIFIC));
/*
If you change the following, change it in get_max_elements function, too.
......@@ -375,10 +374,10 @@ Unique::~Unique()
/* Write tree to disk; clear tree */
bool Unique::flush()
{
BUFFPEK file_ptr;
Merge_chunk file_ptr;
elements+= tree.elements_in_tree;
file_ptr.count=tree.elements_in_tree;
file_ptr.file_pos=my_b_tell(&file);
file_ptr.set_rowcount(tree.elements_in_tree);
file_ptr.set_file_position(my_b_tell(&file));
tree_walk_action action= min_dupl_count ?
(tree_walk_action) unique_write_to_file_with_count :
......@@ -490,7 +489,7 @@ void put_counter_into_merged_element(void *ptr, uint ofs, element_count cnt)
*/
static bool merge_walk(uchar *merge_buffer, size_t merge_buffer_size,
uint key_length, BUFFPEK *begin, BUFFPEK *end,
uint key_length, Merge_chunk *begin, Merge_chunk *end,
tree_walk_action walk_action, void *walk_action_arg,
qsort_cmp2 compare, void *compare_arg,
IO_CACHE *file, bool with_counters)
......@@ -499,7 +498,8 @@ static bool merge_walk(uchar *merge_buffer, size_t merge_buffer_size,
QUEUE queue;
if (end <= begin ||
merge_buffer_size < (size_t) (key_length * (end - begin + 1)) ||
init_queue(&queue, (uint) (end - begin), offsetof(BUFFPEK, key), 0,
init_queue(&queue, (uint) (end - begin),
offsetof(Merge_chunk, m_current_key), 0,
buffpek_compare, &compare_context, 0, 0))
return 1;
/* we need space for one key when a piece of merge buffer is re-read */
......@@ -510,10 +510,16 @@ static bool merge_walk(uchar *merge_buffer, size_t merge_buffer_size,
/* if piece_size is aligned reuse_freed_buffer will always hit */
uint piece_size= max_key_count_per_piece * key_length;
ulong bytes_read; /* to hold return value of read_to_buffer */
BUFFPEK *top;
Merge_chunk *top;
int res= 1;
uint cnt_ofs= key_length - (with_counters ? sizeof(element_count) : 0);
element_count cnt;
// read_to_buffer() needs only rec_length.
Sort_param sort_param;
sort_param.rec_length= key_length;
DBUG_ASSERT(!sort_param.using_addon_fields());
/*
Invariant: queue must contain top element from each tree, until a tree
is not completely walked through.
......@@ -522,15 +528,16 @@ static bool merge_walk(uchar *merge_buffer, size_t merge_buffer_size,
*/
for (top= begin; top != end; ++top)
{
top->base= merge_buffer + (top - begin) * piece_size;
top->max_keys= max_key_count_per_piece;
bytes_read= read_to_buffer(file, top, key_length);
top->set_buffer_start(merge_buffer + (top - begin) * piece_size);
top->set_buffer_end(top->buffer_start() + piece_size);
top->set_max_keys(max_key_count_per_piece);
bytes_read= read_to_buffer(file, top, &sort_param);
if (unlikely(bytes_read == (ulong) -1))
goto end;
DBUG_ASSERT(bytes_read);
queue_insert(&queue, (uchar *) top);
}
top= (BUFFPEK *) queue_top(&queue);
top= (Merge_chunk *) queue_top(&queue);
while (queue.elements > 1)
{
/*
......@@ -540,20 +547,21 @@ static bool merge_walk(uchar *merge_buffer, size_t merge_buffer_size,
elements in each tree are unique. Action is applied only to unique
elements.
*/
void *old_key= top->key;
void *old_key= top->current_key();
/*
read next key from the cache or from the file and push it to the
queue; this gives new top.
*/
top->key+= key_length;
if (--top->mem_count)
top->advance_current_key(key_length);
top->decrement_mem_count();
if (top->mem_count())
queue_replace_top(&queue);
else /* next piece should be read */
{
/* save old_key not to overwrite it in read_to_buffer */
memcpy(save_key_buff, old_key, key_length);
old_key= save_key_buff;
bytes_read= read_to_buffer(file, top, key_length);
bytes_read= read_to_buffer(file, top, &sort_param);
if (unlikely(bytes_read == (ulong) -1))
goto end;
else if (bytes_read) /* top->key, top->mem_count are reset */
......@@ -568,9 +576,9 @@ static bool merge_walk(uchar *merge_buffer, size_t merge_buffer_size,
reuse_freed_buff(&queue, top, key_length);
}
}
top= (BUFFPEK *) queue_top(&queue);
top= (Merge_chunk *) queue_top(&queue);
/* new top has been obtained; if old top is unique, apply the action */
if (compare(compare_arg, old_key, top->key))
if (compare(compare_arg, old_key, top->current_key()))
{
cnt= with_counters ?
get_counter_from_merged_element(old_key, cnt_ofs) : 1;
......@@ -579,9 +587,9 @@ static bool merge_walk(uchar *merge_buffer, size_t merge_buffer_size,
}
else if (with_counters)
{
cnt= get_counter_from_merged_element(top->key, cnt_ofs);
cnt= get_counter_from_merged_element(top->current_key(), cnt_ofs);
cnt+= get_counter_from_merged_element(old_key, cnt_ofs);
put_counter_into_merged_element(top->key, cnt_ofs, cnt);
put_counter_into_merged_element(top->current_key(), cnt_ofs, cnt);
}
}
/*
......@@ -595,13 +603,13 @@ static bool merge_walk(uchar *merge_buffer, size_t merge_buffer_size,
{
cnt= with_counters ?
get_counter_from_merged_element(top->key, cnt_ofs) : 1;
if (walk_action(top->key, cnt, walk_action_arg))
get_counter_from_merged_element(top->current_key(), cnt_ofs) : 1;
if (walk_action(top->current_key(), cnt, walk_action_arg))
goto end;
top->key+= key_length;
top->advance_current_key(key_length);
}
while (--top->mem_count);
bytes_read= read_to_buffer(file, top, key_length);
while (top->decrement_mem_count());
bytes_read= read_to_buffer(file, top, &sort_param);
if (unlikely(bytes_read == (ulong) -1))
goto end;
}
......@@ -657,13 +665,14 @@ bool Unique::walk(TABLE *table, tree_walk_action action, void *walk_action_arg)
if (!(merge_buffer = (uchar *)my_malloc(buff_sz, MYF(MY_WME))))
return 1;
if (buff_sz < full_size * (file_ptrs.elements + 1UL))
res= merge(table, merge_buffer, buff_sz >= full_size * MERGEBUFF2) ;
res= merge(table, merge_buffer, buff_sz,
buff_sz >= full_size * MERGEBUFF2) ;
if (!res)
{
res= merge_walk(merge_buffer, buff_sz, full_size,
(BUFFPEK *) file_ptrs.buffer,
(BUFFPEK *) file_ptrs.buffer + file_ptrs.elements,
(Merge_chunk *) file_ptrs.buffer,
(Merge_chunk *) file_ptrs.buffer + file_ptrs.elements,
action, walk_action_arg,
tree.compare, tree.custom_arg, &file, with_counters);
}
......@@ -684,16 +693,18 @@ bool Unique::walk(TABLE *table, tree_walk_action action, void *walk_action_arg)
All params are 'IN':
table the parameter to access sort context
buff merge buffer
buff_size size of merge buffer
without_last_merge TRUE <=> do not perform the last merge
RETURN VALUE
0 OK
<> 0 error
*/
bool Unique::merge(TABLE *table, uchar *buff, bool without_last_merge)
bool Unique::merge(TABLE *table, uchar *buff, size_t buff_size,
bool without_last_merge)
{
IO_CACHE *outfile= &sort.io_cache;
BUFFPEK *file_ptr= (BUFFPEK*) file_ptrs.buffer;
Merge_chunk *file_ptr= (Merge_chunk*) file_ptrs.buffer;
uint maxbuffer= file_ptrs.elements - 1;
my_off_t save_pos;
bool error= 1;
......@@ -724,7 +735,9 @@ bool Unique::merge(TABLE *table, uchar *buff, bool without_last_merge)
sort_param.cmp_context.key_compare_arg= tree.custom_arg;
/* Merge the buffers to one file, removing duplicates */
if (merge_many_buff(&sort_param,buff,file_ptr,&maxbuffer,&file))
if (merge_many_buff(&sort_param,
Bounds_checked_array<uchar>(buff, buff_size),
file_ptr,&maxbuffer,&file))
goto err;
if (flush_io_cache(&file) ||
reinit_io_cache(&file,READ_CACHE,0L,0,0))
......@@ -736,7 +749,8 @@ bool Unique::merge(TABLE *table, uchar *buff, bool without_last_merge)
file_ptrs.elements= maxbuffer+1;
return 0;
}
if (merge_index(&sort_param, buff, file_ptr, maxbuffer, &file, outfile))
if (merge_index(&sort_param, Bounds_checked_array<uchar>(buff, buff_size),
file_ptr, maxbuffer, &file, outfile))
goto err;
error= 0;
err:
......@@ -791,7 +805,7 @@ bool Unique::get(TABLE *table)
MYF(MY_THREAD_SPECIFIC|MY_WME))))
DBUG_RETURN(1);
if (merge(table, sort_buffer, FALSE))
if (merge(table, sort_buffer, buff_sz, FALSE))
goto err;
rc= 0;
......
......@@ -39,7 +39,7 @@ class Unique :public Sql_alloc
uint min_dupl_count; /* always 0 for unions, > 0 for intersections */
bool with_counters;
bool merge(TABLE *table, uchar *buff, bool without_last_merge);
bool merge(TABLE *table, uchar *buff, size_t size, bool without_last_merge);
bool flush();
public:
......
......@@ -299,11 +299,11 @@ matricule nom prenom
7626 HENIN PHILIPPE
403 HERMITTE PHILIPPE
9096 HELENA PHILIPPE
SELECT matricule, nom, prenom FROM t2 ORDER BY nom LIMIT 10;
SELECT matricule, nom, prenom FROM t2 ORDER BY nom,prenom LIMIT 10;
matricule nom prenom
4552 ABBADIE MONIQUE
6627 ABBAYE GERALD
307 ABBAYE ANNICK
6627 ABBAYE GERALD
7961 ABBE KATIA
1340 ABBE MICHELE
9270 ABBE SOPHIE
......
......@@ -120,7 +120,7 @@ SELECT matricule, nom, prenom FROM t2 WHERE nom <= 'ABEL' OR nom > 'YVON';
SELECT matricule, nom, prenom FROM t2 WHERE nom > 'HELEN' AND nom < 'HEROS';
SELECT matricule, nom, prenom FROM t2 WHERE nom BETWEEN 'HELEN' AND 'HEROS';
SELECT matricule, nom, prenom FROM t2 WHERE nom BETWEEN 'HELEN' AND 'HEROS' AND prenom = 'PHILIPPE';
SELECT matricule, nom, prenom FROM t2 ORDER BY nom LIMIT 10;
SELECT matricule, nom, prenom FROM t2 ORDER BY nom,prenom LIMIT 10;
SELECT a.nom, a.prenom, b.nom FROM t1 a STRAIGHT_JOIN t2 b ON a.prenom = b.prenom WHERE a.nom = 'FOCH' AND a.nom != b.nom;
DROP TABLE t2;
......