Commit c73c6aea authored by Kristian Nielsen's avatar Kristian Nielsen

MDEV-33426: Aria temptables wrong thread-specific memory accounting in slave thread

Aria temporary tables account allocated memory as specific to the current
THD. But this fails for slave threads, where the temporary tables need to be
detached from any specific THD.

Introduce a new flag to mark temporary tables in replication as "global",
and use that inside Aria to not account memory allocations as thread
specific for such tables.

Based on original suggestion by Monty.
Reviewed-by: default avatarMonty <monty@mariadb.org>
Signed-off-by: default avatarKristian Nielsen <knielsen@knielsen-hq.org>
parent ae709b64
......@@ -49,6 +49,7 @@
#define HA_OPEN_MERGE_TABLE 2048U
#define HA_OPEN_FOR_CREATE 4096U
#define HA_OPEN_FOR_DROP (1U << 13) /* Open part of drop */
#define HA_OPEN_GLOBAL_TMP_TABLE (1U << 14) /* TMP table used by repliction */
/*
Allow opening even if table is incompatible as this is for ALTER TABLE which
......@@ -367,6 +368,12 @@ enum ha_base_keytype {
#define HA_CREATE_INTERNAL_TABLE 256U
#define HA_PRESERVE_INSERT_ORDER 512U
#define HA_CREATE_NO_ROLLBACK 1024U
/*
A temporary table that can be used by different threads, eg. replication
threads. This flag ensure that memory is not allocated with THREAD_SPECIFIC,
as we do for other temporary tables.
*/
#define HA_CREATE_GLOBAL_TMP_TABLE 2048U
/* Flags used by start_bulk_insert */
......
......@@ -203,6 +203,24 @@ a b
include/stop_slave.inc
SET GLOBAL slave_parallel_mode=@old_mode;
include/start_slave.inc
*** MDEV33426: Memory allocation accounting incorrect for replicated temptable
connection server_1;
CREATE TEMPORARY TABLE t5 (a int) ENGINE=Aria;
CREATE TEMPORARY TABLE t6 (a int) ENGINE=Heap;
INSERT INTO t5 VALUES (1);
INSERT INTO t6 VALUES (2);
connection server_2;
include/stop_slave.inc
connection server_1;
INSERT INTO t1 SELECT a+40, 5 FROM t5;
INSERT INTO t1 SELECT a+40, 6 FROM t6;
DROP TABLE t5, t6;
connection server_2;
include/start_slave.inc
SELECT * FROM t1 WHERE a>=40 ORDER BY a;
a b
41 5
42 6
connection server_2;
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
......
......@@ -265,6 +265,30 @@ SET GLOBAL slave_parallel_mode=@old_mode;
--source include/start_slave.inc
--echo *** MDEV33426: Memory allocation accounting incorrect for replicated temptable
--connection server_1
CREATE TEMPORARY TABLE t5 (a int) ENGINE=Aria;
CREATE TEMPORARY TABLE t6 (a int) ENGINE=Heap;
INSERT INTO t5 VALUES (1);
INSERT INTO t6 VALUES (2);
--save_master_pos
--connection server_2
--sync_with_master
--source include/stop_slave.inc
--connection server_1
INSERT INTO t1 SELECT a+40, 5 FROM t5;
INSERT INTO t1 SELECT a+40, 6 FROM t6;
DROP TABLE t5, t6;
--save_master_pos
--connection server_2
--source include/start_slave.inc
--sync_with_master
SELECT * FROM t1 WHERE a>=40 ORDER BY a;
# Clean up.
--connection server_2
......
......@@ -2827,6 +2827,17 @@ int handler::ha_open(TABLE *table_arg, const char *name, int mode,
DBUG_ASSERT(alloc_root_inited(&table->mem_root));
set_partitions_to_open(partitions_to_open);
internal_tmp_table= MY_TEST(test_if_locked & HA_OPEN_INTERNAL_TABLE);
if (!internal_tmp_table && (test_if_locked & HA_OPEN_TMP_TABLE) &&
current_thd->slave_thread)
{
/*
This is a temporary table used by replication that is not attached
to a THD. Mark it as a global temporary table.
*/
test_if_locked|= HA_OPEN_GLOBAL_TMP_TABLE;
}
if (unlikely((error=open(name,mode,test_if_locked))))
{
......@@ -2872,7 +2883,6 @@ int handler::ha_open(TABLE *table_arg, const char *name, int mode,
cached_table_flags= table_flags();
}
reset_statistics();
internal_tmp_table= MY_TEST(test_if_locked & HA_OPEN_INTERNAL_TABLE);
DBUG_RETURN(error);
}
......@@ -4857,6 +4867,9 @@ handler::ha_create(const char *name, TABLE *form, HA_CREATE_INFO *info_arg)
{
DBUG_ASSERT(m_lock_type == F_UNLCK);
mark_trx_read_write();
if ((info_arg->options & HA_LEX_CREATE_TMP_TABLE) &&
current_thd->slave_thread)
info_arg->options|= HA_LEX_CREATE_GLOBAL_TMP_TABLE;
int error= create(name, form, info_arg);
if (!error &&
!(info_arg->options & (HA_LEX_CREATE_TMP_TABLE | HA_CREATE_TMP_ALTER)))
......
......@@ -465,6 +465,12 @@ enum enum_alter_inplace_result {
#define HA_LEX_CREATE_SEQUENCE 16U
#define HA_VERSIONED_TABLE 32U
#define HA_SKIP_KEY_SORT 64U
/*
A temporary table that can be used by different threads, eg. replication
threads. This flag ensure that memory is not allocated with THREAD_SPECIFIC,
as we do for other temporary tables.
*/
#define HA_LEX_CREATE_GLOBAL_TMP_TABLE 128U
#define HA_MAX_REC_LENGTH 65535
......
......@@ -1115,11 +1115,16 @@ TABLE *THD::open_temporary_table(TMP_TABLE_SHARE *share,
DBUG_RETURN(NULL); /* Out of memory */
}
uint flags= ha_open_options | (open_options & HA_OPEN_FOR_CREATE);
/*
In replication, temporary tables are not confined to a single
thread/THD.
*/
if (slave_thread)
flags|= HA_OPEN_GLOBAL_TMP_TABLE;
if (open_table_from_share(this, share, &alias,
(uint) HA_OPEN_KEYFILE,
EXTRA_RECORD,
(ha_open_options |
(open_options & HA_OPEN_FOR_CREATE)),
EXTRA_RECORD, flags,
table, false))
{
my_free(table);
......
......@@ -3188,6 +3188,8 @@ int ha_maria::create(const char *name, TABLE *table_arg,
if (ha_create_info->tmp_table())
{
create_flags|= HA_CREATE_TMP_TABLE | HA_CREATE_DELAY_KEY_WRITE;
if (ha_create_info->options & HA_LEX_CREATE_GLOBAL_TMP_TABLE)
create_flags|= HA_CREATE_GLOBAL_TMP_TABLE;
create_info.transactional= 0;
}
if (ha_create_info->options & HA_CREATE_KEEP_FILES)
......
......@@ -232,7 +232,7 @@ my_bool _ma_bitmap_init(MARIA_SHARE *share, File file,
uint max_page_size;
MARIA_FILE_BITMAP *bitmap= &share->bitmap;
uint size= share->block_size;
myf flag= MY_WME | (share->temporary ? MY_THREAD_SPECIFIC : 0);
myf flag= MY_WME | share->malloc_flag;
pgcache_page_no_t first_bitmap_with_space;
#ifndef DBUG_OFF
/* We want to have a copy of the bitmap to be able to print differences */
......
......@@ -485,7 +485,7 @@ my_bool _ma_init_block_record(MARIA_HA *info)
{
MARIA_ROW *row= &info->cur_row, *new_row= &info->new_row;
MARIA_SHARE *share= info->s;
myf flag= MY_WME | (share->temporary ? MY_THREAD_SPECIFIC : 0);
myf flag= MY_WME | share->malloc_flag;
uint default_extents;
DBUG_ENTER("_ma_init_block_record");
......@@ -2642,7 +2642,6 @@ static my_bool write_block_record(MARIA_HA *info,
LSN lsn;
my_off_t position;
uint save_my_errno;
myf myflag= MY_WME | (share->temporary ? MY_THREAD_SPECIFIC : 0);
DBUG_ENTER("write_block_record");
head_block= bitmap_blocks->block;
......@@ -2709,7 +2708,7 @@ static my_bool write_block_record(MARIA_HA *info,
for every data segment we want to store.
*/
if (_ma_alloc_buffer(&info->rec_buff, &info->rec_buff_size,
row->head_length, myflag))
row->head_length, MY_WME | share->malloc_flag))
DBUG_RETURN(1);
tmp_data_used= 0; /* Either 0 or last used uchar in 'data' */
......@@ -4719,7 +4718,7 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record,
MARIA_EXTENT_CURSOR extent;
MARIA_COLUMNDEF *column, *end_column;
MARIA_ROW *cur_row= &info->cur_row;
myf myflag= MY_WME | (share->temporary ? MY_THREAD_SPECIFIC : 0);
myf myflag= MY_WME | share->malloc_flag;
DBUG_ENTER("_ma_read_block_record2");
start_of_data= data;
......@@ -5052,7 +5051,6 @@ static my_bool read_row_extent_info(MARIA_HA *info, uchar *buff,
uint flag, row_extents, row_extents_size;
uint field_lengths __attribute__ ((unused));
uchar *extents, *end;
myf myflag= MY_WME | (share->temporary ? MY_THREAD_SPECIFIC : 0);
DBUG_ENTER("read_row_extent_info");
if (!(data= get_record_position(share, buff,
......@@ -5076,7 +5074,7 @@ static my_bool read_row_extent_info(MARIA_HA *info, uchar *buff,
if (info->cur_row.extents_buffer_length < row_extents_size &&
_ma_alloc_buffer(&info->cur_row.extents,
&info->cur_row.extents_buffer_length,
row_extents_size, myflag))
row_extents_size, MY_WME | share->malloc_flag))
DBUG_RETURN(1);
memcpy(info->cur_row.extents, data, ROW_EXTENT_SIZE);
data+= ROW_EXTENT_SIZE;
......@@ -5247,7 +5245,7 @@ my_bool _ma_cmp_block_unique(MARIA_HA *info, MARIA_UNIQUEDEF *def,
my_bool _ma_scan_init_block_record(MARIA_HA *info)
{
MARIA_SHARE *share= info->s;
myf flag= MY_WME | (share->temporary ? MY_THREAD_SPECIFIC : 0);
myf flag= MY_WME | share->malloc_flag;
DBUG_ENTER("_ma_scan_init_block_record");
DBUG_ASSERT(info->dfile.file == share->bitmap.file.file);
......
......@@ -1271,7 +1271,6 @@ static int check_dynamic_record(HA_CHECK *param, MARIA_HA *info, int extend,
ulong UNINIT_VAR(left_length);
uint b_type;
char llbuff[22],llbuff2[22],llbuff3[22];
myf myflag= MY_WME | (share->temporary ? MY_THREAD_SPECIFIC : 0);
DBUG_ENTER("check_dynamic_record");
pos= 0;
......@@ -1379,7 +1378,8 @@ static int check_dynamic_record(HA_CHECK *param, MARIA_HA *info, int extend,
{
if (_ma_alloc_buffer(&info->rec_buff, &info->rec_buff_size,
block_info.rec_len +
share->base.extra_rec_buff_size, myflag))
share->base.extra_rec_buff_size,
MY_WME | share->malloc_flag))
{
_ma_check_print_error(param,
......
......@@ -103,7 +103,7 @@ int maria_create(const char *name, enum data_file_type datafile_type,
DBUG_ASSERT(maria_inited);
if (flags & HA_CREATE_TMP_TABLE)
if ((flags & HA_CREATE_TMP_TABLE) && !(flags & HA_CREATE_GLOBAL_TMP_TABLE))
common_flag|= MY_THREAD_SPECIFIC;
if (!ci)
......
......@@ -1478,7 +1478,6 @@ int _ma_read_dynamic_record(MARIA_HA *info, uchar *buf,
uchar *UNINIT_VAR(to);
uint UNINIT_VAR(left_length);
MARIA_SHARE *share= info->s;
myf flag= MY_WME | (share->temporary ? MY_THREAD_SPECIFIC : 0);
DBUG_ENTER("_ma_read_dynamic_record");
if (filepos == HA_OFFSET_ERROR)
......@@ -1515,7 +1514,8 @@ int _ma_read_dynamic_record(MARIA_HA *info, uchar *buf,
{
if (_ma_alloc_buffer(&info->rec_buff, &info->rec_buff_size,
block_info.rec_len +
share->base.extra_rec_buff_size, flag))
share->base.extra_rec_buff_size,
MY_WME | share->malloc_flag))
goto err;
}
to= info->rec_buff;
......@@ -1771,7 +1771,6 @@ int _ma_read_rnd_dynamic_record(MARIA_HA *info,
uchar *UNINIT_VAR(to);
MARIA_BLOCK_INFO block_info;
MARIA_SHARE *share= info->s;
myf flag= MY_WME | (share->temporary ? MY_THREAD_SPECIFIC : 0);
DBUG_ENTER("_ma_read_rnd_dynamic_record");
#ifdef MARIA_EXTERNAL_LOCKING
......@@ -1862,7 +1861,8 @@ int _ma_read_rnd_dynamic_record(MARIA_HA *info,
{
if (_ma_alloc_buffer(&info->rec_buff, &info->rec_buff_size,
block_info.rec_len +
share->base.extra_rec_buff_size, flag))
share->base.extra_rec_buff_size,
MY_WME | share->malloc_flag))
goto err;
}
to= info->rec_buff;
......
......@@ -539,7 +539,7 @@ int maria_reset(MARIA_HA *info)
{
int error= 0;
MARIA_SHARE *share= info->s;
myf flag= MY_WME | (share->temporary ? MY_THREAD_SPECIFIC : 0);
myf flag= MY_WME | share->malloc_flag;
DBUG_ENTER("maria_reset");
/*
Free buffers and reset the following flags:
......
......@@ -98,7 +98,7 @@ static MARIA_HA *maria_clone_internal(MARIA_SHARE *share,
uint errpos;
MARIA_HA info,*m_info;
my_bitmap_map *changed_fields_bitmap;
myf flag= MY_WME | (share->temporary ? MY_THREAD_SPECIFIC : 0);
myf flag= MY_WME | share->malloc_flag;
DBUG_ENTER("maria_clone_internal");
errpos= 0;
......@@ -265,7 +265,9 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags)
uint i,j,len,errpos,head_length,base_pos,keys, realpath_err,
key_parts,base_key_parts,unique_key_parts,fulltext_keys,uniques;
uint internal_table= MY_TEST(open_flags & HA_OPEN_INTERNAL_TABLE);
myf common_flag= open_flags & HA_OPEN_TMP_TABLE ? MY_THREAD_SPECIFIC : 0;
myf common_flag= (((open_flags & HA_OPEN_TMP_TABLE) &&
!(open_flags & HA_OPEN_GLOBAL_TMP_TABLE)) ?
MY_THREAD_SPECIFIC : 0);
uint file_version;
size_t info_length;
char name_buff[FN_REFLEN], org_name[FN_REFLEN], index_name[FN_REFLEN],
......@@ -885,9 +887,10 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags)
if (open_flags & HA_OPEN_TMP_TABLE || share->options & HA_OPTION_TMP_TABLE)
{
common_flag|= MY_THREAD_SPECIFIC;
share->options|= HA_OPTION_TMP_TABLE;
share->temporary= share->delay_key_write= 1;
share->malloc_flag=
(open_flags & HA_OPEN_GLOBAL_TMP_TABLE) ? 0 : MY_THREAD_SPECIFIC;
share->write_flag=MYF(MY_NABP);
share->w_locks++; /* We don't have to update status */
share->tot_locks++;
......@@ -1954,9 +1957,8 @@ void _ma_set_index_pagecache_callbacks(PAGECACHE_FILE *file,
int _ma_open_datafile(MARIA_HA *info, MARIA_SHARE *share)
{
myf flags= MY_WME | (share->mode & O_NOFOLLOW ? MY_NOSYMLINKS : 0);
if (share->temporary)
flags|= MY_THREAD_SPECIFIC;
myf flags= MY_WME | (share->mode & O_NOFOLLOW ? MY_NOSYMLINKS : 0) |
share->malloc_flag;
DEBUG_SYNC_C("mi_open_datafile");
info->dfile.file= share->bitmap.file.file=
mysql_file_open(key_file_dfile, share->data_file_name.str,
......
......@@ -1414,7 +1414,6 @@ uint _ma_pack_get_block_info(MARIA_HA *maria, MARIA_BIT_BUFF *bit_buff,
uchar *header= info->header;
uint head_length,UNINIT_VAR(ref_length);
MARIA_SHARE *share= maria->s;
myf flag= MY_WME | (share->temporary ? MY_THREAD_SPECIFIC : 0);
if (file >= 0)
{
......@@ -1441,7 +1440,8 @@ uint _ma_pack_get_block_info(MARIA_HA *maria, MARIA_BIT_BUFF *bit_buff,
*/
if (_ma_alloc_buffer(rec_buff_p, rec_buff_size_p,
info->rec_len + info->blob_len +
share->base.extra_rec_buff_size, flag))
share->base.extra_rec_buff_size,
MY_WME | share->malloc_flag))
return BLOCK_FATAL_ERROR; /* not enough memory */
bit_buff->blob_pos= *rec_buff_p + info->rec_len;
bit_buff->blob_end= bit_buff->blob_pos + info->blob_len;
......@@ -1583,7 +1583,6 @@ _ma_mempack_get_block_info(MARIA_HA *maria,
uchar *header)
{
MARIA_SHARE *share= maria->s;
myf flag= MY_WME | (share->temporary ? MY_THREAD_SPECIFIC : 0);
header+= read_pack_length((uint) share->pack.version, header,
&info->rec_len);
......@@ -1593,7 +1592,8 @@ _ma_mempack_get_block_info(MARIA_HA *maria,
&info->blob_len);
/* _ma_alloc_rec_buff sets my_errno on error */
if (_ma_alloc_buffer(rec_buff_p, rec_buff_size_p,
info->blob_len + share->base.extra_rec_buff_size, flag))
info->blob_len + share->base.extra_rec_buff_size,
MY_WME | share->malloc_flag))
return 0; /* not enough memory */
bit_buff->blob_pos= *rec_buff_p;
bit_buff->blob_end= *rec_buff_p + info->blob_len;
......
......@@ -454,6 +454,11 @@ typedef struct st_maria_share
ulong max_pack_length;
ulong state_diff_length;
uint rec_reflength; /* rec_reflength in use now */
/*
Extra flag to use for my_malloc(); set to MY_THREAD_SPECIFIC for temporary
tables whose memory allocation should be accounted to the current THD.
*/
uint malloc_flag;
uint keypage_header;
uint32 ftkeys; /* Number of distinct full-text keys
+ 1 */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment