Commit d6effde5 authored by jan@hundin.mysql.fi's avatar jan@hundin.mysql.fi

Added support for a LOCK TABLES...WHERE ENGINE = InnoDB query which sets

transactional table locks to tables mentioned in the query. These locks
are released at the end of the transaction automatically.
This is fix for bugs #5655, #5998 and issue #3762.
parent 835cf0f1
......@@ -585,6 +585,8 @@ extern lock_sys_t* lock_sys;
#define LOCK_TABLE 16 /* these type values should be so high that */
#define LOCK_REC 32 /* they can be ORed to the lock mode */
#define LOCK_TABLE_EXP 80 /* explicit table lock (80 = 16 + 64) */
#define LOCK_TABLE_TRANSACTIONAL 144
/* transactional table lock (144 = 16 + 128)*/
#define LOCK_TYPE_MASK 0xF0UL /* mask used to extract lock type from the
type_mode field in a lock */
/* Waiting lock flag */
......
......@@ -464,6 +464,10 @@ struct trx_struct{
ulint n_lock_table_exp;/* number of explicit table locks
(LOCK TABLES) reserved by the
transaction, stored in trx_locks */
ulint n_lock_table_transactional;
/* number of transactional table locks
(LOCK TABLES..WHERE ENGINE) reserved by
the transaction, stored in trx_locks */
UT_LIST_NODE_T(trx_t)
trx_list; /* list of transactions */
UT_LIST_NODE_T(trx_t)
......
......@@ -2207,7 +2207,8 @@ lock_grant(
release it at the end of the SQL statement */
lock->trx->auto_inc_lock = lock;
} else if (lock_get_type(lock) == LOCK_TABLE_EXP) {
} else if (lock_get_type(lock) == LOCK_TABLE_EXP ||
lock_get_type(lock) == LOCK_TABLE_TRANSACTIONAL) {
ut_a(lock_get_mode(lock) == LOCK_S
|| lock_get_mode(lock) == LOCK_X);
}
......@@ -3421,6 +3422,10 @@ lock_table_create(
lock->trx->n_lock_table_exp++;
}
if (lock_get_type(lock) == LOCK_TABLE_TRANSACTIONAL) {
lock->trx->n_lock_table_transactional++;
}
lock->un_member.tab_lock.table = table;
UT_LIST_ADD_LAST(un_member.tab_lock.locks, table->locks, lock);
......@@ -3458,7 +3463,11 @@ lock_table_remove_low(
}
if (lock_get_type(lock) == LOCK_TABLE_EXP) {
lock->trx->n_lock_table_exp--;
trx->n_lock_table_exp--;
}
if (lock_get_type(lock) == LOCK_TABLE_TRANSACTIONAL) {
trx->n_lock_table_transactional--;
}
UT_LIST_REMOVE(trx_locks, trx->trx_locks, lock);
......@@ -3592,7 +3601,8 @@ lock_table(
DB_DEADLOCK, or DB_QUE_THR_SUSPENDED */
ulint flags, /* in: if BTR_NO_LOCKING_FLAG bit is set,
does nothing;
if LOCK_TABLE_EXP bits are set,
if LOCK_TABLE_EXP|LOCK_TABLE_TRANSACTIONAL
bits are set,
creates an explicit table lock */
dict_table_t* table, /* in: database table in dictionary cache */
ulint mode, /* in: lock mode */
......@@ -3608,7 +3618,8 @@ lock_table(
return(DB_SUCCESS);
}
ut_a(flags == 0 || flags == LOCK_TABLE_EXP);
ut_a(flags == 0 || flags == LOCK_TABLE_EXP ||
flags == LOCK_TABLE_TRANSACTIONAL);
trx = thr_get_trx(thr);
......@@ -3722,7 +3733,8 @@ lock_table_dequeue(
ut_ad(mutex_own(&kernel_mutex));
#endif /* UNIV_SYNC_DEBUG */
ut_a(lock_get_type(in_lock) == LOCK_TABLE ||
lock_get_type(in_lock) == LOCK_TABLE_EXP);
lock_get_type(in_lock) == LOCK_TABLE_EXP ||
lock_get_type(in_lock) == LOCK_TABLE_TRANSACTIONAL);
lock = UT_LIST_GET_NEXT(un_member.tab_lock.locks, in_lock);
......@@ -3826,7 +3838,9 @@ lock_release_off_kernel(
}
lock_table_dequeue(lock);
if (lock_get_type(lock) == LOCK_TABLE_EXP) {
if (lock_get_type(lock) == LOCK_TABLE_EXP ||
lock_get_type(lock) == LOCK_TABLE_TRANSACTIONAL) {
ut_a(lock_get_mode(lock) == LOCK_S
|| lock_get_mode(lock) == LOCK_X);
}
......@@ -3850,6 +3864,7 @@ lock_release_off_kernel(
ut_a(trx->auto_inc_lock == NULL);
ut_a(trx->n_lock_table_exp == 0);
ut_a(trx->n_lock_table_transactional == 0);
}
/*************************************************************************
......@@ -3915,6 +3930,7 @@ lock_release_tables_off_kernel(
}
ut_a(trx->n_lock_table_exp == 0);
ut_a(trx->n_lock_table_transactional == 0);
}
/*************************************************************************
......@@ -4028,11 +4044,15 @@ lock_table_print(
ut_ad(mutex_own(&kernel_mutex));
#endif /* UNIV_SYNC_DEBUG */
ut_a(lock_get_type(lock) == LOCK_TABLE ||
lock_get_type(lock) == LOCK_TABLE_EXP);
lock_get_type(lock) == LOCK_TABLE_EXP ||
lock_get_type(lock) == LOCK_TABLE_TRANSACTIONAL);
if (lock_get_type(lock) == LOCK_TABLE_EXP) {
fputs("EXPLICIT ", file);
} else if (lock_get_type(lock) == LOCK_TABLE_TRANSACTIONAL) {
fputs("TRANSACTIONAL ", file);
}
fputs("TABLE LOCK table ", file);
ut_print_name(file, lock->trx, lock->un_member.tab_lock.table->name);
fprintf(file, " trx id %lu %lu",
......
......@@ -784,7 +784,7 @@ row_lock_table_for_mysql(
table handle */
dict_table_t* table, /* in: table to lock, or NULL
if prebuilt->table should be
locked as LOCK_TABLE_EXP |
locked or a
prebuilt->select_lock_type */
ulint mode) /* in: lock mode of table */
{
......@@ -821,10 +821,16 @@ run_again:
if (table) {
err = lock_table(0, table, mode, thr);
} else {
if (mode == LOCK_TABLE_TRANSACTIONAL) {
err = lock_table(LOCK_TABLE_TRANSACTIONAL,
prebuilt->table,
prebuilt->select_lock_type, thr);
} else {
err = lock_table(LOCK_TABLE_EXP, prebuilt->table,
prebuilt->select_lock_type, thr);
}
}
trx->error_state = err;
......
......@@ -335,8 +335,10 @@ undo log. If the transaction was not yet committed, then we roll it back.
Note: this is done in a background thread */
void *
trx_rollback_or_clean_all_without_sess(void *i)
/*========================================*/
trx_rollback_or_clean_all_without_sess(
/*===================================*/
/* out: arguments */
void *i) /* in: arguments (unused) */
{
mem_heap_t* heap;
que_fork_t* fork;
......@@ -496,6 +498,7 @@ loop:
goto loop;
os_thread_exit(i); /* not reached */
return(i);
}
/***********************************************************************
......
......@@ -153,6 +153,7 @@ trx_create(
trx->auto_inc_lock = NULL;
trx->n_lock_table_exp = 0;
trx->n_lock_table_transactional = 0;
trx->read_view_heap = mem_heap_create(256);
trx->read_view = NULL;
......@@ -285,6 +286,7 @@ trx_free(
ut_a(!trx->has_search_latch);
ut_a(!trx->auto_inc_lock);
ut_a(!trx->n_lock_table_exp);
ut_a(!trx->n_lock_table_transactional);
ut_a(trx->dict_operation_lock_mode == 0);
......@@ -1645,12 +1647,17 @@ trx_print(
putc('\n', f);
if (trx->n_mysql_tables_in_use > 0 || trx->mysql_n_tables_locked > 0) {
fprintf(f, "mysql tables in use %lu, locked %lu\n",
(ulong) trx->n_mysql_tables_in_use,
(ulong) trx->mysql_n_tables_locked);
}
if (trx->n_lock_table_transactional > 0 || trx->n_lock_table_exp > 0) {
fprintf(f, "mysql explicit table locks %lu, transactional table locks %lu\n",
(ulong) trx->n_lock_table_exp,
(ulong) trx->n_lock_table_transactional);
}
newline = TRUE;
switch (trx->que_state) {
......
......@@ -1327,7 +1327,8 @@ innobase_commit(
3. innobase_query_caching_of_table_permitted(),
4. innobase_savepoint(),
5. ::init_table_handle_for_HANDLER(),
6. innobase_start_trx_and_assign_read_view()
6. innobase_start_trx_and_assign_read_view(),
7. ::transactional_table_lock()
and it is only set to 0 in a commit or a rollback. If it is 0 we know
there cannot be resources to be freed and we could return immediately.
......@@ -5095,8 +5096,9 @@ ha_innobase::start_stmt(
select_lock_type value. The value of
stored_select_lock_type was decided in:
1) ::store_lock(),
2) ::external_lock(), and
3) ::init_table_handle_for_HANDLER(). */
2) ::external_lock(),
3) ::init_table_handle_for_HANDLER(), and
4) :.transactional_table_lock(). */
prebuilt->select_lock_type =
prebuilt->stored_select_lock_type;
......@@ -5287,6 +5289,94 @@ ha_innobase::external_lock(
DBUG_RETURN(0);
}
/**********************************************************************
With this function MySQL request a transactional lock to a table when
user issued query LOCK TABLES..WHERE ENGINE = InnoDB. */
int
ha_innobase::transactional_table_lock(
/*==================================*/
/* out: 0 */
THD* thd, /* in: handle to the user thread */
int lock_type) /* in: lock type */
{
row_prebuilt_t* prebuilt = (row_prebuilt_t*) innobase_prebuilt;
trx_t* trx;
DBUG_ENTER("ha_innobase::transactional_table_lock");
DBUG_PRINT("enter",("lock_type: %d", lock_type));
/* We do not know if MySQL can call this function before calling
external_lock(). To be safe, update the thd of the current table
handle. */
update_thd(thd);
if (prebuilt->table->ibd_file_missing && !current_thd->tablespace_op) {
ut_print_timestamp(stderr);
fprintf(stderr, " InnoDB error:\n"
"MySQL is trying to use a table handle but the .ibd file for\n"
"table %s does not exist.\n"
"Have you deleted the .ibd file from the database directory under\n"
"the MySQL datadir, or have you used DISCARD TABLESPACE?\n"
"Look from section 15.1 of http://www.innodb.com/ibman.html\n"
"how you can resolve the problem.\n",
prebuilt->table->name);
DBUG_RETURN(HA_ERR_CRASHED);
}
trx = prebuilt->trx;
prebuilt->sql_stat_start = TRUE;
prebuilt->hint_need_to_fetch_extra_cols = 0;
prebuilt->read_just_key = 0;
prebuilt->keep_other_fields_on_keyread = FALSE;
if (lock_type == F_WRLCK) {
prebuilt->select_lock_type = LOCK_X;
prebuilt->stored_select_lock_type = LOCK_X;
} else if (lock_type == F_RDLCK) {
prebuilt->select_lock_type = LOCK_X;
prebuilt->stored_select_lock_type = LOCK_X;
} else {
ut_print_timestamp(stderr);
fprintf(stderr, " InnoDB error:\n"
"MySQL is trying to set transactional table lock with corrupted lock type\n"
"to table %s, lock type %d does not exist.\n",
prebuilt->table->name, lock_type);
DBUG_RETURN(HA_ERR_CRASHED);
}
/* MySQL is setting a new transactional table lock */
/* Set the MySQL flag to mark that there is an active transaction */
thd->transaction.all.innodb_active_trans = 1;
if (thd->in_lock_tables && thd->variables.innodb_table_locks) {
ulint error = DB_SUCCESS;
error = row_lock_table_for_mysql(prebuilt,NULL,
LOCK_TABLE_TRANSACTIONAL);
if (error != DB_SUCCESS) {
error = convert_error_code_to_mysql(error, user_thd);
DBUG_RETURN(error);
}
if (thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) {
/* Store the current undo_no of the transaction
so that we know where to roll back if we have
to roll back the next SQL statement */
trx_mark_sql_stat_end(trx);
}
}
DBUG_RETURN(0);
}
/****************************************************************************
Here we export InnoDB status variables to MySQL. */
......
......@@ -149,6 +149,7 @@ class ha_innobase: public handler
int discard_or_import_tablespace(my_bool discard);
int extra(enum ha_extra_function operation);
int external_lock(THD *thd, int lock_type);
int transactional_table_lock(THD *thd, int lock_type);
int start_stmt(THD *thd);
void position(byte *record);
......
......@@ -450,6 +450,7 @@ public:
{ return extra(operation); }
virtual int reset() { return extra(HA_EXTRA_RESET); }
virtual int external_lock(THD *thd, int lock_type)=0;
virtual int transactional_table_lock(THD *thd, int lock_type) {return 0;}
virtual void unlock_row() {}
virtual int start_stmt(THD *thd) {return 0;}
/*
......
......@@ -695,6 +695,9 @@ static void print_lock_error(int error)
case HA_ERR_READ_ONLY_TRANSACTION:
textno=ER_READ_ONLY_TRANSACTION;
break;
case HA_ERR_LOCK_DEADLOCK:
textno=ER_LOCK_DEADLOCK;
break;
default:
textno=ER_CANT_LOCK;
break;
......@@ -870,3 +873,62 @@ bool make_global_read_lock_block_commit(THD *thd)
thd->exit_cond(old_message);
DBUG_RETURN(error);
}
/*
Take transactional table lock for all tables in the list
SYNOPSIS
transactional_lock_tables
thd Thread THD
tables list of tables
counter number of tables in the list
NOTES
RETURN
0 - OK
-1 - error
*/
int transactional_lock_tables(THD *thd, TABLE_LIST *tables, uint counter)
{
uint i;
int lock_type,error=0;
TABLE_LIST *table;
TABLE **start,**ptr;
DBUG_ENTER("transactional_lock_tables");
if (!(ptr=start=(TABLE**) sql_alloc(sizeof(TABLE*) * counter)))
return -1;
for (table= tables; table; table= table->next_global)
{
if (!table->placeholder() && !table->schema_table)
*(ptr++)= table->table;
}
for (i=1 ; i <= counter ; i++, start++)
{
DBUG_ASSERT((*start)->reginfo.lock_type >= TL_READ);
lock_type=F_WRLCK; /* Lock exclusive */
if ((*start)->db_stat & HA_READ_ONLY ||
((*start)->reginfo.lock_type >= TL_READ &&
(*start)->reginfo.lock_type <= TL_READ_NO_INSERT))
lock_type=F_RDLCK;
if ((error=(*start)->file->transactional_table_lock(thd, lock_type)))
{
print_lock_error(error);
DBUG_RETURN(-1);
}
else
{
(*start)->db_stat &= ~ HA_BLOCK_LOCK;
(*start)->current_lock= lock_type;
}
}
DBUG_RETURN(0);
}
......@@ -827,6 +827,7 @@ int open_tables(THD *thd, TABLE_LIST *tables, uint *counter);
int simple_open_n_lock_tables(THD *thd,TABLE_LIST *tables);
bool open_and_lock_tables(THD *thd,TABLE_LIST *tables);
int lock_tables(THD *thd, TABLE_LIST *tables, uint counter);
int transactional_lock_tables(THD *thd, TABLE_LIST *tables, uint counter);
TABLE *open_temporary_table(THD *thd, const char *path, const char *db,
const char *table_name, bool link_in_list);
bool rm_temporary_table(enum db_type base, char *path);
......
......@@ -87,6 +87,7 @@ enum enum_sql_command {
SQLCOM_PREPARE, SQLCOM_EXECUTE, SQLCOM_DEALLOCATE_PREPARE,
SQLCOM_CREATE_VIEW, SQLCOM_DROP_VIEW,
SQLCOM_CREATE_TRIGGER, SQLCOM_DROP_TRIGGER,
SQLCOM_LOCK_TABLES_TRANSACTIONAL,
/* This should be the last !!! */
SQLCOM_END
};
......
......@@ -3252,6 +3252,27 @@ create_error:
thd->options&= ~(ulong) (OPTION_TABLE_LOCK);
thd->in_lock_tables=0;
break;
case SQLCOM_LOCK_TABLES_TRANSACTIONAL:
{
uint counter = 0;
if (check_db_used(thd, all_tables))
goto error;
if (check_table_access(thd, LOCK_TABLES_ACL | SELECT_ACL, all_tables, 0))
goto error;
thd->in_lock_tables=1;
thd->options|= OPTION_TABLE_LOCK;
if (open_tables(thd, all_tables, &counter) == 0 &&
transactional_lock_tables(thd, all_tables, counter) == 0)
send_ok(thd);
else
thd->options&= ~(ulong) (OPTION_TABLE_LOCK);
thd->in_lock_tables=0;
break;
}
case SQLCOM_CREATE_DB:
{
char *alias;
......
......@@ -7395,7 +7395,7 @@ lock:
{
Lex->sql_command=SQLCOM_LOCK_TABLES;
}
table_lock_list
table_lock_list lock_engine_opt
{}
;
......@@ -7422,6 +7422,15 @@ lock_option:
| READ_SYM LOCAL_SYM { $$= TL_READ; }
;
lock_engine_opt:
/* empty */
| WHERE
{
Lex->sql_command=SQLCOM_LOCK_TABLES_TRANSACTIONAL;
}
ENGINE_SYM opt_equal storage_engines
;
unlock:
UNLOCK_SYM table_or_tables { Lex->sql_command=SQLCOM_UNLOCK_TABLES; }
;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment