Commit f09388fa authored by Michael Widenius's avatar Michael Widenius

Fix for lp:711565 "Index Condition Pushdown can make a thread hold MyISAM...

Fix for lp:711565 "Index Condition Pushdown can make a thread hold MyISAM locks as well as be unKILLable for long time"
- In Maria/MyISAM: Release/re-acquire locks to give queries that wait on them a chance to make progress
- In Maria/MyISAM: Change from numeric constants to ICP_RES values.
- In Maria: Do check index condition in maria_rprev() (was lost in the merge/backport?)
- In Maria/MyISAM/XtraDB: Check if the query was killed, and return immediately if it was.

Added new storage engine error: HA_ERR_ABORTED_BY_USER, for handler to signal that it detected a kill of the query and aborted

Authors: Sergey Petrunia & Monty

include/my_base.h:
  Added HA_ERR_ABORTED_BY_USER, for handler to signal that it detected a kill of the query and aborted
include/my_handler.h:
  Added comment
mysql-test/r/myisam_icp.result:
  Updated test
mysql-test/t/myisam_icp.test:
  Drop used tables at start of test
  Added test case that can help with manual testing of killing index condition pushdown query.
mysys/my_handler_errors.h:
  Text for new storage engine error
sql/handler.cc:
  If engine got HA_ERR_ABORTED_BY_USER, send kill message.
sql/multi_range_read.cc:
  Return error code
storage/maria/ha_maria.cc:
  Added ma_killed_in_mariadb() to detect kill.
  Ensure that file->external_ref points to TABLE object.
storage/maria/ma_extra.c:
  Dummy test-if-killed for standalone
storage/maria/ma_key.c:
  If ma_check_index_cond() fails, set my_errno and info->cur_row.lastpos
storage/maria/ma_rkey.c:
  Release/re-acquire locks to give queries that wait on them a chance to make progress
  Check if the query was killed, and return immediately if it was
storage/maria/ma_rnext.c:
  Check if the query was killed, and return immediately if it was
  Added missing fast_ma_writeinfo(info)
storage/maria/ma_rnext_same.c:
  Check if the query was killed, and return immediately if it was
  Added missing fast_ma_writeinfo(info)
storage/maria/ma_rprev.c:
  Check if the query was killed, and return immediately if it was
  Added missing fast_ma_writeinfo(info) and ma_check_index_cond()
storage/maria/ma_search.c:
  Give error message if we find a wrong key
storage/maria/ma_static.c:
  Added pointer to test-if-killed function
storage/maria/maria_def.h:
  New prototypes
storage/myisam/ha_myisam.cc:
  Added mi_killed_in_mariadb()
  Ensure that file->external_ref points to TABLE object.
storage/myisam/mi_extra.c:
  Dummy test-if-killed for standalone
storage/myisam/mi_key.c:
  If ma_check_index_cond() fails, set my_errno and info->lastpos
storage/myisam/mi_rkey.c:
  Ensure that info->lastpos= HA_OFFSET_ERROR in case of error
  Release/re-acquire locks to give queries that wait on them a chance to make progress
  Check if the query was killed, and return immediately if it was
  Reorder code to do less things in case of error.
  Added missing fast_mi_writeinfo()
storage/myisam/mi_rnext.c:
  Check if the query was killed, and return immediately if it was
  Simplify old ICP code
  Added missing fast_ma_writeinfo(info)
storage/myisam/mi_rnext_same.c:
  Check if the query was killed, and return immediately if it was
  Added missing fast_mi_writeinfo(info)
storage/myisam/mi_rprev.c:
  Check if the query was killed, and return immediately if it was
  Simplify error handling of ICP
  Added missing fast_mi_writeinfo(info)
storage/myisam/mi_search.c:
  Give error message if we find a wrong key
storage/myisam/mi_static.c:
  Added pointer to test-if-killed function
storage/myisam/myisamdef.h:
  New prototypes
storage/xtradb/handler/ha_innodb.cc:
  Added DB_SEARCH_ABORTED_BY_USER and ha_innobase::is_thd_killed()
  Check if the query was killed, and return immediately if it was
storage/xtradb/handler/ha_innodb.h:
  Added prototype
storage/xtradb/include/db0err.h:
  Added DB_SEARCH_ABORTED_BY_USER
storage/xtradb/include/row0mysql.h:
  Added possible ICP errors
storage/xtradb/row/row0sel.c:
  Use ICP errors instead of constants.
  Detect if killed and return B_SEARCH_ABORTED_BY_USER
parent ff6a8deb
...@@ -445,7 +445,8 @@ enum ha_base_keytype { ...@@ -445,7 +445,8 @@ enum ha_base_keytype {
#define HA_ERR_WRONG_CRC 176 /* Wrong CRC on page */ #define HA_ERR_WRONG_CRC 176 /* Wrong CRC on page */
#define HA_ERR_ROW_NOT_VISIBLE 177 #define HA_ERR_ROW_NOT_VISIBLE 177
#define HA_ERR_TOO_MANY_CONCURRENT_TRXS 178 /*Too many active concurrent transactions */ #define HA_ERR_TOO_MANY_CONCURRENT_TRXS 178 /*Too many active concurrent transactions */
#define HA_ERR_LAST 178 /* Copy of last error nr */ #define HA_ERR_ABORTED_BY_USER 179
#define HA_ERR_LAST 179 /* Copy of last error nr */
/* Number of different errors */ /* Number of different errors */
#define HA_ERR_ERRORS (HA_ERR_LAST - HA_ERR_FIRST + 1) #define HA_ERR_ERRORS (HA_ERR_LAST - HA_ERR_FIRST + 1)
......
...@@ -135,6 +135,9 @@ extern void my_handler_error_unregister(void); ...@@ -135,6 +135,9 @@ extern void my_handler_error_unregister(void);
if we're scanning "t.key BETWEEN 10 AND 20" and got a if we're scanning "t.key BETWEEN 10 AND 20" and got a
"t.key=21" tuple (the engine should stop scanning and return "t.key=21" tuple (the engine should stop scanning and return
HA_ERR_END_OF_FILE right away). HA_ERR_END_OF_FILE right away).
-1= ICP_ERROR - Reserved for internal errors in engines. Should not be
returned by index_cond_func_xxx
*/ */
typedef enum icp_result { typedef enum icp_result {
......
...@@ -148,3 +148,4 @@ COUNT(*) ...@@ -148,3 +148,4 @@ COUNT(*)
12 12
DROP PROCEDURE insert_data; DROP PROCEDURE insert_data;
DROP TABLE t1, t2, t3; DROP TABLE t1, t2, t3;
drop table if exists t0, t1, t1i, t1m;
...@@ -4,3 +4,191 @@ ...@@ -4,3 +4,191 @@
--source include/icp_tests.inc --source include/icp_tests.inc
--disable_warnings
drop table if exists t0, t1, t1i, t1m;
--enable_warnings
#
# BUG#711565 Index Condition Pushdown can make a thread hold MyISAM locks as well as be unKILLable for long time
# This is not a ready mysqltest testcase but rather a set of queries that
# allow to check the bug[fix] manually. Making this testcase automatic is
# difficult because
# - big tables are involved
# - it's difficult to have automatic checks of whether the query could be
# KILLed that would reliably work on fast/slow buildslaves, etc.
--disable_parsing
create table t0 (a int);
insert into t0 values (0),(1),(2),(3),(4),(5),(6),(7),(8),(9);
create table t1 (
kp1 int, kp2 int,
filler char(100),
col int,
key(kp1, kp2)
);
set myisam_sort_buffer_size=32*1000*1000;
insert into t1
select
1000 + A.a + 10*B.a + 100*C.a + 1000*D.a + 10000 * F.a,
1,
'filler-data filler-data filler-data filler-data filler-data',
1
from
t0 A, t0 B, t0 C, t0 D, t0 E, t0 F, t0 G
;
insert into t1 values (990, 100, 'a record to test index_next checks',100);
update t1 set kp2=10 where kp1 between 20000+100 and 28000;
update t1 set kp1=20000 where kp1 between 20000 and 28000;
insert into t1 values (20000, 100, 'last-20K-record',1);
create table t1i like t1;
alter table t1i engine=innodb;
insert into t1i select * from t1;
create table t1m like t1;
alter table t1m engine=maria;
insert into t1m select * from t1;
#
# XtraDB has one check for all kinds of ranges.
#
explain
select * from t1i
where
kp1 < 8000 and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
kp2 +1 > 10;
#
# MyISAM, check range access + mi_rnext():
# (will return HA_ERR_END_OF_FILE)
explain
select * from t1
where
kp1 < 10000 and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
kp2 +1 > 10;
#
# MyISAM, check range access + mi_rkey():
# (will return HA_ERR_END_OF_FILE)
explain
select * from t1
where
kp1 >= 999 and kp1 < 10000 and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
kp2 +1 > 10;
#
# MyISAM, check mi_rnext_same:
#
explain
select * from t1
where
kp1 = 20000 and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
kp2 +1 < 10;
#
# MyISAM, check mi_rprev:
#
explain
select * from t1
where
kp1 = 20000 and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
kp2 +1 > 20
order by kp1, kp2 desc;
#
# Maria, check range access + maria_rkey():
#
explain
select * from t1m
where
kp1 >= 999 and kp1 < 10000 and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
kp2 +1 > 10;
#
# Maria, check range access + maria_rnext():
#
explain
select * from t1m
where
kp1 < 10000 and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
kp2 +1 > 10;
#
# Maria, check maria_rnext_same:
#
explain
select * from t1m
where
kp1 = 20000 and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
kp2 +1 > 100;
#
# Maria, check maria_rprev:
#
explain
select * from t1m
where
kp1 = 20000 and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
concat(repeat('foo-bar-', 100000), kp2) like '%bar-1%' and
kp2 +1 > 20
order by kp1, kp2 desc;
drop table t0, t1, t1i, t1m;
--enable_parsing
...@@ -64,6 +64,6 @@ static const char *handler_error_messages[]= ...@@ -64,6 +64,6 @@ static const char *handler_error_messages[]=
"File too short; Expected more data in file", "File too short; Expected more data in file",
"Read page with wrong checksum", "Read page with wrong checksum",
"Too many active concurrent transactions", "Too many active concurrent transactions",
"Row is not visible by the current transaction" "Row is not visible by the current transaction",
"Operation was interrupted by end user (probably kill command?)"
}; };
...@@ -2688,6 +2688,12 @@ void handler::print_error(int error, myf errflag) ...@@ -2688,6 +2688,12 @@ void handler::print_error(int error, myf errflag)
SET_FATAL_ERROR; SET_FATAL_ERROR;
textno=ER_KEY_NOT_FOUND; textno=ER_KEY_NOT_FOUND;
break; break;
case HA_ERR_ABORTED_BY_USER:
{
DBUG_ASSERT(table->in_use->killed);
table->in_use->send_kill_message();
DBUG_VOID_RETURN;
}
case HA_ERR_WRONG_MRG_TABLE_DEF: case HA_ERR_WRONG_MRG_TABLE_DEF:
textno=ER_WRONG_MRG_TABLE; textno=ER_WRONG_MRG_TABLE;
break; break;
......
...@@ -902,7 +902,7 @@ int DsMrr_impl::dsmrr_init(handler *h_arg, RANGE_SEQ_IF *seq_funcs, ...@@ -902,7 +902,7 @@ int DsMrr_impl::dsmrr_init(handler *h_arg, RANGE_SEQ_IF *seq_funcs,
close_second_handler(); close_second_handler();
/* Safety, not really needed but: */ /* Safety, not really needed but: */
strategy= NULL; strategy= NULL;
DBUG_RETURN(1); DBUG_RETURN(res);
use_default_impl: use_default_impl:
DBUG_ASSERT(primary_file->inited == handler::INDEX); DBUG_ASSERT(primary_file->inited == handler::INDEX);
......
...@@ -758,7 +758,7 @@ void _ma_check_print_warning(HA_CHECK *param, const char *fmt, ...) ...@@ -758,7 +758,7 @@ void _ma_check_print_warning(HA_CHECK *param, const char *fmt, ...)
static int maria_create_trn_for_mysql(MARIA_HA *info) static int maria_create_trn_for_mysql(MARIA_HA *info)
{ {
THD *thd= (THD*) info->external_ptr; THD *thd= ((TABLE*) info->external_ref)->in_use;
TRN *trn= THD_TRN; TRN *trn= THD_TRN;
DBUG_ENTER("maria_create_trn_for_mysql"); DBUG_ENTER("maria_create_trn_for_mysql");
...@@ -797,6 +797,11 @@ static int maria_create_trn_for_mysql(MARIA_HA *info) ...@@ -797,6 +797,11 @@ static int maria_create_trn_for_mysql(MARIA_HA *info)
DBUG_RETURN(0); DBUG_RETURN(0);
} }
my_bool ma_killed_in_mariadb(MARIA_HA *info)
{
return (((TABLE*) (info->external_ref))->in_use->killed != 0);
}
} /* extern "C" */ } /* extern "C" */
/** /**
...@@ -1013,6 +1018,8 @@ int ha_maria::open(const char *name, int mode, uint test_if_locked) ...@@ -1013,6 +1018,8 @@ int ha_maria::open(const char *name, int mode, uint test_if_locked)
return (my_errno ? my_errno : -1); return (my_errno ? my_errno : -1);
file->s->chst_invalidator= query_cache_invalidate_by_MyISAM_filename_ref; file->s->chst_invalidator= query_cache_invalidate_by_MyISAM_filename_ref;
/* Set external_ref, mainly for temporary tables */
file->external_ref= (void*) table; // For ma_killed()
if (test_if_locked & (HA_OPEN_IGNORE_IF_LOCKED | HA_OPEN_TMP_TABLE)) if (test_if_locked & (HA_OPEN_IGNORE_IF_LOCKED | HA_OPEN_TMP_TABLE))
VOID(maria_extra(file, HA_EXTRA_NO_WAIT_LOCK, 0)); VOID(maria_extra(file, HA_EXTRA_NO_WAIT_LOCK, 0));
...@@ -2525,6 +2532,7 @@ void ha_maria::drop_table(const char *name) ...@@ -2525,6 +2532,7 @@ void ha_maria::drop_table(const char *name)
int ha_maria::external_lock(THD *thd, int lock_type) int ha_maria::external_lock(THD *thd, int lock_type)
{ {
DBUG_ENTER("ha_maria::external_lock"); DBUG_ENTER("ha_maria::external_lock");
file->external_ref= (void*) table; // For ma_killed()
/* /*
We don't test now_transactional because it may vary between lock/unlock We don't test now_transactional because it may vary between lock/unlock
and thus confuse our reference counting. and thus confuse our reference counting.
...@@ -2543,8 +2551,6 @@ int ha_maria::external_lock(THD *thd, int lock_type) ...@@ -2543,8 +2551,6 @@ int ha_maria::external_lock(THD *thd, int lock_type)
/* Transactional table */ /* Transactional table */
if (lock_type != F_UNLCK) if (lock_type != F_UNLCK)
{ {
file->external_ptr= thd; // For maria_register_trn()
if (!file->s->lock_key_trees) // If we don't use versioning if (!file->s->lock_key_trees) // If we don't use versioning
{ {
/* /*
...@@ -3392,6 +3398,9 @@ static int ha_maria_init(void *p) ...@@ -3392,6 +3398,9 @@ static int ha_maria_init(void *p)
#endif #endif
if (res) if (res)
maria_hton= 0; maria_hton= 0;
ma_killed= ma_killed_in_mariadb;
return res ? HA_ERR_INITIALIZATION : 0; return res ? HA_ERR_INITIALIZATION : 0;
} }
......
...@@ -635,3 +635,9 @@ int _ma_flush_table_files(MARIA_HA *info, uint flush_data_or_index, ...@@ -635,3 +635,9 @@ int _ma_flush_table_files(MARIA_HA *info, uint flush_data_or_index,
return 1; return 1;
} }
my_bool ma_killed_standalone(MARIA_HA *info __attribute__((unused)))
{
return 0;
}
...@@ -669,25 +669,39 @@ int _ma_read_key_record(MARIA_HA *info, uchar *buf, MARIA_RECORD_POS filepos) ...@@ -669,25 +669,39 @@ int _ma_read_key_record(MARIA_HA *info, uchar *buf, MARIA_RECORD_POS filepos)
will look for column values there) will look for column values there)
RETURN RETURN
ICP_ERROR Error ICP_ERROR Error ; my_errno set to HA_ERR_CRASHED
ICP_NO_MATCH Index condition is not satisfied, continue scanning ICP_NO_MATCH Index condition is not satisfied, continue scanning
ICP_MATCH Index condition is satisfied ICP_MATCH Index condition is satisfied
ICP_OUT_OF_RANGE Index condition is not satisfied, end the scan. ICP_OUT_OF_RANGE Index condition is not satisfied, end the scan.
my_errno set to HA_ERR_END_OF_FILE
info->cur_row.lastpos is set to HA_OFFSET_ERROR in case of ICP_ERROR or
ICP_OUT_OF_RANGE to indicate that we don't have any active row.
*/ */
int ma_check_index_cond(register MARIA_HA *info, uint keynr, uchar *record) ICP_RESULT ma_check_index_cond(register MARIA_HA *info, uint keynr,
uchar *record)
{ {
ICP_RESULT res= ICP_MATCH;
if (info->index_cond_func) if (info->index_cond_func)
{ {
if (_ma_put_key_in_record(info, keynr, FALSE, record)) if (_ma_put_key_in_record(info, keynr, FALSE, record))
{ {
/* Impossible case; Can only happen if bug in code */
maria_print_error(info->s, HA_ERR_CRASHED); maria_print_error(info->s, HA_ERR_CRASHED);
my_errno=HA_ERR_CRASHED; info->cur_row.lastpos= HA_OFFSET_ERROR; /* No active record */
return -1; my_errno= HA_ERR_CRASHED;
res= ICP_ERROR;
}
else if ((res= info->index_cond_func(info->index_cond_func_arg)) ==
ICP_OUT_OF_RANGE)
{
/* We got beyond the end of scanned range */
info->cur_row.lastpos= HA_OFFSET_ERROR; /* No active record */
my_errno= HA_ERR_END_OF_FILE;
} }
return info->index_cond_func(info->index_cond_func_arg);
} }
return 1; return res;
} }
......
...@@ -34,7 +34,7 @@ int maria_rkey(MARIA_HA *info, uchar *buf, int inx, const uchar *key_data, ...@@ -34,7 +34,7 @@ int maria_rkey(MARIA_HA *info, uchar *buf, int inx, const uchar *key_data,
HA_KEYSEG *last_used_keyseg; HA_KEYSEG *last_used_keyseg;
uint32 nextflag; uint32 nextflag;
MARIA_KEY key; MARIA_KEY key;
int icp_res= 1; ICP_RESULT icp_res= ICP_MATCH;
DBUG_ENTER("maria_rkey"); DBUG_ENTER("maria_rkey");
DBUG_PRINT("enter", ("base: 0x%lx buf: 0x%lx inx: %d search_flag: %d", DBUG_PRINT("enter", ("base: 0x%lx buf: 0x%lx inx: %d search_flag: %d",
(long) info, (long) buf, inx, search_flag)); (long) info, (long) buf, inx, search_flag));
...@@ -115,7 +115,7 @@ int maria_rkey(MARIA_HA *info, uchar *buf, int inx, const uchar *key_data, ...@@ -115,7 +115,7 @@ int maria_rkey(MARIA_HA *info, uchar *buf, int inx, const uchar *key_data,
not satisfied with an out-of-range condition. not satisfied with an out-of-range condition.
*/ */
if ((*share->row_is_visible)(info) && if ((*share->row_is_visible)(info) &&
((icp_res= ma_check_index_cond(info, inx, buf)) != 0)) ((icp_res= ma_check_index_cond(info, inx, buf)) != ICP_NO_MATCH))
break; break;
/* The key references a concurrently inserted record. */ /* The key references a concurrently inserted record. */
...@@ -145,6 +145,18 @@ int maria_rkey(MARIA_HA *info, uchar *buf, int inx, const uchar *key_data, ...@@ -145,6 +145,18 @@ int maria_rkey(MARIA_HA *info, uchar *buf, int inx, const uchar *key_data,
if (_ma_search_next(info, &lastkey, maria_readnext_vec[search_flag], if (_ma_search_next(info, &lastkey, maria_readnext_vec[search_flag],
info->s->state.key_root[inx])) info->s->state.key_root[inx]))
break; /* purecov: inspected */ break; /* purecov: inspected */
/*
If we are at the last key on the key page, allow writers to
access the index.
*/
if (info->int_keypos >= info->int_maxpos &&
ma_yield_and_check_if_killed(info, inx))
{
DBUG_ASSERT(info->cur_row.lastpos == HA_OFFSET_ERROR);
break;
}
/* /*
Check that the found key does still match the search. Check that the found key does still match the search.
_ma_search_next() delivers the next key regardless of its _ma_search_next() delivers the next key regardless of its
...@@ -164,15 +176,19 @@ int maria_rkey(MARIA_HA *info, uchar *buf, int inx, const uchar *key_data, ...@@ -164,15 +176,19 @@ int maria_rkey(MARIA_HA *info, uchar *buf, int inx, const uchar *key_data,
} while (!(*share->row_is_visible)(info) || } while (!(*share->row_is_visible)(info) ||
((icp_res= ma_check_index_cond(info, inx, buf)) == 0)); ((icp_res= ma_check_index_cond(info, inx, buf)) == 0));
} }
else
{
DBUG_ASSERT(info->cur_row.lastpos);
}
} }
if (share->lock_key_trees) if (share->lock_key_trees)
rw_unlock(&keyinfo->root_lock); rw_unlock(&keyinfo->root_lock);
if (info->cur_row.lastpos == HA_OFFSET_ERROR || (icp_res != 1)) if (info->cur_row.lastpos == HA_OFFSET_ERROR)
{ {
if (icp_res == 2) if (icp_res == ICP_OUT_OF_RANGE)
{ {
info->cur_row.lastpos= HA_OFFSET_ERROR; /* We don't want HA_ERR_END_OF_FILE in this particular case */
my_errno= HA_ERR_KEY_NOT_FOUND; my_errno= HA_ERR_KEY_NOT_FOUND;
} }
fast_ma_writeinfo(info); fast_ma_writeinfo(info);
...@@ -214,3 +230,35 @@ int maria_rkey(MARIA_HA *info, uchar *buf, int inx, const uchar *key_data, ...@@ -214,3 +230,35 @@ int maria_rkey(MARIA_HA *info, uchar *buf, int inx, const uchar *key_data,
info->update|=HA_STATE_NEXT_FOUND; /* Previous gives last row */ info->update|=HA_STATE_NEXT_FOUND; /* Previous gives last row */
DBUG_RETURN(my_errno); DBUG_RETURN(my_errno);
} /* _ma_rkey */ } /* _ma_rkey */
/*
Yield to possible other writers during a index scan.
Check also if we got killed by the user and if yes, return
HA_ERR_LOCK_WAIT_TIMEOUT
return 0 ok
return 1 Query has been requested to be killed
*/
my_bool ma_yield_and_check_if_killed(MARIA_HA *info, int inx)
{
MARIA_SHARE *share;
if (ma_killed(info))
{
/* Mark that we don't have an active row */
info->cur_row.lastpos= HA_OFFSET_ERROR;
/* Set error that we where aborted by kill from application */
my_errno= HA_ERR_ABORTED_BY_USER;
return 1;
}
if ((share= info->s)->lock_key_trees)
{
/* Give writers a chance to access index */
rw_unlock(&share->keyinfo[inx].root_lock);
rw_rdlock(&share->keyinfo[inx].root_lock);
}
return 0;
}
...@@ -30,7 +30,7 @@ int maria_rnext(MARIA_HA *info, uchar *buf, int inx) ...@@ -30,7 +30,7 @@ int maria_rnext(MARIA_HA *info, uchar *buf, int inx)
uint flag; uint flag;
MARIA_SHARE *share= info->s; MARIA_SHARE *share= info->s;
MARIA_KEYDEF *keyinfo; MARIA_KEYDEF *keyinfo;
int icp_res= 1; ICP_RESULT icp_res= ICP_MATCH;
DBUG_ENTER("maria_rnext"); DBUG_ENTER("maria_rnext");
if ((inx = _ma_check_index(info,inx)) < 0) if ((inx = _ma_check_index(info,inx)) < 0)
...@@ -92,8 +92,20 @@ int maria_rnext(MARIA_HA *info, uchar *buf, int inx) ...@@ -92,8 +92,20 @@ int maria_rnext(MARIA_HA *info, uchar *buf, int inx)
if (!error) if (!error)
{ {
while (!(*share->row_is_visible)(info) || while (!(*share->row_is_visible)(info) ||
((icp_res= ma_check_index_cond(info, inx, buf)) == 0)) ((icp_res= ma_check_index_cond(info, inx, buf)) == ICP_NO_MATCH))
{ {
/*
If we are at the last key on the key page, allow writers to
access the index.
*/
if (info->int_keypos >= info->int_maxpos &&
ma_yield_and_check_if_killed(info, inx))
{
/* my_errno is set by ma_yield_and_check_if_killed() */
error= 1;
break;
}
/* Skip rows inserted by other threads since we got a lock */ /* Skip rows inserted by other threads since we got a lock */
if ((error= _ma_search_next(info, &info->last_key, if ((error= _ma_search_next(info, &info->last_key,
SEARCH_BIGGER, SEARCH_BIGGER,
...@@ -108,16 +120,15 @@ int maria_rnext(MARIA_HA *info, uchar *buf, int inx) ...@@ -108,16 +120,15 @@ int maria_rnext(MARIA_HA *info, uchar *buf, int inx)
info->update&= (HA_STATE_CHANGED | HA_STATE_ROW_CHANGED); info->update&= (HA_STATE_CHANGED | HA_STATE_ROW_CHANGED);
info->update|= HA_STATE_NEXT_FOUND; info->update|= HA_STATE_NEXT_FOUND;
if (icp_res == 2) if (error || icp_res != ICP_MATCH)
my_errno=HA_ERR_END_OF_FILE; /* got beyond the end of scanned range */
if (error || icp_res != 1)
{ {
fast_ma_writeinfo(info);
if (my_errno == HA_ERR_KEY_NOT_FOUND) if (my_errno == HA_ERR_KEY_NOT_FOUND)
my_errno=HA_ERR_END_OF_FILE; my_errno= HA_ERR_END_OF_FILE;
} }
else if (!buf) else if (!buf)
{ {
fast_ma_writeinfo(info);
DBUG_RETURN(info->cur_row.lastpos == HA_OFFSET_ERROR ? my_errno : 0); DBUG_RETURN(info->cur_row.lastpos == HA_OFFSET_ERROR ? my_errno : 0);
} }
else if (!(*info->read_record)(info, buf, info->cur_row.lastpos)) else if (!(*info->read_record)(info, buf, info->cur_row.lastpos))
......
...@@ -30,7 +30,7 @@ int maria_rnext_same(MARIA_HA *info, uchar *buf) ...@@ -30,7 +30,7 @@ int maria_rnext_same(MARIA_HA *info, uchar *buf)
int error; int error;
uint inx,not_used[2]; uint inx,not_used[2];
MARIA_KEYDEF *keyinfo; MARIA_KEYDEF *keyinfo;
int icp_res= 1; ICP_RESULT icp_res= ICP_MATCH;
DBUG_ENTER("maria_rnext_same"); DBUG_ENTER("maria_rnext_same");
if ((int) (inx= info->lastinx) < 0 || if ((int) (inx= info->lastinx) < 0 ||
...@@ -80,9 +80,19 @@ int maria_rnext_same(MARIA_HA *info, uchar *buf) ...@@ -80,9 +80,19 @@ int maria_rnext_same(MARIA_HA *info, uchar *buf)
info->cur_row.lastpos= HA_OFFSET_ERROR; info->cur_row.lastpos= HA_OFFSET_ERROR;
break; break;
} }
/*
If we are at the last key on the key page, allow writers to
access the index.
*/
if (info->int_keypos >= info->int_maxpos &&
ma_yield_and_check_if_killed(info, inx))
{
error= 1;
break;
}
/* Skip rows that are inserted by other threads since we got a lock */ /* Skip rows that are inserted by other threads since we got a lock */
if ((info->s->row_is_visible)(info) && if ((info->s->row_is_visible)(info) &&
((icp_res= ma_check_index_cond(info, inx, buf)) != 0)) ((icp_res= ma_check_index_cond(info, inx, buf)) != ICP_NO_MATCH))
break; break;
} }
} }
...@@ -92,16 +102,15 @@ int maria_rnext_same(MARIA_HA *info, uchar *buf) ...@@ -92,16 +102,15 @@ int maria_rnext_same(MARIA_HA *info, uchar *buf)
info->update&= (HA_STATE_CHANGED | HA_STATE_ROW_CHANGED); info->update&= (HA_STATE_CHANGED | HA_STATE_ROW_CHANGED);
info->update|= HA_STATE_NEXT_FOUND | HA_STATE_RNEXT_SAME; info->update|= HA_STATE_NEXT_FOUND | HA_STATE_RNEXT_SAME;
if (icp_res == 2) if (error || icp_res != ICP_MATCH)
my_errno=HA_ERR_END_OF_FILE; /* got beyond the end of scanned range */
if (error || icp_res != 1)
{ {
fast_ma_writeinfo(info);
if (my_errno == HA_ERR_KEY_NOT_FOUND) if (my_errno == HA_ERR_KEY_NOT_FOUND)
my_errno=HA_ERR_END_OF_FILE; my_errno= HA_ERR_END_OF_FILE;
} }
else if (!buf) else if (!buf)
{ {
fast_ma_writeinfo(info);
DBUG_RETURN(info->cur_row.lastpos == HA_OFFSET_ERROR ? my_errno : 0); DBUG_RETURN(info->cur_row.lastpos == HA_OFFSET_ERROR ? my_errno : 0);
} }
else if (!(*info->read_record)(info, buf, info->cur_row.lastpos)) else if (!(*info->read_record)(info, buf, info->cur_row.lastpos))
......
...@@ -28,6 +28,7 @@ int maria_rprev(MARIA_HA *info, uchar *buf, int inx) ...@@ -28,6 +28,7 @@ int maria_rprev(MARIA_HA *info, uchar *buf, int inx)
register uint flag; register uint flag;
MARIA_SHARE *share= info->s; MARIA_SHARE *share= info->s;
MARIA_KEYDEF *keyinfo; MARIA_KEYDEF *keyinfo;
ICP_RESULT icp_res= ICP_MATCH;
DBUG_ENTER("maria_rprev"); DBUG_ENTER("maria_rprev");
if ((inx = _ma_check_index(info,inx)) < 0) if ((inx = _ma_check_index(info,inx)) < 0)
...@@ -55,8 +56,24 @@ int maria_rprev(MARIA_HA *info, uchar *buf, int inx) ...@@ -55,8 +56,24 @@ int maria_rprev(MARIA_HA *info, uchar *buf, int inx)
if (!error) if (!error)
{ {
while (!(*share->row_is_visible)(info)) my_off_t cur_keypage= info->last_keypage;
while (!(*share->row_is_visible)(info) ||
((icp_res= ma_check_index_cond(info, inx, buf)) == ICP_NO_MATCH))
{ {
/*
If we are at the last (i.e. first?) key on the key page,
allow writers to access the index.
*/
if (info->last_keypage != cur_keypage)
{
cur_keypage= info->last_keypage;
if (ma_yield_and_check_if_killed(info, inx))
{
error= 1;
break;
}
}
/* Skip rows that are inserted by other threads since we got a lock */ /* Skip rows that are inserted by other threads since we got a lock */
if ((error= _ma_search_next(info, &info->last_key, if ((error= _ma_search_next(info, &info->last_key,
SEARCH_SMALLER, SEARCH_SMALLER,
...@@ -68,13 +85,16 @@ int maria_rprev(MARIA_HA *info, uchar *buf, int inx) ...@@ -68,13 +85,16 @@ int maria_rprev(MARIA_HA *info, uchar *buf, int inx)
rw_unlock(&keyinfo->root_lock); rw_unlock(&keyinfo->root_lock);
info->update&= (HA_STATE_CHANGED | HA_STATE_ROW_CHANGED); info->update&= (HA_STATE_CHANGED | HA_STATE_ROW_CHANGED);
info->update|= HA_STATE_PREV_FOUND; info->update|= HA_STATE_PREV_FOUND;
if (error)
if (error || icp_res != ICP_MATCH)
{ {
fast_ma_writeinfo(info);
if (my_errno == HA_ERR_KEY_NOT_FOUND) if (my_errno == HA_ERR_KEY_NOT_FOUND)
my_errno=HA_ERR_END_OF_FILE; my_errno= HA_ERR_END_OF_FILE;
} }
else if (!buf) else if (!buf)
{ {
fast_ma_writeinfo(info);
DBUG_RETURN(info->cur_row.lastpos == HA_OFFSET_ERROR ? my_errno : 0); DBUG_RETURN(info->cur_row.lastpos == HA_OFFSET_ERROR ? my_errno : 0);
} }
else if (!(*info->read_record)(info, buf, info->cur_row.lastpos)) else if (!(*info->read_record)(info, buf, info->cur_row.lastpos))
......
...@@ -141,7 +141,11 @@ static int _ma_search_no_save(register MARIA_HA *info, MARIA_KEY *key, ...@@ -141,7 +141,11 @@ static int _ma_search_no_save(register MARIA_HA *info, MARIA_KEY *key,
flag= (*keyinfo->bin_search)(key, &page, nextflag, &keypos, lastkey, flag= (*keyinfo->bin_search)(key, &page, nextflag, &keypos, lastkey,
&last_key_not_used); &last_key_not_used);
if (flag == MARIA_FOUND_WRONG_KEY) if (flag == MARIA_FOUND_WRONG_KEY)
DBUG_RETURN(-1); {
maria_print_error(info->s, HA_ERR_CRASHED);
my_errno= HA_ERR_CRASHED;
goto err;
}
page_flag= page.flag; page_flag= page.flag;
used_length= page.size; used_length= page.size;
nod_flag= page.node; nod_flag= page.node;
......
...@@ -107,3 +107,6 @@ static int always_valid(const char *filename __attribute__((unused))) ...@@ -107,3 +107,6 @@ static int always_valid(const char *filename __attribute__((unused)))
} }
int (*maria_test_invalid_symlink)(const char *filename)= always_valid; int (*maria_test_invalid_symlink)(const char *filename)= always_valid;
my_bool (*ma_killed)(MARIA_HA *)= ma_killed_standalone;
...@@ -492,7 +492,6 @@ struct st_maria_handler ...@@ -492,7 +492,6 @@ struct st_maria_handler
{ {
MARIA_SHARE *s; /* Shared between open:s */ MARIA_SHARE *s; /* Shared between open:s */
struct st_ma_transaction *trn; /* Pointer to active transaction */ struct st_ma_transaction *trn; /* Pointer to active transaction */
void *external_ptr; /* Pointer to THD in mysql */
MARIA_STATUS_INFO *state, state_save; MARIA_STATUS_INFO *state, state_save;
MARIA_STATUS_INFO *state_start; /* State at start of transaction */ MARIA_STATUS_INFO *state_start; /* State at start of transaction */
MARIA_ROW cur_row; /* The active row that we just read */ MARIA_ROW cur_row; /* The active row that we just read */
...@@ -509,6 +508,7 @@ struct st_maria_handler ...@@ -509,6 +508,7 @@ struct st_maria_handler
DYNAMIC_ARRAY *ft1_to_ft2; /* used only in ft1->ft2 conversion */ DYNAMIC_ARRAY *ft1_to_ft2; /* used only in ft1->ft2 conversion */
MEM_ROOT ft_memroot; /* used by the parser */ MEM_ROOT ft_memroot; /* used by the parser */
MYSQL_FTPARSER_PARAM *ftparser_param; /* share info between init/deinit */ MYSQL_FTPARSER_PARAM *ftparser_param; /* share info between init/deinit */
void *external_ref; /* For MariaDB TABLE */
uchar *buff; /* page buffer */ uchar *buff; /* page buffer */
uchar *keyread_buff; /* Buffer for last key read */ uchar *keyread_buff; /* Buffer for last key read */
uchar *lastkey_buff; /* Last used search key */ uchar *lastkey_buff; /* Last used search key */
...@@ -813,6 +813,7 @@ extern my_bool maria_inited, maria_in_ha_maria, maria_recovery_changed_data; ...@@ -813,6 +813,7 @@ extern my_bool maria_inited, maria_in_ha_maria, maria_recovery_changed_data;
extern my_bool maria_recovery_verbose; extern my_bool maria_recovery_verbose;
extern HASH maria_stored_state; extern HASH maria_stored_state;
extern int (*maria_create_trn_hook)(MARIA_HA *); extern int (*maria_create_trn_hook)(MARIA_HA *);
extern my_bool (*ma_killed)(MARIA_HA *);
/* This is used by _ma_calc_xxx_key_length och _ma_store_key */ /* This is used by _ma_calc_xxx_key_length och _ma_store_key */
typedef struct st_maria_s_param typedef struct st_maria_s_param
...@@ -1281,4 +1282,8 @@ extern my_bool maria_flush_log_for_page_none(uchar *page, ...@@ -1281,4 +1282,8 @@ extern my_bool maria_flush_log_for_page_none(uchar *page,
extern PAGECACHE *maria_log_pagecache; extern PAGECACHE *maria_log_pagecache;
extern void ma_set_index_cond_func(MARIA_HA *info, index_cond_func_t func, extern void ma_set_index_cond_func(MARIA_HA *info, index_cond_func_t func,
void *func_arg); void *func_arg);
int ma_check_index_cond(register MARIA_HA *info, uint keynr, uchar *record); ICP_RESULT ma_check_index_cond(register MARIA_HA *info, uint keynr, uchar *record);
extern my_bool ma_yield_and_check_if_killed(MARIA_HA *info, int inx);
extern my_bool ma_killed_standalone(MARIA_HA *);
...@@ -538,6 +538,13 @@ void mi_check_print_warning(HA_CHECK *param, const char *fmt,...) ...@@ -538,6 +538,13 @@ void mi_check_print_warning(HA_CHECK *param, const char *fmt,...)
va_end(args); va_end(args);
} }
/* Return 1 if user have requested query to be killed */
my_bool mi_killed_in_mariadb(MI_INFO *info)
{
return (((TABLE*) (info->external_ref))->in_use->killed != 0);
}
} }
...@@ -699,6 +706,8 @@ int ha_myisam::open(const char *name, int mode, uint test_if_locked) ...@@ -699,6 +706,8 @@ int ha_myisam::open(const char *name, int mode, uint test_if_locked)
return (my_errno ? my_errno : -1); return (my_errno ? my_errno : -1);
file->s->chst_invalidator= query_cache_invalidate_by_MyISAM_filename_ref; file->s->chst_invalidator= query_cache_invalidate_by_MyISAM_filename_ref;
/* Set external_ref, mainly for temporary tables */
file->external_ref= (void*) table; // For mi_killed()
if (!table->s->tmp_table) /* No need to perform a check for tmp table */ if (!table->s->tmp_table) /* No need to perform a check for tmp table */
{ {
...@@ -1971,6 +1980,7 @@ int ha_myisam::delete_table(const char *name) ...@@ -1971,6 +1980,7 @@ int ha_myisam::delete_table(const char *name)
int ha_myisam::external_lock(THD *thd, int lock_type) int ha_myisam::external_lock(THD *thd, int lock_type)
{ {
file->external_ref= (void*) table; // For mi_killed()
return mi_lock_database(file, !table->s->tmp_table ? return mi_lock_database(file, !table->s->tmp_table ?
lock_type : ((lock_type == F_UNLCK) ? lock_type : ((lock_type == F_UNLCK) ?
F_UNLCK : F_EXTRA_LCK)); F_UNLCK : F_EXTRA_LCK));
...@@ -2219,6 +2229,7 @@ static int myisam_init(void *p) ...@@ -2219,6 +2229,7 @@ static int myisam_init(void *p)
myisam_hton->create= myisam_create_handler; myisam_hton->create= myisam_create_handler;
myisam_hton->panic= myisam_panic; myisam_hton->panic= myisam_panic;
myisam_hton->flags= HTON_CAN_RECREATE | HTON_SUPPORT_LOG_TABLES; myisam_hton->flags= HTON_CAN_RECREATE | HTON_SUPPORT_LOG_TABLES;
mi_killed= mi_killed_in_mariadb;
return 0; return 0;
} }
......
...@@ -473,3 +473,8 @@ int mi_reset(MI_INFO *info) ...@@ -473,3 +473,8 @@ int mi_reset(MI_INFO *info)
HA_STATE_PREV_FOUND); HA_STATE_PREV_FOUND);
DBUG_RETURN(error); DBUG_RETURN(error);
} }
my_bool mi_killed_standalone(MI_INFO *info __attribute__((unused)))
{
return 0;
}
...@@ -510,15 +510,26 @@ int _mi_read_key_record(MI_INFO *info, my_off_t filepos, uchar *buf) ...@@ -510,15 +510,26 @@ int _mi_read_key_record(MI_INFO *info, my_off_t filepos, uchar *buf)
ICP_OUT_OF_RANGE Index condition is not satisfied, end the scan. ICP_OUT_OF_RANGE Index condition is not satisfied, end the scan.
*/ */
int mi_check_index_cond(register MI_INFO *info, uint keynr, uchar *record) ICP_RESULT mi_check_index_cond(register MI_INFO *info, uint keynr,
uchar *record)
{ {
ICP_RESULT res;
if (_mi_put_key_in_record(info, keynr, FALSE, record)) if (_mi_put_key_in_record(info, keynr, FALSE, record))
{ {
/* Impossible case; Can only happen if bug in code */
mi_print_error(info->s, HA_ERR_CRASHED); mi_print_error(info->s, HA_ERR_CRASHED);
my_errno=HA_ERR_CRASHED; info->lastpos= HA_OFFSET_ERROR; /* No active record */
return ICP_ERROR; my_errno= HA_ERR_CRASHED;
res= ICP_ERROR;
} }
return info->index_cond_func(info->index_cond_func_arg); else if ((res= info->index_cond_func(info->index_cond_func_arg)) ==
ICP_OUT_OF_RANGE)
{
/* We got beyond the end of scanned range */
info->lastpos= HA_OFFSET_ERROR; /* No active record */
my_errno= HA_ERR_END_OF_FILE;
}
return res;
} }
/* /*
......
...@@ -88,6 +88,7 @@ int mi_rkey(MI_INFO *info, uchar *buf, int inx, const uchar *key, ...@@ -88,6 +88,7 @@ int mi_rkey(MI_INFO *info, uchar *buf, int inx, const uchar *key,
my_errno=HA_ERR_CRASHED; my_errno=HA_ERR_CRASHED;
if (share->concurrent_insert) if (share->concurrent_insert)
rw_unlock(&share->key_root_lock[inx]); rw_unlock(&share->key_root_lock[inx]);
fast_mi_writeinfo(info);
goto err; goto err;
} }
break; break;
...@@ -131,7 +132,10 @@ int mi_rkey(MI_INFO *info, uchar *buf, int inx, const uchar *key, ...@@ -131,7 +132,10 @@ int mi_rkey(MI_INFO *info, uchar *buf, int inx, const uchar *key,
info->lastkey_length, info->lastkey_length,
myisam_readnext_vec[search_flag], myisam_readnext_vec[search_flag],
info->s->state.key_root[inx])) info->s->state.key_root[inx]))
{
info->lastpos= HA_OFFSET_ERROR;
break; break;
}
/* /*
Check that the found key does still match the search. Check that the found key does still match the search.
_mi_search_next() delivers the next key regardless of its _mi_search_next() delivers the next key regardless of its
...@@ -145,13 +149,22 @@ int mi_rkey(MI_INFO *info, uchar *buf, int inx, const uchar *key, ...@@ -145,13 +149,22 @@ int mi_rkey(MI_INFO *info, uchar *buf, int inx, const uchar *key,
info->lastpos= HA_OFFSET_ERROR; info->lastpos= HA_OFFSET_ERROR;
break; break;
} }
/*
If we are at the last key on the key page, allow writers to
access the index.
*/
if (info->int_keypos >= info->int_maxpos &&
mi_yield_and_check_if_killed(info, inx))
{
/* Aborted by user */
buf= 0; /* Fast abort */
}
} }
if (res == ICP_OUT_OF_RANGE) if (res == ICP_OUT_OF_RANGE)
{ {
info->lastpos= HA_OFFSET_ERROR; /* Change error from HA_ERR_END_OF_FILE */
if (share->concurrent_insert) DBUG_ASSERT(info->lastpos == HA_OFFSET_ERROR);
rw_unlock(&share->key_root_lock[inx]); my_errno= HA_ERR_KEY_NOT_FOUND;
DBUG_RETURN((my_errno= HA_ERR_KEY_NOT_FOUND));
} }
/* /*
Error if no row found within the data file. (Bug #29838) Error if no row found within the data file. (Bug #29838)
...@@ -164,29 +177,43 @@ int mi_rkey(MI_INFO *info, uchar *buf, int inx, const uchar *key, ...@@ -164,29 +177,43 @@ int mi_rkey(MI_INFO *info, uchar *buf, int inx, const uchar *key,
my_errno= HA_ERR_KEY_NOT_FOUND; my_errno= HA_ERR_KEY_NOT_FOUND;
} }
} }
else
{
DBUG_ASSERT(info->lastpos= HA_OFFSET_ERROR);
}
} }
if (share->concurrent_insert) if (share->concurrent_insert)
rw_unlock(&share->key_root_lock[inx]); rw_unlock(&share->key_root_lock[inx]);
/* Calculate length of the found key; Used by mi_rnext_same */ info->last_rkey_length= pack_key_length;
if ((keyinfo->flag & HA_VAR_LENGTH_KEY) && last_used_keyseg &&
info->lastpos != HA_OFFSET_ERROR)
info->last_rkey_length= _mi_keylength_part(keyinfo, info->lastkey,
last_used_keyseg);
else
info->last_rkey_length= pack_key_length;
/* Check if we don't want to have record back, only error message */
if (!buf)
DBUG_RETURN(info->lastpos == HA_OFFSET_ERROR ? my_errno : 0);
if (!(*info->read_record)(info,info->lastpos,buf)) if (info->lastpos == HA_OFFSET_ERROR) /* No such record */
{ {
info->update|= HA_STATE_AKTIV; /* Record is read */ fast_mi_writeinfo(info);
DBUG_RETURN(0); if (!buf)
DBUG_RETURN(my_errno);
} }
else
{
/* Calculate length of the found key; Used by mi_rnext_same */
if ((keyinfo->flag & HA_VAR_LENGTH_KEY) && last_used_keyseg)
info->last_rkey_length= _mi_keylength_part(keyinfo, info->lastkey,
last_used_keyseg);
info->lastpos = HA_OFFSET_ERROR; /* Didn't find key */ /* Check if we don't want to have record back, only error message */
if (!buf)
{
fast_mi_writeinfo(info);
DBUG_RETURN(0);
}
if (!(*info->read_record)(info,info->lastpos,buf))
{
info->update|= HA_STATE_AKTIV; /* Record is read */
DBUG_RETURN(0);
}
DBUG_PRINT("error", ("Didn't find row. Error %d", my_errno));
info->lastpos= HA_OFFSET_ERROR; /* Didn't find row */
}
/* Store last used key as a base for read next */ /* Store last used key as a base for read next */
memcpy(info->lastkey,key_buff,pack_key_length); memcpy(info->lastkey,key_buff,pack_key_length);
...@@ -199,3 +226,32 @@ int mi_rkey(MI_INFO *info, uchar *buf, int inx, const uchar *key, ...@@ -199,3 +226,32 @@ int mi_rkey(MI_INFO *info, uchar *buf, int inx, const uchar *key,
err: err:
DBUG_RETURN(my_errno); DBUG_RETURN(my_errno);
} /* _mi_rkey */ } /* _mi_rkey */
/*
Yield to possible other writers during a index scan.
Check also if we got killed by the user and if yes, return
HA_ERR_LOCK_WAIT_TIMEOUT
return 0 ok
return 1 Query has been requested to be killed
*/
my_bool mi_yield_and_check_if_killed(MI_INFO *info, int inx)
{
MYISAM_SHARE *share;
if (mi_killed(info))
{
info->lastpos= HA_OFFSET_ERROR;
my_errno= HA_ERR_KEY_NOT_FOUND;
return 1;
}
if ((share= info->s)->concurrent_insert)
{
/* Give writers a chance to access index */
rw_unlock(&share->key_root_lock[inx]);
rw_rdlock(&share->key_root_lock[inx]);
}
return 0;
}
...@@ -28,7 +28,7 @@ int mi_rnext(MI_INFO *info, uchar *buf, int inx) ...@@ -28,7 +28,7 @@ int mi_rnext(MI_INFO *info, uchar *buf, int inx)
{ {
int error,changed; int error,changed;
uint flag; uint flag;
ICP_RESULT res= 0; ICP_RESULT icp_res= ICP_MATCH;
uint update_mask= HA_STATE_NEXT_FOUND; uint update_mask= HA_STATE_NEXT_FOUND;
DBUG_ENTER("mi_rnext"); DBUG_ENTER("mi_rnext");
...@@ -102,8 +102,19 @@ int mi_rnext(MI_INFO *info, uchar *buf, int inx) ...@@ -102,8 +102,19 @@ int mi_rnext(MI_INFO *info, uchar *buf, int inx)
while ((info->s->concurrent_insert && while ((info->s->concurrent_insert &&
info->lastpos >= info->state->data_file_length) || info->lastpos >= info->state->data_file_length) ||
(info->index_cond_func && (info->index_cond_func &&
(res= mi_check_index_cond(info, inx, buf)) == ICP_NO_MATCH)) (icp_res= mi_check_index_cond(info, inx, buf)) == ICP_NO_MATCH))
{ {
/*
If we are at the last key on the key page, allow writers to
access the index.
*/
if (info->int_keypos >= info->int_maxpos &&
mi_yield_and_check_if_killed(info, inx))
{
error= 1;
break;
}
/* /*
Skip rows that are either inserted by other threads since Skip rows that are either inserted by other threads since
we got a lock or do not match pushed index conditions we got a lock or do not match pushed index conditions
...@@ -115,13 +126,6 @@ int mi_rnext(MI_INFO *info, uchar *buf, int inx) ...@@ -115,13 +126,6 @@ int mi_rnext(MI_INFO *info, uchar *buf, int inx)
info->s->state.key_root[inx]))) info->s->state.key_root[inx])))
break; break;
} }
if (!error && res == ICP_OUT_OF_RANGE)
{
if (info->s->concurrent_insert)
rw_unlock(&info->s->key_root_lock[inx]);
info->lastpos= HA_OFFSET_ERROR;
DBUG_RETURN(my_errno= HA_ERR_END_OF_FILE);
}
} }
if (info->s->concurrent_insert) if (info->s->concurrent_insert)
...@@ -131,13 +135,15 @@ int mi_rnext(MI_INFO *info, uchar *buf, int inx) ...@@ -131,13 +135,15 @@ int mi_rnext(MI_INFO *info, uchar *buf, int inx)
info->update&= (HA_STATE_CHANGED | HA_STATE_ROW_CHANGED); info->update&= (HA_STATE_CHANGED | HA_STATE_ROW_CHANGED);
info->update|= update_mask; info->update|= update_mask;
if (error) if (error || icp_res != ICP_MATCH)
{ {
fast_mi_writeinfo(info);
if (my_errno == HA_ERR_KEY_NOT_FOUND) if (my_errno == HA_ERR_KEY_NOT_FOUND)
my_errno=HA_ERR_END_OF_FILE; my_errno=HA_ERR_END_OF_FILE;
} }
else if (!buf) else if (!buf)
{ {
fast_mi_writeinfo(info);
DBUG_RETURN(info->lastpos==HA_OFFSET_ERROR ? my_errno : 0); DBUG_RETURN(info->lastpos==HA_OFFSET_ERROR ? my_errno : 0);
} }
else if (!(*info->read_record)(info,info->lastpos,buf)) else if (!(*info->read_record)(info,info->lastpos,buf))
......
...@@ -29,6 +29,7 @@ int mi_rnext_same(MI_INFO *info, uchar *buf) ...@@ -29,6 +29,7 @@ int mi_rnext_same(MI_INFO *info, uchar *buf)
int error; int error;
uint inx,not_used[2]; uint inx,not_used[2];
MI_KEYDEF *keyinfo; MI_KEYDEF *keyinfo;
ICP_RESULT icp_res= ICP_MATCH;
DBUG_ENTER("mi_rnext_same"); DBUG_ENTER("mi_rnext_same");
if ((int) (inx=info->lastinx) < 0 || info->lastpos == HA_OFFSET_ERROR) if ((int) (inx=info->lastinx) < 0 || info->lastpos == HA_OFFSET_ERROR)
...@@ -63,6 +64,17 @@ int mi_rnext_same(MI_INFO *info, uchar *buf) ...@@ -63,6 +64,17 @@ int mi_rnext_same(MI_INFO *info, uchar *buf)
} }
for (;;) for (;;)
{ {
/*
If we are at the last key on the key page, allow writers to
access the index.
*/
if (info->int_keypos >= info->int_maxpos &&
mi_yield_and_check_if_killed(info, inx))
{
error=1;
break;
}
if ((error=_mi_search_next(info,keyinfo,info->lastkey, if ((error=_mi_search_next(info,keyinfo,info->lastkey,
info->lastkey_length,SEARCH_BIGGER, info->lastkey_length,SEARCH_BIGGER,
info->s->state.key_root[inx]))) info->s->state.key_root[inx])))
...@@ -78,26 +90,30 @@ int mi_rnext_same(MI_INFO *info, uchar *buf) ...@@ -78,26 +90,30 @@ int mi_rnext_same(MI_INFO *info, uchar *buf)
/* /*
Skip Skip
- rows that are inserted by other threads since we got a lock - rows that are inserted by other threads since we got a lock
- rows that don't match index condition */ - rows that don't match index condition
*/
if (info->lastpos < info->state->data_file_length && if (info->lastpos < info->state->data_file_length &&
(!info->index_cond_func || (!info->index_cond_func ||
mi_check_index_cond(info, inx, buf) != ICP_NO_MATCH)) (icp_res= mi_check_index_cond(info, inx, buf)) != ICP_NO_MATCH))
break; break;
} }
} }
if (info->s->concurrent_insert) if (info->s->concurrent_insert)
rw_unlock(&info->s->key_root_lock[inx]); rw_unlock(&info->s->key_root_lock[inx]);
/* Don't clear if database-changed */ /* Don't clear if database-changed */
info->update&= (HA_STATE_CHANGED | HA_STATE_ROW_CHANGED); info->update&= (HA_STATE_CHANGED | HA_STATE_ROW_CHANGED);
info->update|= HA_STATE_NEXT_FOUND | HA_STATE_RNEXT_SAME; info->update|= HA_STATE_NEXT_FOUND | HA_STATE_RNEXT_SAME;
if (error) if (error || icp_res != ICP_MATCH)
{ {
fast_mi_writeinfo(info);
if (my_errno == HA_ERR_KEY_NOT_FOUND) if (my_errno == HA_ERR_KEY_NOT_FOUND)
my_errno=HA_ERR_END_OF_FILE; my_errno=HA_ERR_END_OF_FILE;
} }
else if (!buf) else if (!buf)
{ {
fast_mi_writeinfo(info);
DBUG_RETURN(info->lastpos==HA_OFFSET_ERROR ? my_errno : 0); DBUG_RETURN(info->lastpos==HA_OFFSET_ERROR ? my_errno : 0);
} }
else if (!(*info->read_record)(info,info->lastpos,buf)) else if (!(*info->read_record)(info,info->lastpos,buf))
......
...@@ -27,6 +27,7 @@ int mi_rprev(MI_INFO *info, uchar *buf, int inx) ...@@ -27,6 +27,7 @@ int mi_rprev(MI_INFO *info, uchar *buf, int inx)
int error,changed; int error,changed;
register uint flag; register uint flag;
MYISAM_SHARE *share=info->s; MYISAM_SHARE *share=info->s;
ICP_RESULT icp_res= ICP_MATCH;
DBUG_ENTER("mi_rprev"); DBUG_ENTER("mi_rprev");
if ((inx = _mi_check_index(info,inx)) < 0) if ((inx = _mi_check_index(info,inx)) < 0)
...@@ -53,12 +54,26 @@ int mi_rprev(MI_INFO *info, uchar *buf, int inx) ...@@ -53,12 +54,26 @@ int mi_rprev(MI_INFO *info, uchar *buf, int inx)
if (!error) if (!error)
{ {
int res= 0; my_off_t cur_keypage= info->last_keypage;
while ((share->concurrent_insert && while ((share->concurrent_insert &&
info->lastpos >= info->state->data_file_length) || info->lastpos >= info->state->data_file_length) ||
(info->index_cond_func && (info->index_cond_func &&
!(res= mi_check_index_cond(info, inx, buf)))) (icp_res= mi_check_index_cond(info, inx, buf)) == ICP_NO_MATCH))
{ {
/*
If we are at the last (i.e. first?) key on the key page,
allow writers to access the index.
*/
if (info->last_keypage != cur_keypage)
{
cur_keypage= info->last_keypage;
if (mi_yield_and_check_if_killed(info, inx))
{
error= 1;
break;
}
}
/* /*
Skip rows that are either inserted by other threads since Skip rows that are either inserted by other threads since
we got a lock or do not match pushed index conditions we got a lock or do not match pushed index conditions
...@@ -69,13 +84,6 @@ int mi_rprev(MI_INFO *info, uchar *buf, int inx) ...@@ -69,13 +84,6 @@ int mi_rprev(MI_INFO *info, uchar *buf, int inx)
share->state.key_root[inx]))) share->state.key_root[inx])))
break; break;
} }
if (!error && res == 2)
{
if (share->concurrent_insert)
rw_unlock(&share->key_root_lock[inx]);
info->lastpos= HA_OFFSET_ERROR;
DBUG_RETURN(my_errno= HA_ERR_END_OF_FILE);
}
} }
if (share->concurrent_insert) if (share->concurrent_insert)
...@@ -83,13 +91,16 @@ int mi_rprev(MI_INFO *info, uchar *buf, int inx) ...@@ -83,13 +91,16 @@ int mi_rprev(MI_INFO *info, uchar *buf, int inx)
info->update&= (HA_STATE_CHANGED | HA_STATE_ROW_CHANGED); info->update&= (HA_STATE_CHANGED | HA_STATE_ROW_CHANGED);
info->update|= HA_STATE_PREV_FOUND; info->update|= HA_STATE_PREV_FOUND;
if (error)
if (error || icp_res != ICP_MATCH)
{ {
fast_mi_writeinfo(info);
if (my_errno == HA_ERR_KEY_NOT_FOUND) if (my_errno == HA_ERR_KEY_NOT_FOUND)
my_errno=HA_ERR_END_OF_FILE; my_errno=HA_ERR_END_OF_FILE;
} }
else if (!buf) else if (!buf)
{ {
fast_mi_writeinfo(info);
DBUG_RETURN(info->lastpos==HA_OFFSET_ERROR ? my_errno : 0); DBUG_RETURN(info->lastpos==HA_OFFSET_ERROR ? my_errno : 0);
} }
else if (!(*info->read_record)(info,info->lastpos,buf)) else if (!(*info->read_record)(info,info->lastpos,buf))
......
...@@ -89,7 +89,10 @@ int _mi_search(register MI_INFO *info, register MI_KEYDEF *keyinfo, ...@@ -89,7 +89,10 @@ int _mi_search(register MI_INFO *info, register MI_KEYDEF *keyinfo,
flag=(*keyinfo->bin_search)(info,keyinfo,buff,key,key_len,nextflag, flag=(*keyinfo->bin_search)(info,keyinfo,buff,key,key_len,nextflag,
&keypos,lastkey, &last_key); &keypos,lastkey, &last_key);
if (flag == MI_FOUND_WRONG_KEY) if (flag == MI_FOUND_WRONG_KEY)
DBUG_RETURN(-1); {
my_errno= HA_ERR_CRASHED;
goto err;
}
nod_flag=mi_test_if_nod(buff); nod_flag=mi_test_if_nod(buff);
maxpos=buff+mi_getint(buff)-1; maxpos=buff+mi_getint(buff)-1;
......
...@@ -41,6 +41,7 @@ my_off_t myisam_max_temp_length= MAX_FILE_SIZE; ...@@ -41,6 +41,7 @@ my_off_t myisam_max_temp_length= MAX_FILE_SIZE;
ulong myisam_bulk_insert_tree_size=8192*1024; ulong myisam_bulk_insert_tree_size=8192*1024;
ulong myisam_data_pointer_size=4; ulong myisam_data_pointer_size=4;
ulonglong myisam_mmap_size= SIZE_T_MAX, myisam_mmap_used= 0; ulonglong myisam_mmap_size= SIZE_T_MAX, myisam_mmap_used= 0;
my_bool (*mi_killed)(MI_INFO *)= mi_killed_standalone;
static int always_valid(const char *filename __attribute__((unused))) static int always_valid(const char *filename __attribute__((unused)))
{ {
......
...@@ -248,6 +248,7 @@ struct st_myisam_info ...@@ -248,6 +248,7 @@ struct st_myisam_info
DYNAMIC_ARRAY *ft1_to_ft2; /* used only in ft1->ft2 conversion */ DYNAMIC_ARRAY *ft1_to_ft2; /* used only in ft1->ft2 conversion */
MEM_ROOT ft_memroot; /* used by the parser */ MEM_ROOT ft_memroot; /* used by the parser */
MYSQL_FTPARSER_PARAM *ftparser_param; /* share info between init/deinit */ MYSQL_FTPARSER_PARAM *ftparser_param; /* share info between init/deinit */
void *external_ref; /* For MariaDB TABLE */
char *filename; /* parameter to open filename */ char *filename; /* parameter to open filename */
uchar *buff, /* Temp area for key */ uchar *buff, /* Temp area for key */
*lastkey, *lastkey2; /* Last used search key */ *lastkey, *lastkey2; /* Last used search key */
...@@ -433,6 +434,7 @@ extern uint NEAR myisam_read_vec[], NEAR myisam_readnext_vec[]; ...@@ -433,6 +434,7 @@ extern uint NEAR myisam_read_vec[], NEAR myisam_readnext_vec[];
extern uint myisam_quick_table_bits; extern uint myisam_quick_table_bits;
extern File myisam_log_file; extern File myisam_log_file;
extern ulong myisam_pid; extern ulong myisam_pid;
extern my_bool (*mi_killed)(MI_INFO *);
/* This is used by _mi_calc_xxx_key_length och _mi_store_key */ /* This is used by _mi_calc_xxx_key_length och _mi_store_key */
...@@ -593,6 +595,8 @@ extern ulonglong mi_safe_mul(ulonglong a, ulonglong b); ...@@ -593,6 +595,8 @@ extern ulonglong mi_safe_mul(ulonglong a, ulonglong b);
extern int _mi_ft_update(MI_INFO *info, uint keynr, uchar *keybuf, extern int _mi_ft_update(MI_INFO *info, uint keynr, uchar *keybuf,
const uchar *oldrec, const uchar *newrec, const uchar *oldrec, const uchar *newrec,
my_off_t pos); my_off_t pos);
extern my_bool mi_yield_and_check_if_killed(MI_INFO *info, int inx);
extern my_bool mi_killed_standalone(MI_INFO *);
struct st_sort_info; struct st_sort_info;
...@@ -729,7 +733,7 @@ my_bool mi_dynmap_file(MI_INFO *info, my_off_t size); ...@@ -729,7 +733,7 @@ my_bool mi_dynmap_file(MI_INFO *info, my_off_t size);
int mi_munmap_file(MI_INFO *info); int mi_munmap_file(MI_INFO *info);
void mi_remap_file(MI_INFO *info, my_off_t size); void mi_remap_file(MI_INFO *info, my_off_t size);
int mi_check_index_cond(register MI_INFO *info, uint keynr, uchar *record); ICP_RESULT mi_check_index_cond(register MI_INFO *info, uint keynr, uchar *record);
/* Functions needed by mi_check */ /* Functions needed by mi_check */
int killed_ptr(HA_CHECK *param); int killed_ptr(HA_CHECK *param);
void mi_check_print_error _VARARGS((HA_CHECK *param, const char *fmt, ...)); void mi_check_print_error _VARARGS((HA_CHECK *param, const char *fmt, ...));
......
...@@ -126,7 +126,7 @@ static pthread_mutex_t commit_cond_m; ...@@ -126,7 +126,7 @@ static pthread_mutex_t commit_cond_m;
static bool innodb_inited = 0; static bool innodb_inited = 0;
C_MODE_START C_MODE_START
static int index_cond_func_innodb(void *arg); static xtradb_icp_result_t index_cond_func_innodb(void *arg);
C_MODE_END C_MODE_END
...@@ -853,6 +853,9 @@ convert_error_code_to_mysql( ...@@ -853,6 +853,9 @@ convert_error_code_to_mysql(
case DB_RECORD_NOT_FOUND: case DB_RECORD_NOT_FOUND:
return(HA_ERR_NO_ACTIVE_RECORD); return(HA_ERR_NO_ACTIVE_RECORD);
case DB_SEARCH_ABORTED_BY_USER:
return(HA_ERR_ABORTED_BY_USER);
case DB_DEADLOCK: case DB_DEADLOCK:
/* Since we rolled back the whole transaction, we must /* Since we rolled back the whole transaction, we must
tell it also to MySQL so that MySQL knows to empty the tell it also to MySQL so that MySQL knows to empty the
...@@ -12082,6 +12085,14 @@ ha_rows ha_innobase::multi_range_read_info(uint keyno, uint n_ranges, uint keys, ...@@ -12082,6 +12085,14 @@ ha_rows ha_innobase::multi_range_read_info(uint keyno, uint n_ranges, uint keys,
} }
/*
A helper function used only in index_cond_func_innodb
*/
bool ha_innobase::is_thd_killed()
{
return test(user_thd->killed);
}
/** /**
* Index Condition Pushdown interface implementation * Index Condition Pushdown interface implementation
...@@ -12094,15 +12105,18 @@ C_MODE_START ...@@ -12094,15 +12105,18 @@ C_MODE_START
See note on ICP_RESULT for return values description. See note on ICP_RESULT for return values description.
*/ */
static int index_cond_func_innodb(void *arg) static xtradb_icp_result_t index_cond_func_innodb(void *arg)
{ {
ha_innobase *h= (ha_innobase*)arg; ha_innobase *h= (ha_innobase*)arg;
if (h->is_thd_killed())
return XTRADB_ICP_ERROR;
if (h->end_range) if (h->end_range)
{ {
if (h->compare_key2(h->end_range) > 0) if (h->compare_key2(h->end_range) > 0)
return ICP_OUT_OF_RANGE; /* caller should return HA_ERR_END_OF_FILE already */ return XTRADB_ICP_OUT_OF_RANGE; /* caller should return HA_ERR_END_OF_FILE already */
} }
return h->pushed_idx_cond->val_int()? ICP_MATCH : ICP_NO_MATCH; return h->pushed_idx_cond->val_int()? XTRADB_ICP_MATCH : XTRADB_ICP_NO_MATCH;
} }
C_MODE_END C_MODE_END
......
...@@ -240,6 +240,9 @@ class ha_innobase: public handler ...@@ -240,6 +240,9 @@ class ha_innobase: public handler
DsMrr_impl ds_mrr; DsMrr_impl ds_mrr;
Item *idx_cond_push(uint keyno, Item* idx_cond); Item *idx_cond_push(uint keyno, Item* idx_cond);
/* An helper function for index_cond_func_innodb: */
bool is_thd_killed();
}; };
/* Some accessor functions which the InnoDB plugin needs, but which /* Some accessor functions which the InnoDB plugin needs, but which
......
...@@ -105,7 +105,8 @@ enum db_err { ...@@ -105,7 +105,8 @@ enum db_err {
DB_STRONG_FAIL, DB_STRONG_FAIL,
DB_ZIP_OVERFLOW, DB_ZIP_OVERFLOW,
DB_RECORD_NOT_FOUND = 1500, DB_RECORD_NOT_FOUND = 1500,
DB_END_OF_INDEX DB_END_OF_INDEX,
DB_SEARCH_ABORTED_BY_USER= 1533
}; };
#endif #endif
...@@ -577,7 +577,16 @@ struct mysql_row_templ_struct { ...@@ -577,7 +577,16 @@ struct mysql_row_templ_struct {
#define ROW_PREBUILT_ALLOCATED 78540783 #define ROW_PREBUILT_ALLOCATED 78540783
#define ROW_PREBUILT_FREED 26423527 #define ROW_PREBUILT_FREED 26423527
typedef int (*index_cond_func_t)(void *param);
typedef enum xtradb_icp_result {
XTRADB_ICP_ERROR=-1,
XTRADB_ICP_NO_MATCH=0,
XTRADB_ICP_MATCH=1,
XTRADB_ICP_OUT_OF_RANGE=2,
XTRADB_ICP_ABORTED_BY_USER=3,
} xtradb_icp_result_t;
typedef xtradb_icp_result_t (*index_cond_func_t)(void *param);
/** A struct for (sometimes lazily) prebuilt structures in an Innobase table /** A struct for (sometimes lazily) prebuilt structures in an Innobase table
handle used within MySQL; these are used to save CPU time. */ handle used within MySQL; these are used to save CPU time. */
......
...@@ -3348,7 +3348,8 @@ and fetch prev. NOTE that if we do a search with a full key value ...@@ -3348,7 +3348,8 @@ and fetch prev. NOTE that if we do a search with a full key value
from a unique index (ROW_SEL_EXACT), then we will not store the cursor from a unique index (ROW_SEL_EXACT), then we will not store the cursor
position and fetch next or fetch prev must not be tried to the cursor! position and fetch next or fetch prev must not be tried to the cursor!
@return DB_SUCCESS, DB_RECORD_NOT_FOUND, DB_END_OF_INDEX, DB_DEADLOCK, @return DB_SUCCESS, DB_RECORD_NOT_FOUND, DB_END_OF_INDEX, DB_DEADLOCK,
DB_LOCK_TABLE_FULL, DB_CORRUPTION, or DB_TOO_BIG_RECORD */ DB_LOCK_TABLE_FULL, DB_CORRUPTION, DB_SEARCH_ABORTED_BY_USER or
DB_TOO_BIG_RECORD */
UNIV_INTERN UNIV_INTERN
ulint ulint
row_search_for_mysql( row_search_for_mysql(
...@@ -4396,12 +4397,15 @@ row_search_for_mysql( ...@@ -4396,12 +4397,15 @@ row_search_for_mysql(
*/ */
ut_ad(ib_res); ut_ad(ib_res);
res= prebuilt->idx_cond_func(prebuilt->idx_cond_func_arg); res= prebuilt->idx_cond_func(prebuilt->idx_cond_func_arg);
if (res == 0) if (res == XTRADB_ICP_NO_MATCH)
goto next_rec; goto next_rec;
if (res == 2) { else if (res != XTRADB_ICP_MATCH) {
err = DB_RECORD_NOT_FOUND; err= (res == XTRADB_ICP_ABORTED_BY_USER ?
DB_SEARCH_ABORTED_BY_USER :
DB_RECORD_NOT_FOUND);
goto idx_cond_failed; goto idx_cond_failed;
} }
/* res == XTRADB_ICP_MATCH */
} }
/* Get the clustered index record if needed, if we did not do the /* Get the clustered index record if needed, if we did not do the
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment