Commit c2f2a41e authored by unknown's avatar unknown

Maria - merging recent changes done to MyISAM into Maria.

Plus compiler warnings, and a fix to the pagecache unit tests for IA64


include/maria.h:
  merging MyISAM into Maria
include/myisam.h:
  post-merge fixes
mysql-test/r/maria.result:
  merging MyISAM into Maria
mysql-test/t/maria.test:
  merging MyISAM into Maria
sql/mysqld.cc:
  post-merge fixes
storage/maria/ha_maria.cc:
  merging MyISAM into Maria
storage/maria/ha_maria.h:
  merging MyISAM into Maria
storage/maria/ma_check.c:
  merging MyISAM into Maria
storage/maria/ma_open.c:
  merging MyISAM into Maria
storage/maria/ma_packrec.c:
  merging MyISAM into Maria
storage/maria/ma_range.c:
  merging MyISAM into Maria
storage/maria/ma_sort.c:
  merging MyISAM into Maria
storage/maria/maria_def.h:
  merging MyISAM into Maria
storage/maria/maria_pack.c:
  merging MyISAM into Maria
storage/maria/plug.in:
  merging MyISAM into Maria
storage/myisam/myisamdef.h:
  merging MyISAM into Maria
storage/myisam/myisampack.c:
  fix for compiler warnings
unittest/mysys/mf_pagecache_consist.c:
  this sets the stack size lower than the minimum on IA64, we remove it
  (it made the test fail)
unittest/mysys/mf_pagecache_single.c:
  this sets the stack size lower than the minimum on IA64, we remove it
  (it made the test fail)
parent 51bb36a0
......@@ -293,6 +293,17 @@ extern uint maria_get_pointer_length(ulonglong file_length, uint def);
#define MARIA_CHK_REPAIR 1 /* equivalent to mariachk -r */
#define MARIA_CHK_VERIFY 2 /* Verify, run repair if failure */
typedef uint maria_bit_type;
typedef struct st_maria_bit_buff
{ /* Used for packing of record */
maria_bit_type current_byte;
uint bits;
uchar *pos, *end, *blob_pos, *blob_end;
uint error;
} MARIA_BIT_BUFF;
typedef struct st_maria_sort_info
{
#ifdef THREAD
......@@ -305,7 +316,6 @@ typedef struct st_maria_sort_info
char *buff;
SORT_KEY_BLOCKS *key_block, *key_block_end;
SORT_FT_BUF *ft_buf;
my_off_t filelength, dupp, buff_length;
ha_rows max_records;
uint current_key, total_keys;
......@@ -314,12 +324,12 @@ typedef struct st_maria_sort_info
enum data_file_type new_data_file_type;
} MARIA_SORT_INFO;
typedef struct st_maria_sort_param
{
pthread_t thr;
IO_CACHE read_cache, tempfile, tempfile_for_exceptions;
DYNAMIC_ARRAY buffpek;
MARIA_BIT_BUFF bit_buff; /* For parallel repair of packrec. */
MARIA_KEYDEF *keyinfo;
MARIA_SORT_INFO *sort_info;
......@@ -342,6 +352,7 @@ typedef struct st_maria_sort_param
uint key, key_length,real_key_length,sortbuff_size;
uint maxbuffers, keys, find_length, sort_keys_length;
my_bool fix_datafile, master;
my_bool calc_checksum; /* calculate table checksum */
int (*key_cmp)(struct st_maria_sort_param *, const void *, const void *);
int (*key_read)(struct st_maria_sort_param *,void *);
......
......@@ -300,6 +300,17 @@ extern uint mi_get_pointer_length(ulonglong file_length, uint def);
#define MYISAMCHK_REPAIR 1 /* equivalent to myisamchk -r */
#define MYISAMCHK_VERIFY 2 /* Verify, run repair if failure */
typedef uint mi_bit_type;
typedef struct st_mi_bit_buff
{ /* Used for packing of record */
mi_bit_type current_byte;
uint bits;
uchar *pos, *end, *blob_pos, *blob_end;
uint error;
} MI_BIT_BUFF;
typedef struct st_sort_info
{
#ifdef THREAD
......@@ -319,10 +330,13 @@ typedef struct st_sort_info
myf myf_rw;
enum data_file_type new_data_file_type;
} MI_SORT_INFO;
typedef struct st_mi_sort_param
{
pthread_t thr;
IO_CACHE read_cache, tempfile, tempfile_for_exceptions;
DYNAMIC_ARRAY buffpek;
MI_BIT_BUFF bit_buff; /* For parallel repair of packrec. */
MI_KEYDEF *keyinfo;
MI_SORT_INFO *sort_info;
......@@ -345,6 +359,7 @@ typedef struct st_mi_sort_param
uint key, key_length,real_key_length,sortbuff_size;
uint maxbuffers, keys, find_length, sort_keys_length;
my_bool fix_datafile, master;
my_bool calc_checksum; /* calculate table checksum */
int (*key_cmp)(struct st_mi_sort_param *, const void *, const void *);
int (*key_read)(struct st_mi_sort_param *,void *);
......
......@@ -780,6 +780,132 @@ a b
xxxxxxxxx bbbbbb
xxxxxxxxx bbbbbb
DROP TABLE t1;
SET @@maria_repair_threads=2;
SHOW VARIABLES LIKE 'maria_repair%';
Variable_name Value
maria_repair_threads 2
CREATE TABLE t1 (
`_id` int(11) NOT NULL default '0',
`url` text,
`email` text,
`description` text,
`loverlap` int(11) default NULL,
`roverlap` int(11) default NULL,
`lneighbor_id` int(11) default NULL,
`rneighbor_id` int(11) default NULL,
`length_` int(11) default NULL,
`sequence` mediumtext,
`name` text,
`_obj_class` text NOT NULL,
PRIMARY KEY (`_id`),
UNIQUE KEY `sequence_name_index` (`name`(50)),
KEY (`length_`)
) ENGINE=maria DEFAULT CHARSET=latin1;
INSERT INTO t1 VALUES
(1,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample1',''),
(2,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample2',''),
(3,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample3',''),
(4,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample4',''),
(5,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample5',''),
(6,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample6',''),
(7,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample7',''),
(8,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample8',''),
(9,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample9','');
SELECT _id FROM t1;
_id
1
2
3
4
5
6
7
8
9
DELETE FROM t1 WHERE _id < 8;
SHOW TABLE STATUS LIKE 't1';
Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment
t1 MARIA 10 Dynamic 2 # # # # 140 # # # # # #
CHECK TABLE t1 EXTENDED;
Table Op Msg_type Msg_text
test.t1 check status OK
OPTIMIZE TABLE t1;
Table Op Msg_type Msg_text
test.t1 optimize status OK
CHECK TABLE t1 EXTENDED;
Table Op Msg_type Msg_text
test.t1 check status OK
SHOW TABLE STATUS LIKE 't1';
Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment
t1 MARIA 10 Dynamic 2 # # # # 0 # # # # # #
SELECT _id FROM t1;
_id
8
9
DROP TABLE t1;
CREATE TABLE t1 (
`_id` int(11) NOT NULL default '0',
`url` text,
`email` text,
`description` text,
`loverlap` int(11) default NULL,
`roverlap` int(11) default NULL,
`lneighbor_id` int(11) default NULL,
`rneighbor_id` int(11) default NULL,
`length_` int(11) default NULL,
`sequence` mediumtext,
`name` text,
`_obj_class` text NOT NULL,
PRIMARY KEY (`_id`),
UNIQUE KEY `sequence_name_index` (`name`(50)),
KEY (`length_`)
) ENGINE=maria DEFAULT CHARSET=latin1;
INSERT INTO t1 VALUES
(1,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample1',''),
(2,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample2',''),
(3,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample3',''),
(4,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample4',''),
(5,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample5',''),
(6,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample6',''),
(7,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample7',''),
(8,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample8',''),
(9,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample9','');
SELECT _id FROM t1;
_id
1
2
3
4
5
6
7
8
9
DELETE FROM t1 WHERE _id < 8;
SHOW TABLE STATUS LIKE 't1';
Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment
t1 MARIA 10 Dynamic 2 # # # # 140 # # # # # #
CHECK TABLE t1 EXTENDED;
Table Op Msg_type Msg_text
test.t1 check status OK
REPAIR TABLE t1 QUICK;
Table Op Msg_type Msg_text
test.t1 repair status OK
CHECK TABLE t1 EXTENDED;
Table Op Msg_type Msg_text
test.t1 check status OK
SHOW TABLE STATUS LIKE 't1';
Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment
t1 MARIA 10 Dynamic 2 # # # # 140 # # # # # #
SELECT _id FROM t1;
_id
8
9
DROP TABLE t1;
SET @@maria_repair_threads=1;
SHOW VARIABLES LIKE 'maria_repair%';
Variable_name Value
maria_repair_threads 1
drop table if exists t1,t2,t3;
--- Testing varchar ---
--- Testing varchar ---
......
......@@ -733,6 +733,98 @@ UPDATE t1 AS ta1,t1 AS ta2 SET ta1.b='aaaaaa',ta2.b='bbbbbb';
SELECT * FROM t1;
DROP TABLE t1;
#
# Bug#8283 - OPTIMIZE TABLE causes data loss
#
SET @@maria_repair_threads=2;
SHOW VARIABLES LIKE 'maria_repair%';
#
# Test OPTIMIZE. This creates a new data file.
CREATE TABLE t1 (
`_id` int(11) NOT NULL default '0',
`url` text,
`email` text,
`description` text,
`loverlap` int(11) default NULL,
`roverlap` int(11) default NULL,
`lneighbor_id` int(11) default NULL,
`rneighbor_id` int(11) default NULL,
`length_` int(11) default NULL,
`sequence` mediumtext,
`name` text,
`_obj_class` text NOT NULL,
PRIMARY KEY (`_id`),
UNIQUE KEY `sequence_name_index` (`name`(50)),
KEY (`length_`)
) ENGINE=maria DEFAULT CHARSET=latin1;
#
INSERT INTO t1 VALUES
(1,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample1',''),
(2,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample2',''),
(3,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample3',''),
(4,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample4',''),
(5,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample5',''),
(6,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample6',''),
(7,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample7',''),
(8,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample8',''),
(9,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample9','');
#
SELECT _id FROM t1;
DELETE FROM t1 WHERE _id < 8;
--replace_column 6 # 7 # 8 # 9 # 11 # 12 # 13 # 14 # 15 # 16 #
SHOW TABLE STATUS LIKE 't1';
CHECK TABLE t1 EXTENDED;
OPTIMIZE TABLE t1;
CHECK TABLE t1 EXTENDED;
--replace_column 6 # 7 # 8 # 9 # 11 # 12 # 13 # 14 # 15 # 16 #
SHOW TABLE STATUS LIKE 't1';
SELECT _id FROM t1;
DROP TABLE t1;
#
# Test REPAIR QUICK. This retains the old data file.
CREATE TABLE t1 (
`_id` int(11) NOT NULL default '0',
`url` text,
`email` text,
`description` text,
`loverlap` int(11) default NULL,
`roverlap` int(11) default NULL,
`lneighbor_id` int(11) default NULL,
`rneighbor_id` int(11) default NULL,
`length_` int(11) default NULL,
`sequence` mediumtext,
`name` text,
`_obj_class` text NOT NULL,
PRIMARY KEY (`_id`),
UNIQUE KEY `sequence_name_index` (`name`(50)),
KEY (`length_`)
) ENGINE=maria DEFAULT CHARSET=latin1;
#
INSERT INTO t1 VALUES
(1,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample1',''),
(2,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample2',''),
(3,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample3',''),
(4,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample4',''),
(5,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample5',''),
(6,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample6',''),
(7,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample7',''),
(8,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample8',''),
(9,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample9','');
#
SELECT _id FROM t1;
DELETE FROM t1 WHERE _id < 8;
--replace_column 6 # 7 # 8 # 9 # 11 # 12 # 13 # 14 # 15 # 16 #
SHOW TABLE STATUS LIKE 't1';
CHECK TABLE t1 EXTENDED;
REPAIR TABLE t1 QUICK;
CHECK TABLE t1 EXTENDED;
--replace_column 6 # 7 # 8 # 9 # 11 # 12 # 13 # 14 # 15 # 16 #
SHOW TABLE STATUS LIKE 't1';
SELECT _id FROM t1;
DROP TABLE t1;
#
SET @@maria_repair_threads=1;
SHOW VARIABLES LIKE 'maria_repair%';
#
# Test varchar
#
......
......@@ -3537,7 +3537,7 @@ int main(int argc, char **argv)
{
if (global_system_variables.log_warnings)
sql_print_warning("Asked for %ld thread stack, but got %ld",
my_thread_stack, (long) stack_size);
my_thread_stack_size, (long) stack_size);
#if defined(__ia64__) || defined(__ia64)
my_thread_stack_size= stack_size*2;
#else
......
......@@ -283,7 +283,7 @@ err:
bool ha_maria::check_if_locking_is_allowed(uint sql_command,
ulong type, TABLE *table,
uint count,
bool called_by_logger_thread)
bool called_by_privileged_thread)
{
/*
To be able to open and lock for reading system tables like 'mysql.proc',
......@@ -300,10 +300,10 @@ bool ha_maria::check_if_locking_is_allowed(uint sql_command,
/*
Deny locking of the log tables, which is incompatible with
concurrent insert. Unless called from a logger THD:
general_log_thd or slow_log_thd.
concurrent insert. Unless called from a logger THD (general_log_thd
or slow_log_thd) or by a privileged thread.
*/
if (!called_by_logger_thread)
if (!called_by_privileged_thread)
return check_if_log_table_locking_is_allowed(sql_command, type, table);
return TRUE;
......@@ -1368,7 +1368,7 @@ void ha_maria::position(const byte * record)
}
void ha_maria::info(uint flag)
int ha_maria::info(uint flag)
{
MARIA_INFO info;
char name_buff[FN_REFLEN];
......@@ -1428,6 +1428,8 @@ void ha_maria::info(uint flag)
/* Faster to always update, than to do it based on flag */
stats.update_time= info.update_time;
stats.auto_increment_value= info.auto_increment;
return 0;
}
......
......@@ -103,7 +103,7 @@ public:
int rnd_pos(byte * buf, byte * pos);
int restart_rnd_next(byte * buf, byte * pos);
void position(const byte * record);
void info(uint);
int info(uint);
int extra(enum ha_extra_function operation);
int extra_opt(enum ha_extra_function operation, ulong cache_size);
int reset(void);
......
......@@ -16,6 +16,31 @@
/* Describe, check and repair of MARIA tables */
/*
About checksum calculation.
There are two types of checksums. Table checksum and row checksum.
Row checksum is an additional byte at the end of dynamic length
records. It must be calculated if the table is configured for them.
Otherwise they must not be used. The variable
MYISAM_SHARE::calc_checksum determines if row checksums are used.
MI_INFO::checksum is used as temporary storage during row handling.
For parallel repair we must assure that only one thread can use this
variable. There is no problem on the write side as this is done by one
thread only. But when checking a record after read this could go
wrong. But since all threads read through a common read buffer, it is
sufficient if only one thread checks it.
Table checksum is an eight byte value in the header of the index file.
It can be calculated even if row checksums are not used. The variable
MI_CHECK::glob_crc is calculated over all records.
MI_SORT_PARAM::calc_checksum determines if this should be done. This
variable is not part of MI_CHECK because it must be set per thread for
parallel repair. The global glob_crc must be changed by one thread
only. And it is sufficient to calculate the checksum once only.
*/
#include "ma_ftdefs.h"
#include <myisamchk.h>
#include <m_ctype.h>
......@@ -42,8 +67,7 @@ static int chk_index(HA_CHECK *param, MARIA_HA *info,MARIA_KEYDEF *keyinfo,
ha_checksum *key_checksum, uint level);
static uint isam_key_length(MARIA_HA *info,MARIA_KEYDEF *keyinfo);
static ha_checksum calc_checksum(ha_rows count);
static int writekeys(HA_CHECK *param, MARIA_HA *info,byte *buff,
my_off_t filepos);
static int writekeys(MARIA_SORT_PARAM *sort_param);
static int sort_one_index(HA_CHECK *param, MARIA_HA *info,MARIA_KEYDEF *keyinfo,
my_off_t pagepos, File new_file);
static int sort_key_read(MARIA_SORT_PARAM *sort_param,void *key);
......@@ -1159,7 +1183,8 @@ int maria_chk_data_link(HA_CHECK *param, MARIA_HA *info,int extend)
goto err;
start_recpos=pos;
splits++;
VOID(_ma_pack_get_block_info(info,&block_info, -1, start_recpos));
VOID(_ma_pack_get_block_info(info, &info->bit_buff, &block_info,
&info->rec_buff, -1, start_recpos));
pos=block_info.filepos+block_info.rec_len;
if (block_info.rec_len < (uint) info->s->min_pack_length ||
block_info.rec_len > (uint) info->s->max_pack_length)
......@@ -1173,7 +1198,8 @@ int maria_chk_data_link(HA_CHECK *param, MARIA_HA *info,int extend)
if (_ma_read_cache(&param->read_cache,(byte*) info->rec_buff,
block_info.filepos, block_info.rec_len, READING_NEXT))
goto err;
if (_ma_pack_rec_unpack(info,record,info->rec_buff,block_info.rec_len))
if (_ma_pack_rec_unpack(info, &info->bit_buff, record,
info->rec_buff, block_info.rec_len))
{
_ma_check_print_error(param,"Found wrong record at %s",
llstr(start_recpos,llbuff));
......@@ -1457,7 +1483,7 @@ int maria_repair(HA_CHECK *param, register MARIA_HA *info,
info->state->empty=0;
param->glob_crc=0;
if (param->testflag & T_CALC_CHECKSUM)
param->calc_checksum=1;
sort_param.calc_checksum= 1;
info->update= (short) (HA_STATE_CHANGED | HA_STATE_ROW_CHANGED);
......@@ -1486,7 +1512,7 @@ int maria_repair(HA_CHECK *param, register MARIA_HA *info,
/* Re-create all keys, which are set in key_map. */
while (!(error=sort_get_next_record(&sort_param)))
{
if (writekeys(param,info,(byte*)sort_param.record,sort_param.filepos))
if (writekeys(&sort_param))
{
if (my_errno != HA_ERR_FOUND_DUPP_KEY)
goto err;
......@@ -1631,11 +1657,13 @@ err:
/* Uppate keyfile when doing repair */
static int writekeys(HA_CHECK *param, register MARIA_HA *info, byte *buff,
my_off_t filepos)
static int writekeys(MARIA_SORT_PARAM *sort_param)
{
register uint i;
uchar *key;
MARIA_HA *info= sort_param->sort_info->info;
byte *buff= sort_param->record;
my_off_t filepos= sort_param->filepos;
DBUG_ENTER("writekeys");
key=info->lastkey+info->s->base.max_key_length;
......@@ -1689,8 +1717,8 @@ static int writekeys(HA_CHECK *param, register MARIA_HA *info, byte *buff,
}
}
/* Remove checksum that was added to glob_crc in sort_get_next_record */
if (param->calc_checksum)
param->glob_crc-= info->checksum;
if (sort_param->calc_checksum)
sort_param->sort_info->param->glob_crc-= info->checksum;
DBUG_PRINT("error",("errno: %d",my_errno));
DBUG_RETURN(-1);
} /* writekeys */
......@@ -2180,7 +2208,7 @@ int maria_repair_by_sort(HA_CHECK *param, register MARIA_HA *info,
del=info->state->del;
param->glob_crc=0;
if (param->testflag & T_CALC_CHECKSUM)
param->calc_checksum=1;
sort_param.calc_checksum= 1;
rec_per_key_part= param->rec_per_key_part;
for (sort_param.key=0 ; sort_param.key < share->base.keys ;
......@@ -2266,7 +2294,8 @@ int maria_repair_by_sort(HA_CHECK *param, register MARIA_HA *info,
param->retry_repair=1;
goto err;
}
param->calc_checksum=0; /* No need to calc glob_crc */
/* No need to calculate checksum again. */
sort_param.calc_checksum= 0;
free_root(&sort_param.wordroot, MYF(0));
/* Set for next loop */
......@@ -2430,6 +2459,28 @@ err:
Each key is handled by a separate thread.
TODO: make a number of threads a parameter
In parallel repair we use one thread per index. There are two modes:
Quick
Only the indexes are rebuilt. All threads share a read buffer.
Every thread that needs fresh data in the buffer enters the shared
cache lock. The last thread joining the lock reads the buffer from
the data file and wakes all other threads.
Non-quick
The data file is rebuilt and all indexes are rebuilt to point to
the new record positions. One thread is the master thread. It
reads from the old data file and writes to the new data file. It
also creates one of the indexes. The other threads read from a
buffer which is filled by the master. If they need fresh data,
they enter the shared cache lock. If the masters write buffer is
full, it flushes it to the new data file and enters the shared
cache lock too. When all threads joined in the lock, the master
copies its write buffer to the read buffer for the other threads
and wakes them.
RESULT
0 ok
<>0 Error
......@@ -2452,6 +2503,7 @@ int maria_repair_parallel(HA_CHECK *param, register MARIA_HA *info,
ulong *rec_per_key_part;
HA_KEYSEG *keyseg;
char llbuff[22];
IO_CACHE new_data_cache; /* For non-quick repair. */
IO_CACHE_SHARE io_share;
MARIA_SORT_INFO sort_info;
ulonglong key_map=share->state.key_map;
......@@ -2473,19 +2525,55 @@ int maria_repair_parallel(HA_CHECK *param, register MARIA_HA *info,
if (info->s->options & (HA_OPTION_CHECKSUM | HA_OPTION_COMPRESS_RECORD))
param->testflag|=T_CALC_CHECKSUM;
/*
Quick repair (not touching data file, rebuilding indexes):
{
Read cache is (MI_CHECK *param)->read_cache using info->dfile.
}
Non-quick repair (rebuilding data file and indexes):
{
Master thread:
Read cache is (MI_CHECK *param)->read_cache using info->dfile.
Write cache is (MI_INFO *info)->rec_cache using new_file.
Slave threads:
Read cache is new_data_cache synced to master rec_cache.
The final assignment of the filedescriptor for rec_cache is done
after the cache creation.
Don't check file size on new_data_cache, as the resulting file size
is not known yet.
As rec_cache and new_data_cache are synced, write_buffer_length is
used for the read cache 'new_data_cache'. Both start at the same
position 'new_header_length'.
}
*/
DBUG_PRINT("info", ("is quick repair: %d", rep_quick));
bzero((char*)&sort_info,sizeof(sort_info));
/* Initialize pthread structures before goto err. */
pthread_mutex_init(&sort_info.mutex, MY_MUTEX_INIT_FAST);
pthread_cond_init(&sort_info.cond, 0);
if (!(sort_info.key_block=
alloc_key_blocks(param,
(uint) param->sort_key_blocks,
share->base.max_key_block_length))
|| init_io_cache(&param->read_cache,info->dfile,
alloc_key_blocks(param, (uint) param->sort_key_blocks,
share->base.max_key_block_length)) ||
init_io_cache(&param->read_cache, info->dfile,
(uint) param->read_buffer_length,
READ_CACHE,share->pack.header_length,1,MYF(MY_WME)) ||
(! rep_quick &&
init_io_cache(&info->rec_cache,info->dfile,
READ_CACHE, share->pack.header_length, 1, MYF(MY_WME)) ||
(!rep_quick &&
(init_io_cache(&info->rec_cache, info->dfile,
(uint) param->write_buffer_length,
WRITE_CACHE,new_header_length,1,
MYF(MY_WME | MY_WAIT_IF_FULL) & param->myf_rw)))
WRITE_CACHE, new_header_length, 1,
MYF(MY_WME | MY_WAIT_IF_FULL) & param->myf_rw) ||
init_io_cache(&new_data_cache, -1,
(uint) param->write_buffer_length,
READ_CACHE, new_header_length, 1,
MYF(MY_WME | MY_DONT_CHECK_FILESIZE)))))
goto err;
sort_info.key_block_end=sort_info.key_block+param->sort_key_blocks;
info->opt_flag|=WRITE_CACHE_USED;
......@@ -2576,8 +2664,6 @@ int maria_repair_parallel(HA_CHECK *param, register MARIA_HA *info,
del=info->state->del;
param->glob_crc=0;
if (param->testflag & T_CALC_CHECKSUM)
param->calc_checksum=1;
if (!(sort_param=(MARIA_SORT_PARAM *)
my_malloc((uint) share->base.keys *
......@@ -2627,6 +2713,7 @@ int maria_repair_parallel(HA_CHECK *param, register MARIA_HA *info,
sort_param[i].sort_info=&sort_info;
sort_param[i].master=0;
sort_param[i].fix_datafile=0;
sort_param[i].calc_checksum= 0;
sort_param[i].filepos=new_header_length;
sort_param[i].max_pos=sort_param[i].pos=share->pack.header_length;
......@@ -2664,19 +2751,45 @@ int maria_repair_parallel(HA_CHECK *param, register MARIA_HA *info,
sort_info.total_keys=i;
sort_param[0].master= 1;
sort_param[0].fix_datafile= (my_bool)(! rep_quick);
sort_param[0].calc_checksum= test(param->testflag & T_CALC_CHECKSUM);
sort_info.got_error=0;
pthread_mutex_init(&sort_info.mutex, MY_MUTEX_INIT_FAST);
pthread_cond_init(&sort_info.cond, 0);
pthread_mutex_lock(&sort_info.mutex);
init_io_cache_share(&param->read_cache, &io_share, i);
/*
Initialize the I/O cache share for use with the read caches and, in
case of non-quick repair, the write cache. When all threads join on
the cache lock, the writer copies the write cache contents to the
read caches.
*/
if (i > 1)
{
if (rep_quick)
init_io_cache_share(&param->read_cache, &io_share, NULL, i);
else
init_io_cache_share(&new_data_cache, &io_share, &info->rec_cache, i);
}
else
io_share.total_threads= 0; /* share not used */
(void) pthread_attr_init(&thr_attr);
(void) pthread_attr_setdetachstate(&thr_attr,PTHREAD_CREATE_DETACHED);
for (i=0 ; i < sort_info.total_keys ; i++)
{
sort_param[i].read_cache=param->read_cache;
/*
Copy the properly initialized IO_CACHE structure so that every
thread has its own copy. In quick mode param->read_cache is shared
for use by all threads. In non-quick mode all threads but the
first copy the shared new_data_cache, which is synchronized to the
write cache of the first thread. The first thread copies
param->read_cache, which is not shared.
*/
sort_param[i].read_cache= ((rep_quick || !i) ? param->read_cache :
new_data_cache);
DBUG_PRINT("io_cache_share", ("thread: %u read_cache: 0x%lx",
i, (long) &sort_param[i].read_cache));
/*
two approaches: the same amount of memory for each thread
or the memory for the same number of keys for each thread...
......@@ -2694,7 +2807,10 @@ int maria_repair_parallel(HA_CHECK *param, register MARIA_HA *info,
(void *) (sort_param+i)))
{
_ma_check_print_error(param,"Cannot start a repair thread");
remove_io_thread(&param->read_cache);
/* Cleanup: Detach from the share. Avoid others to be blocked. */
if (io_share.total_threads)
remove_io_thread(&sort_param[i].read_cache);
DBUG_PRINT("error", ("Cannot start a repair thread"));
sort_info.got_error=1;
}
else
......@@ -2716,6 +2832,11 @@ int maria_repair_parallel(HA_CHECK *param, register MARIA_HA *info,
if (sort_param[0].fix_datafile)
{
/*
Append some nuls to the end of a memory mapped file. Destroy the
write cache. The master thread did already detach from the share
by remove_io_thread() in sort.c:thr_find_all_keys().
*/
if (maria_write_data_suffix(&sort_info,1) || end_io_cache(&info->rec_cache))
goto err;
if (param->testflag & T_SAFE_REPAIR)
......@@ -2731,8 +2852,13 @@ int maria_repair_parallel(HA_CHECK *param, register MARIA_HA *info,
sort_param->filepos;
/* Only whole records */
share->state.version=(ulong) time((time_t*) 0);
/*
Exchange the data file descriptor of the table, so that we use the
new file from now on.
*/
my_close(info->dfile,MYF(0));
info->dfile=new_file;
share->data_file_type=sort_info.new_data_file_type;
share->pack.header_length=(ulong) new_header_length;
}
......@@ -2787,7 +2913,20 @@ int maria_repair_parallel(HA_CHECK *param, register MARIA_HA *info,
err:
got_error|= _ma_flush_blocks(param, share->key_cache, share->kfile);
/*
Destroy the write cache. The master thread did already detach from
the share by remove_io_thread() or it was not yet started (if the
error happend before creating the thread).
*/
VOID(end_io_cache(&info->rec_cache));
/*
Destroy the new data cache in case of non-quick repair. All slave
threads did either detach from the share by remove_io_thread()
already or they were not yet started (if the error happend before
creating the threads).
*/
if (!rep_quick)
VOID(end_io_cache(&new_data_cache));
if (!got_error)
{
/* Replace the actual file with the temporary file */
......@@ -2920,12 +3059,41 @@ static int sort_maria_ft_key_read(MARIA_SORT_PARAM *sort_param, void *key)
} /* sort_maria_ft_key_read */
/* Read next record from file using parameters in sort_info */
/* Return -1 if end of file, 0 if ok and > 0 if error */
/*
Read next record from file using parameters in sort_info.
SYNOPSIS
sort_get_next_record()
sort_param Information about and for the sort process
NOTE
Dynamic Records With Non-Quick Parallel Repair
For non-quick parallel repair we use a synchronized read/write
cache. This means that one thread is the master who fixes the data
file by reading each record from the old data file and writing it
to the new data file. By doing this the records in the new data
file are written contiguously. Whenever the write buffer is full,
it is copied to the read buffer. The slaves read from the read
buffer, which is not associated with a file. Thus read_cache.file
is -1. When using _mi_read_cache(), the slaves must always set
flag to READING_NEXT so that the function never tries to read from
file. This is safe because the records are contiguous. There is no
need to read outside the cache. This condition is evaluated in the
variable 'parallel_flag' for quick reference. read_cache.file must
be >= 0 in every other case.
RETURN
-1 end of file
0 ok
> 0 error
*/
static int sort_get_next_record(MARIA_SORT_PARAM *sort_param)
{
int searching;
int parallel_flag;
uint found_record,b_type,left_length;
my_off_t pos;
byte *to;
......@@ -2963,7 +3131,7 @@ static int sort_get_next_record(MARIA_SORT_PARAM *sort_param)
sort_param->max_pos=(sort_param->pos+=share->base.pack_reclength);
if (*sort_param->record)
{
if (param->calc_checksum)
if (sort_param->calc_checksum)
param->glob_crc+= (info->checksum=
_ma_static_checksum(info,sort_param->record));
DBUG_RETURN(0);
......@@ -2978,6 +3146,7 @@ static int sort_get_next_record(MARIA_SORT_PARAM *sort_param)
LINT_INIT(to);
pos=sort_param->pos;
searching=(sort_param->fix_datafile && (param->testflag & T_EXTEND));
parallel_flag= (sort_param->read_cache.file < 0) ? READING_NEXT : 0;
for (;;)
{
found_record=block_info.second_read= 0;
......@@ -3008,7 +3177,7 @@ static int sort_get_next_record(MARIA_SORT_PARAM *sort_param)
(byte*) block_info.header,pos,
MARIA_BLOCK_INFO_HEADER_LENGTH,
(! found_record ? READING_NEXT : 0) |
READING_HEADER))
parallel_flag | READING_HEADER))
{
if (found_record)
{
......@@ -3185,9 +3354,31 @@ static int sort_get_next_record(MARIA_SORT_PARAM *sort_param)
llstr(sort_param->start_recpos,llbuff));
goto try_next;
}
if (_ma_read_cache(&sort_param->read_cache,to,block_info.filepos,
/*
Copy information that is already read. Avoid accessing data
below the cache start. This could happen if the header
streched over the end of the previous buffer contents.
*/
{
uint header_len= (uint) (block_info.filepos - pos);
uint prefetch_len= (MARIA_BLOCK_INFO_HEADER_LENGTH - header_len);
if (prefetch_len > block_info.data_len)
prefetch_len= block_info.data_len;
if (prefetch_len)
{
memcpy(to, block_info.header + header_len, prefetch_len);
block_info.filepos+= prefetch_len;
block_info.data_len-= prefetch_len;
left_length-= prefetch_len;
to+= prefetch_len;
}
}
if (block_info.data_len &&
_ma_read_cache(&sort_param->read_cache,to,block_info.filepos,
block_info.data_len,
(found_record == 1 ? READING_NEXT : 0)))
(found_record == 1 ? READING_NEXT : 0) |
parallel_flag))
{
_ma_check_print_info(param,
"Read error for block at: %s (error: %d); Skipped",
......@@ -3217,13 +3408,14 @@ static int sort_get_next_record(MARIA_SORT_PARAM *sort_param)
{
if (sort_param->read_cache.error < 0)
DBUG_RETURN(1);
if (info->s->calc_checksum)
info->checksum=_ma_checksum(info,sort_param->record);
if (sort_param->calc_checksum)
info->checksum= _ma_checksum(info, sort_param->record);
if ((param->testflag & (T_EXTEND | T_REP)) || searching)
{
if (_ma_rec_check(info, sort_param->record, sort_param->rec_buff,
sort_param->find_length,
(param->testflag & T_QUICK) &&
sort_param->calc_checksum &&
test(info->s->calc_checksum)))
{
_ma_check_print_info(param,"Found wrong packed record at %s",
......@@ -3231,7 +3423,7 @@ static int sort_get_next_record(MARIA_SORT_PARAM *sort_param)
goto try_next;
}
}
if (param->calc_checksum)
if (sort_param->calc_checksum)
param->glob_crc+= info->checksum;
DBUG_RETURN(0);
}
......@@ -3258,7 +3450,8 @@ static int sort_get_next_record(MARIA_SORT_PARAM *sort_param)
DBUG_RETURN(1); /* Something wrong with data */
}
sort_param->start_recpos=sort_param->pos;
if (_ma_pack_get_block_info(info,&block_info,-1,sort_param->pos))
if (_ma_pack_get_block_info(info, &sort_param->bit_buff, &block_info,
&sort_param->rec_buff, -1, sort_param->pos))
DBUG_RETURN(-1);
if (!block_info.rec_len &&
sort_param->pos + MEMMAP_EXTRA_MARGIN ==
......@@ -3282,15 +3475,14 @@ static int sort_get_next_record(MARIA_SORT_PARAM *sort_param)
llstr(sort_param->pos,llbuff));
continue;
}
if (_ma_pack_rec_unpack(info,sort_param->record,sort_param->rec_buff,
block_info.rec_len))
if (_ma_pack_rec_unpack(info, &sort_param->bit_buff, sort_param->record,
sort_param->rec_buff, block_info.rec_len))
{
if (! searching)
_ma_check_print_info(param,"Found wrong record at %s",
llstr(sort_param->pos,llbuff));
continue;
}
info->checksum=_ma_checksum(info,sort_param->record);
if (!sort_param->fix_datafile)
{
sort_param->filepos=sort_param->pos;
......@@ -3300,8 +3492,9 @@ static int sort_get_next_record(MARIA_SORT_PARAM *sort_param)
sort_param->max_pos=(sort_param->pos=block_info.filepos+
block_info.rec_len);
info->packed_length=block_info.rec_len;
if (param->calc_checksum)
param->glob_crc+= info->checksum;
if (sort_param->calc_checksum)
param->glob_crc+= (info->checksum=
_ma_checksum(info, sort_param->record));
DBUG_RETURN(0);
}
}
......@@ -3309,7 +3502,20 @@ static int sort_get_next_record(MARIA_SORT_PARAM *sort_param)
}
/* Write record to new file */
/*
Write record to new file.
SYNOPSIS
_ma_sort_write_record()
sort_param Sort parameters.
NOTE
This is only called by a master thread if parallel repair is used.
RETURN
0 OK
1 Error
*/
int _ma_sort_write_record(MARIA_SORT_PARAM *sort_param)
{
......@@ -3358,6 +3564,7 @@ int _ma_sort_write_record(MARIA_SORT_PARAM *sort_param)
}
from=sort_info->buff+ALIGN_SIZE(MARIA_MAX_DYN_BLOCK_HEADER);
}
/* We can use info->checksum here as only one thread calls this. */
info->checksum=_ma_checksum(info,sort_param->record);
reclength= _ma_rec_pack(info,from,sort_param->record);
flag=0;
......@@ -3767,7 +3974,7 @@ static int sort_delete_record(MARIA_SORT_PARAM *sort_param)
DBUG_RETURN(1);
}
}
if (param->calc_checksum)
if (sort_param->calc_checksum)
param->glob_crc-=(*info->s->calc_checksum)(info, sort_param->record);
}
error=flush_io_cache(&info->rec_cache) || (*info->s->delete_record)(info);
......
......@@ -211,7 +211,10 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags)
((open_flags & HA_OPEN_ABORT_IF_CRASHED) &&
(my_disable_locking && share->state.open_count))))
{
DBUG_PRINT("error",("Table is marked as crashed"));
DBUG_PRINT("error",("Table is marked as crashed. open_flags: %u "
"changed: %u open_count: %u !locking: %d",
open_flags, share->state.changed,
share->state.open_count, my_disable_locking));
my_errno=((share->state.changed & STATE_CRASHED_ON_REPAIR) ?
HA_ERR_CRASHED_ON_REPAIR : HA_ERR_CRASHED_ON_USAGE);
goto err;
......
......@@ -104,7 +104,8 @@ static void fill_buffer(MARIA_BIT_BUFF *bit_buff);
static uint max_bit(uint value);
static uint read_pack_length(uint version, const uchar *buf, ulong *length);
#ifdef HAVE_MMAP
static uchar *_ma_mempack_get_block_info(MARIA_HA *maria,MARIA_BLOCK_INFO *info,
static uchar *_ma_mempack_get_block_info(MARIA_HA *maria, MARIA_BIT_BUFF *bit_buff,
MARIA_BLOCK_INFO *info, byte **rec_buff_p,
uchar *header);
#endif
......@@ -450,13 +451,15 @@ int _ma_read_pack_record(MARIA_HA *info, my_off_t filepos, byte *buf)
DBUG_RETURN(-1); /* _search() didn't find record */
file=info->dfile;
if (_ma_pack_get_block_info(info, &block_info, file, filepos))
if (_ma_pack_get_block_info(info, &info->bit_buff, &block_info,
&info->rec_buff, file, filepos))
goto err;
if (my_read(file,(byte*) info->rec_buff + block_info.offset ,
block_info.rec_len - block_info.offset, MYF(MY_NABP)))
goto panic;
info->update|= HA_STATE_AKTIV;
DBUG_RETURN(_ma_pack_rec_unpack(info,buf,info->rec_buff,block_info.rec_len));
DBUG_RETURN(_ma_pack_rec_unpack(info,&info->bit_buff, buf,
info->rec_buff, block_info.rec_len));
panic:
my_errno=HA_ERR_WRONG_IN_RECORD;
err:
......@@ -465,8 +468,8 @@ err:
int _ma_pack_rec_unpack(register MARIA_HA *info, register byte *to, byte *from,
ulong reclength)
int _ma_pack_rec_unpack(register MARIA_HA *info, MARIA_BIT_BUFF *bit_buff,
register byte *to, byte *from, ulong reclength)
{
byte *end_field;
reg3 MARIA_COLUMNDEF *end;
......@@ -474,18 +477,18 @@ int _ma_pack_rec_unpack(register MARIA_HA *info, register byte *to, byte *from,
MARIA_SHARE *share=info->s;
DBUG_ENTER("_ma_pack_rec_unpack");
init_bit_buffer(&info->bit_buff, (uchar*) from,reclength);
init_bit_buffer(bit_buff, (uchar*) from, reclength);
for (current_field=share->rec, end=current_field+share->base.fields ;
current_field < end ;
current_field++,to=end_field)
{
end_field=to+current_field->length;
(*current_field->unpack)(current_field,&info->bit_buff,(uchar*) to,
(*current_field->unpack)(current_field, bit_buff, (uchar*) to,
(uchar*) end_field);
}
if (! info->bit_buff.error &&
info->bit_buff.pos - info->bit_buff.bits/8 == info->bit_buff.end)
if (!bit_buff->error &&
bit_buff->pos - bit_buff->bits / 8 == bit_buff->end)
DBUG_RETURN(0);
info->update&= ~HA_STATE_AKTIV;
DBUG_RETURN(my_errno=HA_ERR_WRONG_IN_RECORD);
......@@ -1016,13 +1019,16 @@ int _ma_read_rnd_pack_record(MARIA_HA *info, byte *buf,
if (info->opt_flag & READ_CACHE_USED)
{
if (_ma_read_cache(&info->rec_cache,(byte*) block_info.header,filepos,
share->pack.ref_length, skip_deleted_blocks))
if (_ma_read_cache(&info->rec_cache, (byte*) block_info.header,
filepos, share->pack.ref_length,
skip_deleted_blocks ? READING_NEXT : 0))
goto err;
b_type= _ma_pack_get_block_info(info,&block_info,-1, filepos);
b_type= _ma_pack_get_block_info(info, &info->bit_buff, &block_info,
&info->rec_buff, -1, filepos);
}
else
b_type= _ma_pack_get_block_info(info,&block_info,info->dfile,filepos);
b_type= _ma_pack_get_block_info(info, &info->bit_buff, &block_info,
&info->rec_buff, info->dfile, filepos);
if (b_type)
goto err; /* Error code is already set */
#ifndef DBUG_OFF
......@@ -1035,9 +1041,9 @@ int _ma_read_rnd_pack_record(MARIA_HA *info, byte *buf,
if (info->opt_flag & READ_CACHE_USED)
{
if (_ma_read_cache(&info->rec_cache,(byte*) info->rec_buff,
if (_ma_read_cache(&info->rec_cache, (byte*) info->rec_buff,
block_info.filepos, block_info.rec_len,
skip_deleted_blocks))
skip_deleted_blocks ? READING_NEXT : 0))
goto err;
}
else
......@@ -1052,8 +1058,8 @@ int _ma_read_rnd_pack_record(MARIA_HA *info, byte *buf,
info->nextpos=block_info.filepos+block_info.rec_len;
info->update|= HA_STATE_AKTIV | HA_STATE_KEY_CHANGED;
DBUG_RETURN (_ma_pack_rec_unpack(info,buf,info->rec_buff,
block_info.rec_len));
DBUG_RETURN (_ma_pack_rec_unpack(info, &info->bit_buff, buf,
info->rec_buff, block_info.rec_len));
err:
DBUG_RETURN(my_errno);
}
......@@ -1061,8 +1067,9 @@ int _ma_read_rnd_pack_record(MARIA_HA *info, byte *buf,
/* Read and process header from a huff-record-file */
uint _ma_pack_get_block_info(MARIA_HA *maria, MARIA_BLOCK_INFO *info, File file,
my_off_t filepos)
uint _ma_pack_get_block_info(MARIA_HA *maria, MARIA_BIT_BUFF *bit_buff,
MARIA_BLOCK_INFO *info, byte **rec_buff_p,
File file, my_off_t filepos)
{
uchar *header=info->header;
uint head_length,ref_length;
......@@ -1087,17 +1094,17 @@ uint _ma_pack_get_block_info(MARIA_HA *maria, MARIA_BLOCK_INFO *info, File file,
head_length+= read_pack_length((uint) maria->s->pack.version,
header + head_length, &info->blob_len);
if (!(_ma_alloc_rec_buff(maria,info->rec_len + info->blob_len,
&maria->rec_buff)))
rec_buff_p)))
return BLOCK_FATAL_ERROR; /* not enough memory */
maria->bit_buff.blob_pos=(uchar*) maria->rec_buff+info->rec_len;
maria->bit_buff.blob_end= maria->bit_buff.blob_pos+info->blob_len;
bit_buff->blob_pos= (uchar*) *rec_buff_p + info->rec_len;
bit_buff->blob_end= bit_buff->blob_pos + info->blob_len;
maria->blob_length=info->blob_len;
}
info->filepos=filepos+head_length;
if (file > 0)
{
info->offset=min(info->rec_len, ref_length - head_length);
memcpy(maria->rec_buff, header+head_length, info->offset);
memcpy(*rec_buff_p, header + head_length, info->offset);
}
return 0;
}
......@@ -1215,7 +1222,8 @@ void _ma_unmap_file(MARIA_HA *info)
}
static uchar *_ma_mempack_get_block_info(MARIA_HA *maria,MARIA_BLOCK_INFO *info,
static uchar *_ma_mempack_get_block_info(MARIA_HA *maria, MARIA_BIT_BUFF *bit_buff,
MARIA_BLOCK_INFO *info, byte **rec_buff_p,
uchar *header)
{
header+= read_pack_length((uint) maria->s->pack.version, header,
......@@ -1226,10 +1234,10 @@ static uchar *_ma_mempack_get_block_info(MARIA_HA *maria,MARIA_BLOCK_INFO *info,
&info->blob_len);
/* _ma_alloc_rec_buff sets my_errno on error */
if (!(_ma_alloc_rec_buff(maria, info->blob_len,
&maria->rec_buff)))
rec_buff_p)))
return 0; /* not enough memory */
maria->bit_buff.blob_pos=(uchar*) maria->rec_buff;
maria->bit_buff.blob_end= (uchar*) maria->rec_buff + info->blob_len;
bit_buff->blob_pos= (uchar*) *rec_buff_p;
bit_buff->blob_end= (uchar*) *rec_buff_p + info->blob_len;
}
return header;
}
......@@ -1245,11 +1253,13 @@ static int _ma_read_mempack_record(MARIA_HA *info, my_off_t filepos, byte *buf)
if (filepos == HA_OFFSET_ERROR)
DBUG_RETURN(-1); /* _search() didn't find record */
if (!(pos= (byte*) _ma_mempack_get_block_info(info,&block_info,
if (!(pos= (byte*) _ma_mempack_get_block_info(info, &info->bit_buff,
&block_info, &info->rec_buff,
(uchar*) share->file_map+
filepos)))
DBUG_RETURN(-1);
DBUG_RETURN(_ma_pack_rec_unpack(info, buf, pos, block_info.rec_len));
DBUG_RETURN(_ma_pack_rec_unpack(info, &info->bit_buff, buf,
pos, block_info.rec_len));
}
......@@ -1269,7 +1279,8 @@ static int _ma_read_rnd_mempack_record(MARIA_HA *info, byte *buf,
my_errno=HA_ERR_END_OF_FILE;
goto err;
}
if (!(pos= (byte*) _ma_mempack_get_block_info(info,&block_info,
if (!(pos= (byte*) _ma_mempack_get_block_info(info, &info->bit_buff,
&block_info, &info->rec_buff,
(uchar*)
(start=share->file_map+
filepos))))
......@@ -1286,7 +1297,8 @@ static int _ma_read_rnd_mempack_record(MARIA_HA *info, byte *buf,
info->nextpos=filepos+(uint) (pos-start)+block_info.rec_len;
info->update|= HA_STATE_AKTIV | HA_STATE_KEY_CHANGED;
DBUG_RETURN (_ma_pack_rec_unpack(info,buf,pos, block_info.rec_len));
DBUG_RETURN (_ma_pack_rec_unpack(info, &info->bit_buff, buf,
pos, block_info.rec_len));
err:
DBUG_RETURN(my_errno);
}
......
......@@ -71,6 +71,21 @@ ha_rows maria_records_in_range(MARIA_HA *info, int inx, key_range *min_key,
uchar * key_buff;
uint start_key_len;
/*
The problem is that the optimizer doesn't support
RTree keys properly at the moment.
Hope this will be fixed some day.
But now NULL in the min_key means that we
didn't make the task for the RTree key
and expect BTree functionality from it.
As it's not able to handle such request
we return the error.
*/
if (!min_key)
{
res= HA_POS_ERROR;
break;
}
key_buff= info->lastkey+info->s->base.max_key_length;
start_key_len= _ma_pack_key(info,inx, key_buff,
(uchar*) min_key->key, min_key->length,
......
......@@ -20,6 +20,11 @@
*/
#include "ma_fulltext.h"
#if defined(MSDOS) || defined(__WIN__)
#include <fcntl.h>
#else
#include <stddef.h>
#endif
#include <queues.h>
/* static variables */
......@@ -143,7 +148,8 @@ int _ma_create_index_by_sort(MARIA_SORT_PARAM *info,my_bool no_messages,
skr=maxbuffer;
if (memavl < sizeof(BUFFPEK)*(uint) maxbuffer ||
(keys=(memavl-sizeof(BUFFPEK)*(uint) maxbuffer)/
(sort_length+sizeof(char*))) <= 1)
(sort_length+sizeof(char*))) <= 1 ||
keys < (uint) maxbuffer)
{
_ma_check_print_error(info->sort_info->param,
"sort_buffer_size is to small");
......@@ -304,7 +310,7 @@ static ha_rows NEAR_F find_all_keys(MARIA_SORT_PARAM *info, uint keys,
pthread_handler_t _ma_thr_find_all_keys(void *arg)
{
MARIA_SORT_PARAM *info= (MARIA_SORT_PARAM*) arg;
MARIA_SORT_PARAM *sort_param= (MARIA_SORT_PARAM*) arg;
int error;
uint memavl,old_memavl,keys,sort_length;
uint idx, maxbuffer;
......@@ -316,31 +322,35 @@ pthread_handler_t _ma_thr_find_all_keys(void *arg)
if (my_thread_init())
goto err;
if (info->sort_info->got_error)
{ /* Add extra block since DBUG_ENTER declare variables */
DBUG_ENTER("_ma_thr_find_all_keys");
DBUG_PRINT("enter", ("master: %d", sort_param->master));
if (sort_param->sort_info->got_error)
goto err;
if (info->keyinfo->flag && HA_VAR_LENGTH_KEY)
if (sort_param->keyinfo->flag & HA_VAR_LENGTH_KEY)
{
info->write_keys=write_keys_varlen;
info->read_to_buffer=read_to_buffer_varlen;
info->write_key=write_merge_key_varlen;
sort_param->write_keys= write_keys_varlen;
sort_param->read_to_buffer= read_to_buffer_varlen;
sort_param->write_key= write_merge_key_varlen;
}
else
{
info->write_keys=write_keys;
info->read_to_buffer=read_to_buffer;
info->write_key=write_merge_key;
sort_param->write_keys= write_keys;
sort_param->read_to_buffer= read_to_buffer;
sort_param->write_key= write_merge_key;
}
my_b_clear(&info->tempfile);
my_b_clear(&info->tempfile_for_exceptions);
bzero((char*) &info->buffpek,sizeof(info->buffpek));
bzero((char*) &info->unique, sizeof(info->unique));
my_b_clear(&sort_param->tempfile);
my_b_clear(&sort_param->tempfile_for_exceptions);
bzero((char*) &sort_param->buffpek,sizeof(sort_param->buffpek));
bzero((char*) &sort_param->unique, sizeof(sort_param->unique));
sort_keys= (uchar **) NULL;
memavl=max(info->sortbuff_size, MIN_SORT_MEMORY);
idx= info->sort_info->max_records;
sort_length= info->key_length;
memavl= max(sort_param->sortbuff_size, MIN_SORT_MEMORY);
idx= sort_param->sort_info->max_records;
sort_length= sort_param->key_length;
maxbuffer= 1;
while (memavl >= MIN_SORT_MEMORY)
......@@ -353,23 +363,25 @@ pthread_handler_t _ma_thr_find_all_keys(void *arg)
uint skr;
do
{
skr=maxbuffer;
skr= maxbuffer;
if (memavl < sizeof(BUFFPEK)*maxbuffer ||
(keys=(memavl-sizeof(BUFFPEK)*maxbuffer)/
(sort_length+sizeof(char*))) <= 1)
(sort_length+sizeof(char*))) <= 1 ||
keys < (uint) maxbuffer)
{
_ma_check_print_error(info->sort_info->param,
_ma_check_print_error(sort_param->sort_info->param,
"sort_buffer_size is to small");
goto err;
}
}
while ((maxbuffer= (int) (idx/(keys-1)+1)) != skr);
}
if ((sort_keys=(uchar **)my_malloc(keys*(sort_length+sizeof(char*))+
((info->keyinfo->flag & HA_FULLTEXT) ?
if ((sort_keys= (uchar **)
my_malloc(keys*(sort_length+sizeof(char*))+
((sort_param->keyinfo->flag & HA_FULLTEXT) ?
HA_FT_MAXBYTELEN : 0), MYF(0))))
{
if (my_init_dynamic_array(&info->buffpek, sizeof(BUFFPEK),
if (my_init_dynamic_array(&sort_param->buffpek, sizeof(BUFFPEK),
maxbuffer, maxbuffer/2))
{
my_free((gptr) sort_keys,MYF(0));
......@@ -378,76 +390,95 @@ pthread_handler_t _ma_thr_find_all_keys(void *arg)
else
break;
}
old_memavl=memavl;
if ((memavl=memavl/4*3) < MIN_SORT_MEMORY && old_memavl > MIN_SORT_MEMORY)
memavl=MIN_SORT_MEMORY;
old_memavl= memavl;
if ((memavl= memavl/4*3) < MIN_SORT_MEMORY &&
old_memavl > MIN_SORT_MEMORY)
memavl= MIN_SORT_MEMORY;
}
if (memavl < MIN_SORT_MEMORY)
{
_ma_check_print_error(info->sort_info->param,"Sort buffer to small"); /* purecov: tested */
_ma_check_print_error(sort_param->sort_info->param,
"Sort buffer too small");
goto err; /* purecov: tested */
}
if (info->sort_info->param->testflag & T_VERBOSE)
printf("Key %d - Allocating buffer for %d keys\n",info->key+1,keys);
info->sort_keys=sort_keys;
if (sort_param->sort_info->param->testflag & T_VERBOSE)
printf("Key %d - Allocating buffer for %d keys\n",
sort_param->key+1, keys);
sort_param->sort_keys= sort_keys;
idx=error=0;
sort_keys[0]=(uchar*) (sort_keys+keys);
idx= error= 0;
sort_keys[0]= (uchar*) (sort_keys+keys);
while (!(error=info->sort_info->got_error) &&
!(error=(*info->key_read)(info,sort_keys[idx])))
DBUG_PRINT("info", ("reading keys"));
while (!(error= sort_param->sort_info->got_error) &&
!(error= (*sort_param->key_read)(sort_param, sort_keys[idx])))
{
if (info->real_key_length > info->key_length)
if (sort_param->real_key_length > sort_param->key_length)
{
if (write_key(info,sort_keys[idx], &info->tempfile_for_exceptions))
if (write_key(sort_param,sort_keys[idx],
&sort_param->tempfile_for_exceptions))
goto err;
continue;
}
if (++idx == keys)
{
if (info->write_keys(info,sort_keys,idx-1,
(BUFFPEK *)alloc_dynamic(&info->buffpek),
&info->tempfile))
if (sort_param->write_keys(sort_param, sort_keys, idx - 1,
(BUFFPEK *)alloc_dynamic(&sort_param->buffpek),
&sort_param->tempfile))
goto err;
sort_keys[0]=(uchar*) (sort_keys+keys);
memcpy(sort_keys[0],sort_keys[idx-1],(size_t) info->key_length);
idx=1;
sort_keys[0]= (uchar*) (sort_keys+keys);
memcpy(sort_keys[0], sort_keys[idx - 1], (size_t) sort_param->key_length);
idx= 1;
}
sort_keys[idx]=sort_keys[idx-1]+info->key_length;
sort_keys[idx]=sort_keys[idx - 1] + sort_param->key_length;
}
if (error > 0)
goto err;
if (info->buffpek.elements)
if (sort_param->buffpek.elements)
{
if (info->write_keys(info,sort_keys, idx,
(BUFFPEK *) alloc_dynamic(&info->buffpek), &info->tempfile))
if (sort_param->write_keys(sort_param,sort_keys, idx,
(BUFFPEK *) alloc_dynamic(&sort_param->buffpek), &sort_param->tempfile))
goto err;
info->keys=(info->buffpek.elements-1)*(keys-1)+idx;
sort_param->keys= (sort_param->buffpek.elements - 1) * (keys - 1) + idx;
}
else
info->keys=idx;
sort_param->keys= idx;
info->sort_keys_length=keys;
sort_param->sort_keys_length= keys;
goto ok;
err:
info->sort_info->got_error=1; /* no need to protect this with a mutex */
DBUG_PRINT("error", ("got some error"));
sort_param->sort_info->got_error= 1; /* no need to protect with a mutex */
if (sort_keys)
my_free((gptr) sort_keys,MYF(0));
info->sort_keys=0;
delete_dynamic(& info->buffpek);
close_cached_file(&info->tempfile);
close_cached_file(&info->tempfile_for_exceptions);
sort_param->sort_keys=0;
delete_dynamic(& sort_param->buffpek);
close_cached_file(&sort_param->tempfile);
close_cached_file(&sort_param->tempfile_for_exceptions);
ok:
free_root(&info->wordroot, MYF(0));
remove_io_thread(&info->read_cache);
pthread_mutex_lock(&info->sort_info->mutex);
info->sort_info->threads_running--;
pthread_cond_signal(&info->sort_info->cond);
pthread_mutex_unlock(&info->sort_info->mutex);
free_root(&sort_param->wordroot, MYF(0));
/*
Detach from the share if the writer is involved. Avoid others to
be blocked. This includes a flush of the write buffer. This will
also indicate EOF to the readers.
*/
if (sort_param->sort_info->info->rec_cache.share)
remove_io_thread(&sort_param->sort_info->info->rec_cache);
/* Readers detach from the share if any. Avoid others to be blocked. */
if (sort_param->read_cache.share)
remove_io_thread(&sort_param->read_cache);
pthread_mutex_lock(&sort_param->sort_info->mutex);
if (!--sort_param->sort_info->threads_running)
pthread_cond_signal(&sort_param->sort_info->cond);
pthread_mutex_unlock(&sort_param->sort_info->mutex);
DBUG_PRINT("exit", ("======== ending thread ========"));
}
my_thread_end();
return NULL;
}
......@@ -465,6 +496,7 @@ int _ma_thr_write_keys(MARIA_SORT_PARAM *sort_param)
MARIA_SHARE *share=info->s;
MARIA_SORT_PARAM *sinfo;
byte *mergebuf=0;
DBUG_ENTER("_ma_thr_write_keys");
LINT_INIT(length);
for (i= 0, sinfo= sort_param ;
......@@ -474,6 +506,8 @@ int _ma_thr_write_keys(MARIA_SORT_PARAM *sort_param)
if (!sinfo->sort_keys)
{
got_error=1;
my_free(_ma_get_rec_buff_ptr(info, sinfo->rec_buff),
MYF(MY_ALLOW_ZERO_PTR));
continue;
}
if (!got_error)
......@@ -513,7 +547,7 @@ int _ma_thr_write_keys(MARIA_SORT_PARAM *sort_param)
{
if (got_error)
continue;
if (sinfo->keyinfo->flag && HA_VAR_LENGTH_KEY)
if (sinfo->keyinfo->flag & HA_VAR_LENGTH_KEY)
{
sinfo->write_keys=write_keys_varlen;
sinfo->read_to_buffer=read_to_buffer_varlen;
......@@ -602,7 +636,7 @@ int _ma_thr_write_keys(MARIA_SORT_PARAM *sort_param)
}
}
my_free((gptr) mergebuf,MYF(MY_ALLOW_ZERO_PTR));
return got_error;
DBUG_RETURN(got_error);
}
#endif /* THREAD */
......
......@@ -69,6 +69,7 @@ typedef struct st_maria_state_info
ulong update_count; /* Updated for each write lock */
ulong status;
ulong *rec_per_key_part;
ha_checksum checksum; /* Table checksum */
my_off_t *key_root; /* Start of key trees */
my_off_t *key_del; /* delete links for trees */
my_off_t rec_per_key_rows; /* Rows when calculating rec_per_key */
......@@ -179,6 +180,7 @@ typedef struct st_maria_share
*/
MARIA_DECODE_TREE *decode_trees;
uint16 *decode_tables;
/* Function to use for a row checksum. */
int(*read_record) (struct st_maria_info *, my_off_t, byte *);
int(*write_record) (struct st_maria_info *, const byte *);
int(*update_record) (struct st_maria_info *, my_off_t, const byte *);
......@@ -230,16 +232,6 @@ typedef struct st_maria_share
} MARIA_SHARE;
typedef uint maria_bit_type;
typedef struct st_maria_bit_buff
{ /* Used for packing of record */
maria_bit_type current_byte;
uint bits;
uchar *pos, *end, *blob_pos, *blob_end;
uint error;
} MARIA_BIT_BUFF;
struct st_maria_info
{
MARIA_SHARE *s; /* Shared between open:s */
......@@ -273,7 +265,7 @@ struct st_maria_info
my_off_t last_keypage; /* Last key page read */
my_off_t last_search_keypage; /* Last keypage when searching */
my_off_t dupp_key_pos;
ha_checksum checksum;
ha_checksum checksum; /* Temp storage for row checksum */
/*
QQ: the folloing two xxx_length fields should be removed,
as they are not compatible with parallel repair
......@@ -354,8 +346,15 @@ struct st_maria_info
#define maria_putint(x,y,nod) { uint16 boh=(nod ? (uint16) 32768 : 0) + (uint16) (y);\
mi_int2store(x,boh); }
#define _ma_test_if_nod(x) (x[0] & 128 ? info->s->base.key_reflength : 0)
#define maria_mark_crashed(x) (x)->s->state.changed|=STATE_CRASHED
#define maria_mark_crashed_on_repair(x) { (x)->s->state.changed|=STATE_CRASHED|STATE_CRASHED_ON_REPAIR ; (x)->update|= HA_STATE_CHANGED; }
#define maria_mark_crashed(x) do{(x)->s->state.changed|= STATE_CRASHED; \
DBUG_PRINT("error", ("Marked table crashed")); \
}while(0)
#define maria_mark_crashed_on_repair(x) do{(x)->s->state.changed|= \
STATE_CRASHED|STATE_CRASHED_ON_REPAIR; \
(x)->update|= HA_STATE_CHANGED; \
DBUG_PRINT("error", \
("Marked table crashed")); \
}while(0)
#define maria_is_crashed(x) ((x)->s->state.changed & STATE_CRASHED)
#define maria_is_crashed_on_repair(x) ((x)->s->state.changed & STATE_CRASHED_ON_REPAIR)
#define maria_print_error(SHARE, ERRNO) \
......@@ -608,8 +607,8 @@ extern my_bool _ma_read_pack_info(MARIA_HA *info, pbool fix_keys);
extern int _ma_read_pack_record(MARIA_HA *info, my_off_t filepos,
byte *buf);
extern int _ma_read_rnd_pack_record(MARIA_HA *, byte *, my_off_t, my_bool);
extern int _ma_pack_rec_unpack(MARIA_HA *info, byte *to, byte *from,
ulong reclength);
extern int _ma_pack_rec_unpack(MARIA_HA *info, MARIA_BIT_BUFF *bit_buff,
byte *to, byte *from, ulong reclength);
extern ulonglong _ma_safe_mul(ulonglong a, ulonglong b);
extern int _ma_ft_update(MARIA_HA *info, uint keynr, byte *keybuf,
const byte *oldrec, const byte *newrec,
......@@ -663,8 +662,9 @@ typedef struct st_maria_block_info
extern uint _ma_get_block_info(MARIA_BLOCK_INFO *, File, my_off_t);
extern uint _ma_rec_pack(MARIA_HA *info, byte *to, const byte *from);
extern uint _ma_pack_get_block_info(MARIA_HA *, MARIA_BLOCK_INFO *, File,
my_off_t);
extern uint _ma_pack_get_block_info(MARIA_HA *mari, MARIA_BIT_BUFF *bit_buff,
MARIA_BLOCK_INFO *info, byte **rec_buff_p,
File file, my_off_t filepos);
extern void _ma_store_blob_length(byte *pos, uint pack_length, uint length);
extern void _ma_report_error(int errcode, const char *file_name);
extern my_bool _ma_memmap_file(MARIA_HA *info);
......
......@@ -1967,7 +1967,7 @@ static char *bindigits(ulonglong value, uint bits)
DBUG_ASSERT(idx < sizeof(digits));
while (idx)
*(ptr++)= '0' + ((value >> (--idx)) & 1);
*(ptr++)= '0' + ((char) (value >> (--idx)) & (char) 1);
*ptr= '\0';
return digits;
}
......@@ -1997,7 +1997,7 @@ static char *hexdigits(ulonglong value)
DBUG_ASSERT(idx < sizeof(digits));
while (idx)
{
if ((*(ptr++)= '0' + ((value >> (4 * (--idx))) & 0xf)) > '9')
if ((*(ptr++)= '0' + ((char) (value >> (4 * (--idx))) & (char) 0xf)) > '9')
*(ptr - 1)+= 'a' - '9' - 1;
}
*ptr= '\0';
......@@ -2286,7 +2286,7 @@ static my_off_t write_huff_tree(HUFF_TREE *huff_tree, uint trees)
errors++;
break;
}
idx+= code & 1;
idx+= (uint) code & 1;
if (idx >= length)
{
VOID(fflush(stdout));
......
......@@ -5,3 +5,4 @@ MYSQL_PLUGIN_ACTIONS(maria, [AC_CONFIG_FILES(storage/maria/unittest/Makefile)])
MYSQL_PLUGIN_STATIC(maria, [libmaria.a])
# Maria will probably go first into max builds, not all builds,
# so we don't declare it mandatory.
MYSQL_PLUGIN_DEPENDS_ON_MYSQL_INTERNALS(maria, [ha_maria.cc])
......@@ -84,6 +84,7 @@ typedef struct st_mi_state_info
time_t recover_time; /* Time for last recover */
time_t check_time; /* Time for last check */
uint sortkey; /* sorted by this key (not used) */
uint open_count;
uint8 changed; /* Changed since myisamchk */
/* the following isn't saved on disk */
......@@ -224,16 +225,6 @@ typedef struct st_mi_isam_share
} MYISAM_SHARE;
typedef uint mi_bit_type;
typedef struct st_mi_bit_buff
{ /* Used for packing of record */
mi_bit_type current_byte;
uint bits;
uchar *pos, *end, *blob_pos, *blob_end;
uint error;
} MI_BIT_BUFF;
struct st_myisam_info
{
MYISAM_SHARE *s; /* Shared between open:s */
......@@ -309,6 +300,7 @@ struct st_myisam_info
uchar *rtree_recursion_state; /* For RTREE */
int rtree_recursion_depth;
};
#define USE_WHOLE_KEY HA_MAX_KEY_BUFF*2 /* Use whole key in _mi_search() */
#define F_EXTRA_LCK -1
/* bits in opt_flag */
......
......@@ -1105,16 +1105,16 @@ static int get_statistic(PACK_MRG_INFO *mrg,HUFF_COUNTS *huff_counts)
my_off_t total_count;
char llbuf[32];
DBUG_PRINT("info", ("column: %3u", count - huff_counts + 1));
DBUG_PRINT("info", ("column: %3lu", count - huff_counts + 1));
if (verbose >= 2)
VOID(printf("column: %3u\n", count - huff_counts + 1));
VOID(printf("column: %3lu\n", count - huff_counts + 1));
if (count->tree_buff)
{
DBUG_PRINT("info", ("number of distinct values: %u",
DBUG_PRINT("info", ("number of distinct values: %lu",
(count->tree_pos - count->tree_buff) /
count->field_length));
if (verbose >= 2)
VOID(printf("number of distinct values: %u\n",
VOID(printf("number of distinct values: %lu\n",
(count->tree_pos - count->tree_buff) /
count->field_length));
}
......
......@@ -332,14 +332,6 @@ int main(int argc, char **argv __attribute__((unused)))
exit(1);
}
#ifndef pthread_attr_setstacksize /* void return value */
if ((error= pthread_attr_setstacksize(&thr_attr, 65536L)))
{
fprintf(stderr,"Got error: %d from pthread_attr_setstacksize (errno: %d)\n",
error,errno);
exit(1);
}
#endif
#ifdef HAVE_THR_SETCONCURRENCY
VOID(thr_setconcurrency(2));
#endif
......
......@@ -511,14 +511,6 @@ int main(int argc, char **argv __attribute__((unused)))
exit(1);
}
#ifndef pthread_attr_setstacksize /* void return value */
if ((error= pthread_attr_setstacksize(&thr_attr, 65536L)))
{
fprintf(stderr,"Got error: %d from pthread_attr_setstacksize (errno: %d)\n",
error,errno);
exit(1);
}
#endif
#ifdef HAVE_THR_SETCONCURRENCY
VOID(thr_setconcurrency(2));
#endif
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment