Commit d12a8413 authored by unknown's avatar unknown

No semantical change. Move checks of compatibility

of requested lock type and requested table operation from 
mysql_insert into a separate function.


sql/sql_insert.cc:
  Cleanup: move a number of locking related checks from mysql_insert 
  to a separate function.
  Add comments.
parent d8658088
...@@ -401,6 +401,79 @@ void mark_fields_used_by_triggers_for_insert_stmt(THD *thd, TABLE *table, ...@@ -401,6 +401,79 @@ void mark_fields_used_by_triggers_for_insert_stmt(THD *thd, TABLE *table,
} }
/**
Upgrade table-level lock of INSERT statement to TL_WRITE if
a more concurrent lock is infeasible for some reason. This is
necessary for engines without internal locking support (MyISAM).
An engine with internal locking implementation might later
downgrade the lock in handler::store_lock() method.
*/
void upgrade_lock_type(THD *thd, thr_lock_type *lock_type,
enum_duplicates duplic,
bool is_multi_insert)
{
if (duplic == DUP_UPDATE ||
duplic == DUP_REPLACE && *lock_type == TL_WRITE_CONCURRENT_INSERT)
{
*lock_type= TL_WRITE;
return;
}
if (*lock_type == TL_WRITE_DELAYED)
{
#ifdef EMBEDDED_LIBRARY
/* No auxiliary threads in the embedded server. */
*lock_type= TL_WRITE;
return;
#else
/*
We do not use delayed threads if:
- we're running in the safe mode or skip-new - the feature
is disabled in these modes
- we're running this query in statement level replication,
on a replication slave - because we must ensure serial
execution of queries on the slave
- it is INSERT .. ON DUPLICATE KEY UPDATE - in this case the
insert cannot be concurrent
*/
if (specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE) ||
thd->slave_thread ||
thd->variables.max_insert_delayed_threads == 0)
{
*lock_type= TL_WRITE;
return;
}
#endif
bool log_on= (thd->options & OPTION_BIN_LOG ||
! (thd->security_ctx->master_access & SUPER_ACL));
if (log_on && mysql_bin_log.is_open() && is_multi_insert)
{
/*
Statement-based binary logging does not work in this case, because:
a) two concurrent statements may have their rows intermixed in the
queue, leading to autoincrement replication problems on slave (because
the values generated used for one statement don't depend only on the
value generated for the first row of this statement, so are not
replicable)
b) if first row of the statement has an error the full statement is
not binlogged, while next rows of the statement may be inserted.
c) if first row succeeds, statement is binlogged immediately with a
zero error code (i.e. "no error"), if then second row fails, query
will fail on slave too and slave will stop (wrongly believing that the
master got no error).
So we fall back to non-delayed INSERT.
*/
*lock_type= TL_WRITE;
}
}
}
/**
INSERT statement implementation
*/
bool mysql_insert(THD *thd,TABLE_LIST *table_list, bool mysql_insert(THD *thd,TABLE_LIST *table_list,
List<Item> &fields, List<Item> &fields,
List<List_item> &values_list, List<List_item> &values_list,
...@@ -436,58 +509,31 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, ...@@ -436,58 +509,31 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
DBUG_ENTER("mysql_insert"); DBUG_ENTER("mysql_insert");
/* /*
in safe mode or with skip-new change delayed insert to be regular Upgrade lock type if the requested lock is incompatible with
if we are told to replace duplicates, the insert cannot be concurrent the current connection mode or table operation.
delayed insert changed to regular in slave thread
*/ */
#ifdef EMBEDDED_LIBRARY upgrade_lock_type(thd, &table_list->lock_type, duplic,
if (lock_type == TL_WRITE_DELAYED) values_list.elements > 1);
lock_type=TL_WRITE; lock_type= table_list->lock_type;
#else
if ((lock_type == TL_WRITE_DELAYED &&
((specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE)) ||
thd->slave_thread || !thd->variables.max_insert_delayed_threads)) ||
(lock_type == TL_WRITE_CONCURRENT_INSERT && duplic == DUP_REPLACE) ||
(duplic == DUP_UPDATE))
lock_type=TL_WRITE;
#endif
if ((lock_type == TL_WRITE_DELAYED) &&
log_on && mysql_bin_log.is_open() &&
(values_list.elements > 1))
{
/* /*
Statement-based binary logging does not work in this case, because: We can't write-delayed into a table locked with LOCK TABLES:
a) two concurrent statements may have their rows intermixed in the this will lead to a deadlock, since the delayed thread will
queue, leading to autoincrement replication problems on slave (because never be able to get a lock on the table. QQQ: why not
the values generated used for one statement don't depend only on the upgrade the lock here instead?
value generated for the first row of this statement, so are not
replicable)
b) if first row of the statement has an error the full statement is
not binlogged, while next rows of the statement may be inserted.
c) if first row succeeds, statement is binlogged immediately with a
zero error code (i.e. "no error"), if then second row fails, query
will fail on slave too and slave will stop (wrongly believing that the
master got no error).
So we fallback to non-delayed INSERT.
*/ */
lock_type= TL_WRITE; if (lock_type == TL_WRITE_DELAYED && thd->locked_tables &&
find_locked_table(thd, table_list->db, table_list->table_name))
{
my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0),
table_list->table_name);
DBUG_RETURN(TRUE);
} }
table_list->lock_type= lock_type;
#ifndef EMBEDDED_LIBRARY #ifndef EMBEDDED_LIBRARY
if (lock_type == TL_WRITE_DELAYED) if (lock_type == TL_WRITE_DELAYED)
{ {
res= 1; res= 1;
if (thd->locked_tables)
{
DBUG_ASSERT(table_list->db); /* Must be set in the parser */
if (find_locked_table(thd, table_list->db, table_list->table_name))
{
my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0),
table_list->table_name);
DBUG_RETURN(TRUE);
}
}
if ((table= delayed_get_table(thd,table_list)) && !thd->is_fatal_error) if ((table= delayed_get_table(thd,table_list)) && !thd->is_fatal_error)
{ {
/* /*
...@@ -1460,6 +1506,12 @@ class delayed_row :public ilink { ...@@ -1460,6 +1506,12 @@ class delayed_row :public ilink {
} }
}; };
/**
delayed_insert - context of a thread responsible for delayed insert
into one table. When processing delayed inserts, we create an own
thread for every distinct table. Later on all delayed inserts directed
into that table are handled by a dedicated thread.
*/
class delayed_insert :public ilink { class delayed_insert :public ilink {
uint locks_in_memory; uint locks_in_memory;
...@@ -1551,6 +1603,12 @@ class delayed_insert :public ilink { ...@@ -1551,6 +1603,12 @@ class delayed_insert :public ilink {
I_List<delayed_insert> delayed_threads; I_List<delayed_insert> delayed_threads;
/**
Return an instance of delayed insert thread that can handle
inserts into a given table, if it exists. Otherwise return NULL.
*/
static
delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list) delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list)
{ {
thd->proc_info="waiting for delay_list"; thd->proc_info="waiting for delay_list";
...@@ -1571,6 +1629,18 @@ delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list) ...@@ -1571,6 +1629,18 @@ delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list)
} }
/**
Attempt to find or create a delayed insert thread to handle inserts
into this table.
@return Return an instance of the table in the delayed thread
@retval NULL too many delayed threads OR
this thread ran out of resources OR
a newly created delayed insert thread ran out of resources OR
the delayed insert thread failed to open the table.
In the last three cases an error is set in THD.
*/
static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list) static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list)
{ {
int error; int error;
...@@ -1679,11 +1749,17 @@ static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list) ...@@ -1679,11 +1749,17 @@ static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list)
} }
/* /**
As we can't let many threads modify the same TABLE structure, we create As we can't let many client threads modify the same TABLE
an own structure for each tread. This includes a row buffer to save the structure of the dedicated delayed insert thread, we create an
column values and new fields that points to the new row buffer. own structure for each client thread. This includes a row
The memory is allocated in the client thread and is freed automaticly. buffer to save the column values and new fields that point to
the new row buffer. The memory is allocated in the client
thread and is freed automatically.
@pre This function is called from the client thread. Delayed
insert thread mutex must be acquired before invoking this
function.
*/ */
TABLE *delayed_insert::get_local_table(THD* client_thd) TABLE *delayed_insert::get_local_table(THD* client_thd)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment