Commit 3fd91eca authored by unknown's avatar unknown

Fixed parsing of column names and foreign key constraints in Innobase to...

Fixed parsing of column names and foreign key constraints in Innobase to handle quoted identifiers and identifiers with space. (Bug #1725)
Fix optimizer tuning bug when first used key part was a constant. (Bug #1679)


innobase/dict/dict0dict.c:
  Fixed parsing of column names and foreign key constraints to handle quoted identifiers and identifiers with space. (Bug #1725)
mysql-test/r/innodb.result:
  Test of innodb internal parsing
mysql-test/t/innodb.test:
  Test of innodb internal parsing
sql/sql_class.cc:
  Safety fix for select into outfile and select into dumpfile. Before calling send_error() could cause end_io_cache() to be called several times.
sql/sql_class.h:
  Add path to dumpfile so that we can delete the generated file if something goes wrong.
sql/sql_select.cc:
  Fix optimizer tuning bug when first used key part was a constant.
  Previously all keys that had this key part first was regarded as equal, even if the query used more key parts for some of the keys.
  Now we use the range optimizer results to just limit the number of estimated rows if not all key parts where constants.
  (Bug #1679)
parent 1bd2982a
...@@ -2138,19 +2138,37 @@ dict_scan_col( ...@@ -2138,19 +2138,37 @@ dict_scan_col(
return(ptr); return(ptr);
} }
if (*ptr == '`') { if (*ptr == '`' || *ptr == '"') {
ptr++; /*
} The identifier is quoted. Search for end quote.
We can't use the general code here as the name may contain
old_ptr = ptr; special characters like space.
*/
char quote= *ptr++;
old_ptr= ptr;
/*
The colum name should always end with 'quote' but we check for
end zero just to be safe if this is called outside of MySQL
*/
while (*ptr && *ptr != quote)
ptr++;
*column_name_len = (ulint)(ptr - old_ptr);
if (*ptr) /* Skip end quote */
ptr++;
}
else
{
old_ptr = ptr;
while (!isspace(*ptr) && *ptr != ',' && *ptr != ')' && *ptr != '`' while (!isspace(*ptr) && *ptr != ',' && *ptr != ')'
&& *ptr != '\0') { && *ptr != '\0') {
ptr++; ptr++;
}
*column_name_len = (ulint)(ptr - old_ptr);
} }
*column_name_len = (ulint)(ptr - old_ptr);
if (table == NULL) { if (table == NULL) {
*success = TRUE; *success = TRUE;
...@@ -2161,9 +2179,9 @@ dict_scan_col( ...@@ -2161,9 +2179,9 @@ dict_scan_col(
col = dict_table_get_nth_col(table, i); col = dict_table_get_nth_col(table, i);
if (ut_strlen(col->name) == (ulint)(ptr - old_ptr) if (ut_strlen(col->name) == *column_name_len
&& 0 == ut_cmp_in_lower_case(col->name, old_ptr, && 0 == ut_cmp_in_lower_case(col->name, old_ptr,
(ulint)(ptr - old_ptr))) { *column_name_len)) {
/* Found */ /* Found */
*success = TRUE; *success = TRUE;
...@@ -2175,10 +2193,6 @@ dict_scan_col( ...@@ -2175,10 +2193,6 @@ dict_scan_col(
} }
} }
if (*ptr == '`') {
ptr++;
}
return(ptr); return(ptr);
} }
...@@ -2200,6 +2214,7 @@ dict_scan_table_name( ...@@ -2200,6 +2214,7 @@ dict_scan_table_name(
char* dot_ptr = NULL; char* dot_ptr = NULL;
char* old_ptr; char* old_ptr;
ulint i; ulint i;
char quote = 0;
*success = FALSE; *success = FALSE;
*table = NULL; *table = NULL;
...@@ -2213,14 +2228,16 @@ dict_scan_table_name( ...@@ -2213,14 +2228,16 @@ dict_scan_table_name(
return(ptr); return(ptr);
} }
if (*ptr == '`') { if (*ptr == '`' || *ptr == '"') {
ptr++; quote= *ptr++;
} }
old_ptr = ptr; old_ptr = ptr;
while (!isspace(*ptr) && *ptr != '(' && *ptr != '`' && *ptr != '\0') { while (*ptr != quote &&
if (*ptr == '.') { (quote || (!isspace(*ptr) && *ptr != '(')) &&
*ptr != '\0') {
if (!quote && *ptr == '.') {
dot_ptr = ptr; dot_ptr = ptr;
} }
...@@ -2273,7 +2290,7 @@ dict_scan_table_name( ...@@ -2273,7 +2290,7 @@ dict_scan_table_name(
*table = dict_table_get_low(second_table_name); *table = dict_table_get_low(second_table_name);
if (*ptr == '`') { if (*ptr && *ptr == quote) {
ptr++; ptr++;
} }
...@@ -2293,7 +2310,7 @@ dict_scan_id( ...@@ -2293,7 +2310,7 @@ dict_scan_id(
scannable */ scannable */
ulint* len) /* out: length of the id */ ulint* len) /* out: length of the id */
{ {
ibool scanned_backquote = FALSE; char quote = 0;
*start = NULL; *start = NULL;
...@@ -2306,23 +2323,23 @@ dict_scan_id( ...@@ -2306,23 +2323,23 @@ dict_scan_id(
return(ptr); return(ptr);
} }
if (*ptr == '`') { if (*ptr == '`' || *ptr == '"') {
scanned_backquote = TRUE; quote = *ptr++;
ptr++;
} }
*start = ptr; *start = ptr;
while (!isspace(*ptr) && *ptr != ',' && *ptr != '(' && *ptr != ')' while (*ptr != quote &&
&& *ptr != '\0' && *ptr != '`') { (!quote || (!isspace(*ptr) && *ptr != ',' && *ptr != '(' &&
*ptr != ')'))
&& *ptr != '\0') {
ptr++; ptr++;
} }
*len = (ulint) (ptr - *start); *len = (ulint) (ptr - *start);
if (scanned_backquote) { if (quote) {
if (*ptr == '`') { if (*ptr == quote) {
ptr++; ptr++;
} else { } else {
/* Syntax error */ /* Syntax error */
......
...@@ -1243,3 +1243,6 @@ a ...@@ -1243,3 +1243,6 @@ a
3 3
4 4
drop table t1; drop table t1;
CREATE TABLE t1 (`id 1` INT NOT NULL, PRIMARY KEY (`id 1`)) TYPE=INNODB;
CREATE TABLE t2 (id INT PRIMARY KEY, t1_id INT, INDEX par_ind (t1_id), FOREIGN KEY (`t1_id`) REFERENCES `t1`(`id 1`) ON DELETE CASCADE ) TYPE=INNODB;
drop table t1,t2;
...@@ -869,3 +869,12 @@ truncate table t1; ...@@ -869,3 +869,12 @@ truncate table t1;
insert into t1 (a) values (NULL),(NULL); insert into t1 (a) values (NULL),(NULL);
SELECT * from t1; SELECT * from t1;
drop table t1; drop table t1;
#
# Test dictionary handling with spaceand quoting
#
CREATE TABLE t1 (`id 1` INT NOT NULL, PRIMARY KEY (`id 1`)) TYPE=INNODB;
CREATE TABLE t2 (id INT PRIMARY KEY, t1_id INT, INDEX par_ind (t1_id), FOREIGN KEY (`t1_id`) REFERENCES `t1`(`id 1`) ON DELETE CASCADE ) TYPE=INNODB;
#show create table t2;
drop table t1,t2;
...@@ -538,7 +538,6 @@ select_export::~select_export() ...@@ -538,7 +538,6 @@ select_export::~select_export()
int int
select_export::prepare(List<Item> &list) select_export::prepare(List<Item> &list)
{ {
char path[FN_REFLEN];
uint option=4; uint option=4;
bool blob_flag=0; bool blob_flag=0;
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS #ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
...@@ -739,9 +738,13 @@ bool select_export::send_data(List<Item> &items) ...@@ -739,9 +738,13 @@ bool select_export::send_data(List<Item> &items)
void select_export::send_error(uint errcode,const char *err) void select_export::send_error(uint errcode,const char *err)
{ {
::send_error(&thd->net,errcode,err); ::send_error(&thd->net,errcode,err);
(void) end_io_cache(&cache); if (file > 0)
(void) my_close(file,MYF(0)); {
file= -1; (void) end_io_cache(&cache);
(void) my_close(file,MYF(0));
(void) my_delete(path,MYF(0)); // Delete file on error
file= -1;
}
} }
...@@ -849,10 +852,13 @@ bool select_dump::send_data(List<Item> &items) ...@@ -849,10 +852,13 @@ bool select_dump::send_data(List<Item> &items)
void select_dump::send_error(uint errcode,const char *err) void select_dump::send_error(uint errcode,const char *err)
{ {
::send_error(&thd->net,errcode,err); ::send_error(&thd->net,errcode,err);
(void) end_io_cache(&cache); if (file > 0)
(void) my_close(file,MYF(0)); {
(void) my_delete(path,MYF(0)); // Delete file on error (void) end_io_cache(&cache);
file= -1; (void) my_close(file,MYF(0));
(void) my_delete(path,MYF(0)); // Delete file on error
file= -1;
}
} }
......
...@@ -665,11 +665,13 @@ class select_export :public select_result { ...@@ -665,11 +665,13 @@ class select_export :public select_result {
File file; File file;
IO_CACHE cache; IO_CACHE cache;
ha_rows row_count; ha_rows row_count;
char path[FN_REFLEN];
uint field_term_length; uint field_term_length;
int field_sep_char,escape_char,line_sep_char; int field_sep_char,escape_char,line_sep_char;
bool fixed_row_size; bool fixed_row_size;
public: public:
select_export(sql_exchange *ex) :exchange(ex),file(-1),row_count(0L) {} select_export(sql_exchange *ex) :exchange(ex),file(-1),row_count(0L)
{ path[0]=0; }
~select_export(); ~select_export();
int prepare(List<Item> &list); int prepare(List<Item> &list);
bool send_fields(List<Item> &list, bool send_fields(List<Item> &list,
......
...@@ -1912,7 +1912,8 @@ find_best(JOIN *join,table_map rest_tables,uint idx,double record_count, ...@@ -1912,7 +1912,8 @@ find_best(JOIN *join,table_map rest_tables,uint idx,double record_count,
read_time+=record_count/(double) TIME_FOR_COMPARE; read_time+=record_count/(double) TIME_FOR_COMPARE;
if (join->sort_by_table && if (join->sort_by_table &&
join->sort_by_table != join->positions[join->const_tables].table->table) join->sort_by_table !=
join->positions[join->const_tables].table->table)
read_time+=record_count; // We have to make a temp table read_time+=record_count; // We have to make a temp table
if (read_time < join->best_read) if (read_time < join->best_read)
{ {
...@@ -1946,7 +1947,7 @@ find_best(JOIN *join,table_map rest_tables,uint idx,double record_count, ...@@ -1946,7 +1947,7 @@ find_best(JOIN *join,table_map rest_tables,uint idx,double record_count,
uint max_key_part=0; uint max_key_part=0;
/* Test how we can use keys */ /* Test how we can use keys */
rec= s->records/MATCHING_ROWS_IN_OTHER_TABLE; /* Assumed records/key */ rec= s->records/MATCHING_ROWS_IN_OTHER_TABLE; // Assumed records/key
for (keyuse=s->keyuse ; keyuse->table == table ;) for (keyuse=s->keyuse ; keyuse->table == table ;)
{ {
key_map found_part=0; key_map found_part=0;
...@@ -2085,7 +2086,7 @@ find_best(JOIN *join,table_map rest_tables,uint idx,double record_count, ...@@ -2085,7 +2086,7 @@ find_best(JOIN *join,table_map rest_tables,uint idx,double record_count,
will match will match
*/ */
if (table->quick_keys & ((key_map) 1 << key) && if (table->quick_keys & ((key_map) 1 << key) &&
table->quick_key_parts[key] <= max_key_part) table->quick_key_parts[key] == max_key_part)
tmp=records= (double) table->quick_rows[key]; tmp=records= (double) table->quick_rows[key];
else else
{ {
...@@ -2127,6 +2128,14 @@ find_best(JOIN *join,table_map rest_tables,uint idx,double record_count, ...@@ -2127,6 +2128,14 @@ find_best(JOIN *join,table_map rest_tables,uint idx,double record_count,
} }
records=(ulong) tmp; records=(ulong) tmp;
} }
/*
If quick_select was used on a part of this key, we know
the maximum number of rows that the key can match.
*/
if (table->quick_keys & ((key_map) 1 << key) &&
table->quick_key_parts[key] <= max_key_part &&
records > (double) table->quick_rows[key])
tmp= records= (double) table->quick_rows[key];
} }
/* Limit the number of matched rows */ /* Limit the number of matched rows */
set_if_smaller(tmp, (double) thd->variables.max_seeks_for_key); set_if_smaller(tmp, (double) thd->variables.max_seeks_for_key);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment