Commit 9695974e authored by Alexander Barkov's avatar Alexander Barkov

MDEV-33019 The database part is not case sensitive in SP names

Problem:

sp_cache erroneously looked up fully qualified SP names (e.g. `DB`.`SP`),
in case insensitive style. It was wrong, because only the "name"
part is always case insensitive, while the "db" part should be compared
according to lower_case_table_names (case sensitively for 0,
case insensitively for 1 and 2).

Fix:

Adding a "casedn_name" parameter make_qname() to tell
if the name part should be lower cased:
  `DB1`.`SP` -> "DB1.SP"  (when casedn_name=false)
  `DB1`.`SP` -> "DB1.sp"  (when casedn_name=true)
and using make_qname() with casedn_name=true when creating
sp_cache hash lookup keys.

Details:

As a result, it now works as follows:
- sp_head::m_db is converted to lower case if lower_case_table_names>0
  during the sp_name initialization phase. So when make_qname() is called,
  sp_head::m_db is already normalized. There are no changes in here.

- The initialization phase of sp_head when creating sp_head::m_qname
  now calls make_qname() with casedn_name=true,
  so sp_head::m_name gets written to sp_head::m_qname in lower case.

- sp_cache_lookup() now also calls make_qname() with casedn_name=true,
  so sp_head::m_name gets written to the temporary lookup key in lower case.

- sp_cache::m_hashtable now uses case sensitive comparison
parent 916caac2
......@@ -11,3 +11,39 @@ Database Create Database
mysql_TEST CREATE DATABASE `mysql_TEST` /*!40100 DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci */
DROP DATABASE mysql_test;
DROP DATABASE mysql_TEST;
#
# Start of 10.4 tests
#
#
# MDEV-33019 The database part is not case sensitive in SP names
#
CREATE DATABASE DB1;
CREATE DATABASE db1;
CREATE PROCEDURE DB1.sp() SELECT 'This is DB1.sp' AS ret;
CREATE PROCEDURE db1.sp() SELECT 'This is db1.sp' AS ret;
CALL DB1.sp();
ret
This is DB1.sp
CALL db1.sp();
ret
This is db1.sp
DROP DATABASE DB1;
CALL DB1.sp();
ERROR 42000: PROCEDURE DB1.sp does not exist
CALL db1.sp();
ret
This is db1.sp
DROP DATABASE db1;
CREATE PROCEDURE SP() SELECT 'This is SP' AS ret;
CREATE PROCEDURE sp() SELECT 'This is sp' AS ret;
ERROR 42000: PROCEDURE sp already exists
CALL SP();
ret
This is SP
CALL sp();
ret
This is SP
DROP PROCEDURE SP;
#
# End of 10.4 tests
#
......@@ -18,3 +18,34 @@ DROP DATABASE mysql_test;
DROP DATABASE mysql_TEST;
# End of 10.0 tests
--echo #
--echo # Start of 10.4 tests
--echo #
--echo #
--echo # MDEV-33019 The database part is not case sensitive in SP names
--echo #
CREATE DATABASE DB1;
CREATE DATABASE db1;
CREATE PROCEDURE DB1.sp() SELECT 'This is DB1.sp' AS ret;
CREATE PROCEDURE db1.sp() SELECT 'This is db1.sp' AS ret;
CALL DB1.sp();
CALL db1.sp();
DROP DATABASE DB1;
--error ER_SP_DOES_NOT_EXIST
CALL DB1.sp();
CALL db1.sp();
DROP DATABASE db1;
CREATE PROCEDURE SP() SELECT 'This is SP' AS ret;
--error ER_SP_ALREADY_EXISTS
CREATE PROCEDURE sp() SELECT 'This is sp' AS ret;
CALL SP();
CALL sp();
DROP PROCEDURE SP;
--echo #
--echo # End of 10.4 tests
--echo #
......@@ -195,7 +195,7 @@ sp_head *sp_cache_lookup(sp_cache **cp, const Database_qualified_name *name)
sp_cache *c= *cp;
if (! c)
return NULL;
return c->lookup(buf, name->make_qname(buf, sizeof(buf)));
return c->lookup(buf, name->make_qname(buf, sizeof(buf), true));
}
......@@ -302,7 +302,7 @@ sp_cache::~sp_cache()
void
sp_cache::init()
{
my_hash_init(&m_hashtable, system_charset_info, 0, 0, 0,
my_hash_init(&m_hashtable, &my_charset_bin, 0, 0, 0,
hash_get_key_for_sp_head, hash_free_sp_head, 0);
}
......
......@@ -7080,15 +7080,18 @@ class Identifier_chain2
}
// Export as a qualified name string: 'db.name'
size_t make_qname(char *dst, size_t dstlen) const
size_t make_qname(char *dst, size_t dstlen, bool casedn_part1) const
{
return my_snprintf(dst, dstlen, "%.*s.%.*s",
size_t res= my_snprintf(dst, dstlen, "%.*s.%.*s",
(int) m_name[0].length, m_name[0].str,
(int) m_name[1].length, m_name[1].str);
if (casedn_part1 && dstlen > m_name[0].length)
my_casedn_str(system_charset_info, dst + m_name[0].length + 1);
return res;
}
// Export as a qualified name string, allocate on mem_root.
LEX_CSTRING make_qname(MEM_ROOT *mem_root) const
LEX_CSTRING make_qname(MEM_ROOT *mem_root, bool casedn_part1) const
{
LEX_STRING dst;
/* format: [pkg + dot] + name + '\0' */
......@@ -7097,11 +7100,12 @@ class Identifier_chain2
return {NULL, 0};
if (!m_name[0].length)
{
DBUG_ASSERT(!casedn_part1); // Should not be called this way
dst.length= my_snprintf(dst.str, dst_size, "%.*s",
(int) m_name[1].length, m_name[1].str);
return {dst.str, dst.length};
}
dst.length= make_qname(dst.str, dst_size);
dst.length= make_qname(dst.str, dst_size, casedn_part1);
return {dst.str, dst.length};
}
};
......@@ -7150,15 +7154,15 @@ class Database_qualified_name
const LEX_CSTRING &name);
// Export db and name as a qualified name string: 'db.name'
size_t make_qname(char *dst, size_t dstlen) const
size_t make_qname(char *dst, size_t dstlen, bool casedn_name) const
{
return Identifier_chain2(m_db, m_name).make_qname(dst, dstlen);
return Identifier_chain2(m_db, m_name).make_qname(dst, dstlen, casedn_name);
}
// Export db and name as a qualified name string, allocate on mem_root.
LEX_CSTRING make_qname(MEM_ROOT *mem_root) const
LEX_CSTRING make_qname(MEM_ROOT *mem_root, bool casedn_name) const
{
DBUG_SLOW_ASSERT(ok_for_lower_case_names(m_db.str));
return Identifier_chain2(m_db, m_name).make_qname(mem_root);
return Identifier_chain2(m_db, m_name).make_qname(mem_root, casedn_name);
}
bool make_package_routine_name(MEM_ROOT *mem_root,
......@@ -7169,7 +7173,8 @@ class Database_qualified_name
size_t length= package.length + 1 + routine.length + 1;
if (unlikely(!(tmp= (char *) alloc_root(mem_root, length))))
return true;
m_name.length= Identifier_chain2(package, routine).make_qname(tmp, length);
m_name.length= Identifier_chain2(package, routine).make_qname(tmp, length,
false);
m_name.str= tmp;
return false;
}
......@@ -7198,7 +7203,7 @@ class ErrConvDQName: public ErrConv
{ }
const char *ptr() const
{
m_name->make_qname(err_buffer, sizeof(err_buffer));
m_name->make_qname(err_buffer, sizeof(err_buffer), false);
return err_buffer;
}
};
......
......@@ -6754,7 +6754,7 @@ sp_head *LEX::make_sp_head(THD *thd, const sp_name *name,
name->m_name);
else
sp->init_sp_name(name);
if (!(sp->m_qname= sp->make_qname(sp->get_main_mem_root())).str)
if (!(sp->m_qname= sp->make_qname(sp->get_main_mem_root(), true)).str)
return NULL;
}
sphead= sp;
......@@ -8633,7 +8633,7 @@ bool LEX::call_statement_start(THD *thd, const LEX_CSTRING &db,
// Concat `pkg` and `name` to `pkg.name`
LEX_CSTRING pkg_dot_proc;
if (!(pkg_dot_proc= q_pkg_proc.make_qname(thd->mem_root)).str ||
if (!(pkg_dot_proc= q_pkg_proc.make_qname(thd->mem_root, false)).str ||
check_ident_length(&pkg_dot_proc) ||
!(spname= new (thd->mem_root) sp_name(&db, &pkg_dot_proc, true)))
return true;
......@@ -8699,7 +8699,7 @@ sp_package *LEX::create_package_start(THD *thd,
return NULL;
pkg->reset_thd_mem_root(thd);
pkg->init(this);
if (!(pkg->m_qname= pkg->make_qname(pkg->get_main_mem_root())).str)
if (!(pkg->m_qname= pkg->make_qname(pkg->get_main_mem_root(), true)).str)
return NULL;
sphead= pkg;
return pkg;
......@@ -9045,7 +9045,7 @@ Item *LEX::make_item_func_call_generic(THD *thd,
// Concat `pkg` and `name` to `pkg.name`
LEX_CSTRING pkg_dot_func;
if (!(pkg_dot_func= q_pkg_func.make_qname(thd->mem_root)).str ||
if (!(pkg_dot_func= q_pkg_func.make_qname(thd->mem_root, false)).str ||
check_ident_length(&pkg_dot_func) ||
!(qname= new (thd->mem_root) sp_name(&db, &pkg_dot_func, true)))
return NULL;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment