Commit 5c83ae3f authored by serg@serg.mysql.com's avatar serg@serg.mysql.com

multithreaded repair-by-sort code

parallel read access to IO_CACHE
parent 28749171
...@@ -36,7 +36,7 @@ extern int NEAR my_errno; /* Last error in mysys */ ...@@ -36,7 +36,7 @@ extern int NEAR my_errno; /* Last error in mysys */
#include <m_ctype.h> /* for CHARSET_INFO */ #include <m_ctype.h> /* for CHARSET_INFO */
#endif #endif
#include <stdarg.h> #include <stdarg.h>
#define MYSYS_PROGRAM_USES_CURSES() { error_handler_hook = my_message_curses; mysys_uses_curses=1; } #define MYSYS_PROGRAM_USES_CURSES() { error_handler_hook = my_message_curses; mysys_uses_curses=1; }
#define MYSYS_PROGRAM_DONT_USE_CURSES() { error_handler_hook = my_message_no_curses; mysys_uses_curses=0;} #define MYSYS_PROGRAM_DONT_USE_CURSES() { error_handler_hook = my_message_no_curses; mysys_uses_curses=0;}
...@@ -278,7 +278,7 @@ extern struct my_file_info ...@@ -278,7 +278,7 @@ extern struct my_file_info
{ {
my_string name; my_string name;
enum file_type type; enum file_type type;
#if defined(THREAD) && !defined(HAVE_PREAD) #if defined(THREAD) && !defined(HAVE_PREAD)
pthread_mutex_t mutex; pthread_mutex_t mutex;
#endif #endif
} my_file_info[MY_NFILE]; } my_file_info[MY_NFILE];
...@@ -299,6 +299,45 @@ typedef struct st_dynamic_string { ...@@ -299,6 +299,45 @@ typedef struct st_dynamic_string {
struct st_io_cache; struct st_io_cache;
typedef int (*IO_CACHE_CALLBACK)(struct st_io_cache*); typedef int (*IO_CACHE_CALLBACK)(struct st_io_cache*);
#ifdef THREAD
typedef struct st_io_cache_share
{
/* to sync on reads into buffer */
pthread_mutex_t mutex;
pthread_cond_t cond;
int count;
/* actual IO_CACHE that filled the buffer */
struct st_io_cache *active;
/* the following will go implemented whenever the need arises */
#ifdef NOT_IMPLEMENTED
/* whether the structure should be free'd */
my_bool alloced;
#endif
} IO_CACHE_SHARE;
#define lock_io_cache(info) \
( \
(errno=pthread_mutex_lock(&((info)->share->mutex))) ? -1 : ( \
(info)->share->count ? ( \
--((info)->share->count), \
pthread_cond_wait(&((info)->share->cond), \
&((info)->share->mutex)), \
(++((info)->share->count) ? \
pthread_mutex_unlock(&((info)->share->mutex)) : 0)) \
: 1 ) \
)
#define unlock_io_cache(info) \
( \
pthread_cond_broadcast(&((info)->share->cond)), \
pthread_mutex_unlock (&((info)->share->mutex)) \
)
/* -- to catch errors
#else
#define lock_io_cache(info)
#define unlock_io_cache(info)
*/
#endif
typedef struct st_io_cache /* Used when cacheing files */ typedef struct st_io_cache /* Used when cacheing files */
{ {
...@@ -331,10 +370,16 @@ typedef struct st_io_cache /* Used when cacheing files */ ...@@ -331,10 +370,16 @@ typedef struct st_io_cache /* Used when cacheing files */
WRITE_CACHE, and &read_pos and &read_end respectively otherwise WRITE_CACHE, and &read_pos and &read_end respectively otherwise
*/ */
byte **current_pos, **current_end; byte **current_pos, **current_end;
/* The lock is for append buffer used in SEQ_READ_APPEND cache */
#ifdef THREAD #ifdef THREAD
/* The lock is for append buffer used in SEQ_READ_APPEND cache
need mutex copying from append buffer to read buffer. */
pthread_mutex_t append_buffer_lock; pthread_mutex_t append_buffer_lock;
/* need mutex copying from append buffer to read buffer */ /* The following is used when several threads are reading the
same file in parallel. They are synchronized on disk
accesses reading the cached part of the file asynchronously.
It should be set to NULL to disable the feature. Only
READ_CACHE mode is supported. */
IO_CACHE_SHARE *share;
#endif #endif
/* a caller will use my_b_read() macro to read from the cache /* a caller will use my_b_read() macro to read from the cache
if the data is already in cache, it will be simply copied with if the data is already in cache, it will be simply copied with
...@@ -626,6 +671,12 @@ extern my_bool reinit_io_cache(IO_CACHE *info,enum cache_type type, ...@@ -626,6 +671,12 @@ extern my_bool reinit_io_cache(IO_CACHE *info,enum cache_type type,
my_off_t seek_offset,pbool use_async_io, my_off_t seek_offset,pbool use_async_io,
pbool clear_cache); pbool clear_cache);
extern int _my_b_read(IO_CACHE *info,byte *Buffer,uint Count); extern int _my_b_read(IO_CACHE *info,byte *Buffer,uint Count);
#ifdef THREAD
extern int _my_b_read_r(IO_CACHE *info,byte *Buffer,uint Count);
extern int init_io_cache_share(IO_CACHE *info,
IO_CACHE_SHARE *s, uint num_threads);
extern int remove_io_thread(IO_CACHE *info);
#endif
extern int _my_b_seq_read(IO_CACHE *info,byte *Buffer,uint Count); extern int _my_b_seq_read(IO_CACHE *info,byte *Buffer,uint Count);
extern int _my_b_net_read(IO_CACHE *info,byte *Buffer,uint Count); extern int _my_b_net_read(IO_CACHE *info,byte *Buffer,uint Count);
extern int _my_b_get(IO_CACHE *info); extern int _my_b_get(IO_CACHE *info);
......
...@@ -314,24 +314,6 @@ typedef struct st_sort_key_blocks { /* Used when sorting */ ...@@ -314,24 +314,6 @@ typedef struct st_sort_key_blocks { /* Used when sorting */
int inited; int inited;
} SORT_KEY_BLOCKS; } SORT_KEY_BLOCKS;
struct st_mi_check_param;
typedef struct st_sort_info {
MI_INFO *info;
struct st_mi_check_param *param;
enum data_file_type new_data_file_type;
SORT_KEY_BLOCKS *key_block,*key_block_end;
uint key,find_length,real_key_length;
my_off_t pos,max_pos,filepos,start_recpos,filelength,dupp,buff_length;
ha_rows max_records;
ulonglong unique[MI_MAX_KEY_SEG+1];
my_bool fix_datafile;
char *record,*buff;
void *wordlist, *wordptr;
MI_KEYDEF *keyinfo;
MI_KEYSEG *keyseg;
} SORT_INFO;
typedef struct st_mi_check_param typedef struct st_mi_check_param
{ {
ulonglong auto_increment_value; ulonglong auto_increment_value;
...@@ -354,7 +336,6 @@ typedef struct st_mi_check_param ...@@ -354,7 +336,6 @@ typedef struct st_mi_check_param
int tmpfile_createflag; int tmpfile_createflag;
myf myf_rw; myf myf_rw;
IO_CACHE read_cache; IO_CACHE read_cache;
SORT_INFO sort_info;
ulonglong unique_count[MI_MAX_KEY_SEG+1]; ulonglong unique_count[MI_MAX_KEY_SEG+1];
ha_checksum key_crc[MI_MAX_POSSIBLE_KEY]; ha_checksum key_crc[MI_MAX_POSSIBLE_KEY];
ulong rec_per_key_part[MI_MAX_KEY_SEG*MI_MAX_POSSIBLE_KEY]; ulong rec_per_key_part[MI_MAX_KEY_SEG*MI_MAX_POSSIBLE_KEY];
...@@ -363,17 +344,42 @@ typedef struct st_mi_check_param ...@@ -363,17 +344,42 @@ typedef struct st_mi_check_param
char *op_name; char *op_name;
} MI_CHECK; } MI_CHECK;
typedef struct st_sort_info {
typedef struct st_mi_sortinfo { MI_INFO *info;
MI_CHECK *param;
enum data_file_type new_data_file_type;
SORT_KEY_BLOCKS *key_block,*key_block_end;
uint kei, total_keys;
my_off_t filelength,dupp,buff_length;
ha_rows max_records; ha_rows max_records;
char *buff;
myf myf_rw;
/* sync things*/
uint got_error, threads_running;
pthread_mutex_t mutex;
pthread_cond_t cond;
} SORT_INFO;
typedef struct st_mi_sort_param {
pthread_t thr;
IO_CACHE read_cache;
ulonglong unique[MI_MAX_KEY_SEG+1];
uint key, key_length,real_key_length,sortbuff_size;
uint maxbuffers, keys, find_length, sort_keys_length;
uchar **sort_keys;
void *wordlist, *wordptr;
MI_KEYDEF *keyinfo;
SORT_INFO *sort_info; SORT_INFO *sort_info;
IO_CACHE tempfile, tempfile_for_exceptions;
DYNAMIC_ARRAY buffpek;
my_off_t pos,max_pos,filepos,start_recpos;
my_bool fix_datafile;
char *record;
char *tmpdir; char *tmpdir;
int (*key_cmp)(SORT_INFO *info, const void *, const void *); int (*key_cmp)(struct st_mi_sort_param *, const void *, const void *);
int (*key_read)(SORT_INFO *info,void *buff); int (*key_read)(struct st_mi_sort_param *,void *);
int (*key_write)(SORT_INFO *info, const void *buff); int (*key_write)(struct st_mi_sort_param *, const void *);
void (*lock_in_memory)(MI_CHECK *info); void (*lock_in_memory)(MI_CHECK *);
uint key_length;
myf myf_rw;
} MI_SORT_PARAM; } MI_SORT_PARAM;
/* functions in mi_check */ /* functions in mi_check */
...@@ -398,14 +404,17 @@ int flush_blocks(MI_CHECK *param, File file); ...@@ -398,14 +404,17 @@ int flush_blocks(MI_CHECK *param, File file);
void update_auto_increment_key(MI_CHECK *param, MI_INFO *info, void update_auto_increment_key(MI_CHECK *param, MI_INFO *info,
my_bool repair); my_bool repair);
int update_state_info(MI_CHECK *param, MI_INFO *info,uint update); int update_state_info(MI_CHECK *param, MI_INFO *info,uint update);
void update_key_parts(MI_KEYDEF *keyinfo, ulong *rec_per_key_part,
ulonglong *unique, ulonglong records);
int filecopy(MI_CHECK *param, File to,File from,my_off_t start, int filecopy(MI_CHECK *param, File to,File from,my_off_t start,
my_off_t length, const char *type); my_off_t length, const char *type);
int movepoint(MI_INFO *info,byte *record,my_off_t oldpos, int movepoint(MI_INFO *info,byte *record,my_off_t oldpos,
my_off_t newpos, uint prot_key); my_off_t newpos, uint prot_key);
int sort_write_record(SORT_INFO *sort_info); int sort_write_record(MI_SORT_PARAM *sort_param);
int write_data_suffix(MI_CHECK *param, MI_INFO *info); int write_data_suffix(SORT_INFO *sort_info, my_bool fix_datafile);
int _create_index_by_sort(MI_SORT_PARAM *info,my_bool no_messages, int _create_index_by_sort(MI_SORT_PARAM *info,my_bool no_messages, ulong);
ulong); void *_thr_find_all_keys(MI_SORT_PARAM *info);
int _thr_write_keys(MI_SORT_PARAM *sort_param);
int test_if_almost_full(MI_INFO *info); int test_if_almost_full(MI_INFO *info);
int recreate_table(MI_CHECK *param, MI_INFO **org_info, char *filename); int recreate_table(MI_CHECK *param, MI_INFO **org_info, char *filename);
void mi_disable_non_unique_index(MI_INFO *info, ha_rows rows); void mi_disable_non_unique_index(MI_INFO *info, ha_rows rows);
......
...@@ -14,14 +14,26 @@ ...@@ -14,14 +14,26 @@
along with this program; if not, write to the Free Software along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/* Functions for read record cacheing with myisam */ /*
/* Used instead of my_b_read() to allow for no-cacheed seeks */ Functions for read record cacheing with myisam
Used for reading dynamic/compressed records from datafile.
#include "myisamdef.h" Can fetch data directly from file (outside cache),
if reading a small chunk straight before the cached part (with possible
overlap).
Can be explicitly asked not to use cache (by not setting READING_NEXT in
flag) - useful for occasional out-of-cache reads, when the next read is
expected to hit the cache again.
Allows "partial read" errors in the record header (when READING_HEADER flag
is set) - unread part is bzero'ed
Note: out-of-cache reads are disabled for shared IO_CACHE's
*/
/* Copy block from cache if it`s in it. If re_read_if_possibly is */
/* set read to cache (if after current file-position) else read to */ #include "myisamdef.h"
/* buff */
int _mi_read_cache(IO_CACHE *info, byte *buff, my_off_t pos, uint length, int _mi_read_cache(IO_CACHE *info, byte *buff, my_off_t pos, uint length,
int flag) int flag)
...@@ -31,7 +43,7 @@ int _mi_read_cache(IO_CACHE *info, byte *buff, my_off_t pos, uint length, ...@@ -31,7 +43,7 @@ int _mi_read_cache(IO_CACHE *info, byte *buff, my_off_t pos, uint length,
char *in_buff_pos; char *in_buff_pos;
DBUG_ENTER("_mi_read_cache"); DBUG_ENTER("_mi_read_cache");
if (pos < info->pos_in_file) if (pos < info->pos_in_file && ! info->share)
{ {
read_length=length; read_length=length;
if ((my_off_t) read_length > (my_off_t) (info->pos_in_file-pos)) if ((my_off_t) read_length > (my_off_t) (info->pos_in_file-pos))
...@@ -44,7 +56,8 @@ int _mi_read_cache(IO_CACHE *info, byte *buff, my_off_t pos, uint length, ...@@ -44,7 +56,8 @@ int _mi_read_cache(IO_CACHE *info, byte *buff, my_off_t pos, uint length,
pos+=read_length; pos+=read_length;
buff+=read_length; buff+=read_length;
} }
if ((offset= (my_off_t) (pos - info->pos_in_file)) < if (pos >= info->pos_in_file &&
(offset= (my_off_t) (pos - info->pos_in_file)) <
(my_off_t) (info->read_end - info->request_pos)) (my_off_t) (info->read_end - info->request_pos))
{ {
in_buff_pos=info->request_pos+(uint) offset; in_buff_pos=info->request_pos+(uint) offset;
...@@ -57,10 +70,10 @@ int _mi_read_cache(IO_CACHE *info, byte *buff, my_off_t pos, uint length, ...@@ -57,10 +70,10 @@ int _mi_read_cache(IO_CACHE *info, byte *buff, my_off_t pos, uint length,
} }
else else
in_buff_length=0; in_buff_length=0;
if (flag & READING_NEXT) if (flag & READING_NEXT || info->share)
{ {
if (pos != ((info)->pos_in_file + if (pos !=
(uint) ((info)->read_end - (info)->request_pos))) (info->pos_in_file + (uint) (info->read_end - info->request_pos)))
{ {
info->pos_in_file=pos; /* Force start here */ info->pos_in_file=pos; /* Force start here */
info->read_pos=info->read_end=info->request_pos; /* Everything used */ info->read_pos=info->read_end=info->request_pos; /* Everything used */
...@@ -70,34 +83,25 @@ int _mi_read_cache(IO_CACHE *info, byte *buff, my_off_t pos, uint length, ...@@ -70,34 +83,25 @@ int _mi_read_cache(IO_CACHE *info, byte *buff, my_off_t pos, uint length,
info->read_pos=info->read_end; /* All block used */ info->read_pos=info->read_end; /* All block used */
if (!(*info->read_function)(info,buff,length)) if (!(*info->read_function)(info,buff,length))
DBUG_RETURN(0); DBUG_RETURN(0);
if (!(flag & READING_HEADER) || info->error == -1 || read_length=info->error;
(uint) info->error+in_buff_length < 3) }
{ else
DBUG_PRINT("error", {
("Error %d reading next-multi-part block (Got %d bytes)", info->seek_not_done=1;
my_errno, info->error)); if ((read_length=my_pread(info->file,buff,length,pos,MYF(0))) == length)
if (!my_errno || my_errno == -1) DBUG_RETURN(0);
my_errno=HA_ERR_WRONG_IN_RECORD;
DBUG_RETURN(1);
}
bzero(buff+info->error,MI_BLOCK_INFO_HEADER_LENGTH - in_buff_length -
(uint) info->error);
DBUG_RETURN(0);
} }
info->seek_not_done=1;
if ((read_length=my_pread(info->file,buff,length,pos,MYF(0))) == length)
DBUG_RETURN(0);
if (!(flag & READING_HEADER) || (int) read_length == -1 || if (!(flag & READING_HEADER) || (int) read_length == -1 ||
read_length+in_buff_length < 3) read_length+in_buff_length < 3)
{ {
DBUG_PRINT("error", DBUG_PRINT("error",
("Error %d reading new block (Got %d bytes)", ("Error %d reading next-multi-part block (Got %d bytes)",
my_errno, (int) read_length)); my_errno, (int) read_length));
if (!my_errno || my_errno == -1) if (!my_errno || my_errno == -1)
my_errno=HA_ERR_WRONG_IN_RECORD; my_errno=HA_ERR_WRONG_IN_RECORD;
DBUG_RETURN(1); DBUG_RETURN(1);
} }
bzero(buff+read_length,MI_BLOCK_INFO_HEADER_LENGTH - in_buff_length - bzero(buff+read_length,MI_BLOCK_INFO_HEADER_LENGTH - in_buff_length -
read_length); read_length);
DBUG_RETURN(0); DBUG_RETURN(0);
} /* _mi_read_cache */ } /* _mi_read_cache */
This diff is collapsed.
...@@ -75,7 +75,8 @@ static int mi_sort_records(MI_CHECK *param, ...@@ -75,7 +75,8 @@ static int mi_sort_records(MI_CHECK *param,
uint sort_key, uint sort_key,
my_bool write_info, my_bool write_info,
my_bool update_index); my_bool update_index);
static int sort_record_index(MI_CHECK *param,MI_INFO *info,MI_KEYDEF *keyinfo, static int sort_record_index(MI_SORT_PARAM *sort_param,MI_INFO *info,
MI_KEYDEF *keyinfo,
my_off_t page,uchar *buff,uint sortkey, my_off_t page,uchar *buff,uint sortkey,
File new_file, my_bool update_index); File new_file, my_bool update_index);
...@@ -1327,7 +1328,7 @@ static int mi_sort_records(MI_CHECK *param, ...@@ -1327,7 +1328,7 @@ static int mi_sort_records(MI_CHECK *param,
register MI_INFO *info, my_string name, register MI_INFO *info, my_string name,
uint sort_key, uint sort_key,
my_bool write_info, my_bool write_info,
my_bool update_index) my_bool update_index)
{ {
int got_error; int got_error;
uint key; uint key;
...@@ -1337,11 +1338,14 @@ static int mi_sort_records(MI_CHECK *param, ...@@ -1337,11 +1338,14 @@ static int mi_sort_records(MI_CHECK *param,
ha_rows old_record_count; ha_rows old_record_count;
MYISAM_SHARE *share=info->s; MYISAM_SHARE *share=info->s;
char llbuff[22],llbuff2[22]; char llbuff[22],llbuff2[22];
SORT_INFO *sort_info= &param->sort_info; SORT_INFO sort_info;
MI_SORT_PARAM sort_param;
DBUG_ENTER("sort_records"); DBUG_ENTER("sort_records");
bzero((char*) sort_info,sizeof(*sort_info)); bzero((char*)&sort_info,sizeof(sort_info));
sort_info->param=param; bzero((char*)&sort_param,sizeof(sort_param));
sort_param.sort_info=&sort_info;
sort_info.param=param;
keyinfo= &share->keyinfo[sort_key]; keyinfo= &share->keyinfo[sort_key];
got_error=1; got_error=1;
temp_buff=0; temp_buff=0;
...@@ -1377,7 +1381,7 @@ static int mi_sort_records(MI_CHECK *param, ...@@ -1377,7 +1381,7 @@ static int mi_sort_records(MI_CHECK *param,
mi_check_print_error(param,"Not enough memory for key block"); mi_check_print_error(param,"Not enough memory for key block");
goto err; goto err;
} }
if (!(sort_info->record=(byte*) my_malloc((uint) share->base.pack_reclength, if (!(sort_param.record=(byte*) my_malloc((uint) share->base.pack_reclength,
MYF(0)))) MYF(0))))
{ {
mi_check_print_error(param,"Not enough memory for record"); mi_check_print_error(param,"Not enough memory for record");
...@@ -1419,18 +1423,18 @@ static int mi_sort_records(MI_CHECK *param, ...@@ -1419,18 +1423,18 @@ static int mi_sort_records(MI_CHECK *param,
} }
/* Setup param for sort_write_record */ /* Setup param for sort_write_record */
sort_info->info=info; sort_info.info=info;
sort_info->new_data_file_type=share->data_file_type; sort_info.new_data_file_type=share->data_file_type;
sort_info->fix_datafile=1; sort_param.fix_datafile=1;
sort_info->filepos=share->pack.header_length; sort_param.filepos=share->pack.header_length;
old_record_count=info->state->records; old_record_count=info->state->records;
info->state->records=0; info->state->records=0;
if (sort_info->new_data_file_type != COMPRESSED_RECORD) if (sort_info.new_data_file_type != COMPRESSED_RECORD)
share->state.checksum=0; share->state.checksum=0;
if (sort_record_index(param, info,keyinfo,share->state.key_root[sort_key], if (sort_record_index(&sort_param,info,keyinfo,share->state.key_root[sort_key],
temp_buff, sort_key,new_file,update_index) || temp_buff, sort_key,new_file,update_index) ||
write_data_suffix(param, info) || write_data_suffix(&sort_info,1) ||
flush_io_cache(&info->rec_cache)) flush_io_cache(&info->rec_cache))
goto err; goto err;
...@@ -1448,7 +1452,7 @@ static int mi_sort_records(MI_CHECK *param, ...@@ -1448,7 +1452,7 @@ static int mi_sort_records(MI_CHECK *param,
info->state->del=0; info->state->del=0;
info->state->empty=0; info->state->empty=0;
share->state.dellink= HA_OFFSET_ERROR; share->state.dellink= HA_OFFSET_ERROR;
info->state->data_file_length=sort_info->filepos; info->state->data_file_length=sort_param.filepos;
share->state.split=info->state->records; /* Only hole records */ share->state.split=info->state->records; /* Only hole records */
share->state.version=(ulong) time((time_t*) 0); share->state.version=(ulong) time((time_t*) 0);
...@@ -1472,11 +1476,11 @@ static int mi_sort_records(MI_CHECK *param, ...@@ -1472,11 +1476,11 @@ static int mi_sort_records(MI_CHECK *param,
{ {
my_afree((gptr) temp_buff); my_afree((gptr) temp_buff);
} }
my_free(sort_info->record,MYF(MY_ALLOW_ZERO_PTR)); my_free(sort_param.record,MYF(MY_ALLOW_ZERO_PTR));
info->opt_flag&= ~(READ_CACHE_USED | WRITE_CACHE_USED); info->opt_flag&= ~(READ_CACHE_USED | WRITE_CACHE_USED);
VOID(end_io_cache(&info->rec_cache)); VOID(end_io_cache(&info->rec_cache));
my_free(sort_info->buff,MYF(MY_ALLOW_ZERO_PTR)); my_free(sort_info.buff,MYF(MY_ALLOW_ZERO_PTR));
sort_info->buff=0; sort_info.buff=0;
share->state.sortkey=sort_key; share->state.sortkey=sort_key;
DBUG_RETURN(flush_blocks(param, share->kfile) | got_error); DBUG_RETURN(flush_blocks(param, share->kfile) | got_error);
} /* sort_records */ } /* sort_records */
...@@ -1484,7 +1488,8 @@ static int mi_sort_records(MI_CHECK *param, ...@@ -1484,7 +1488,8 @@ static int mi_sort_records(MI_CHECK *param,
/* Sort records recursive using one index */ /* Sort records recursive using one index */
static int sort_record_index(MI_CHECK *param,MI_INFO *info, MI_KEYDEF *keyinfo, static int sort_record_index(MI_SORT_PARAM *sort_param,MI_INFO *info,
MI_KEYDEF *keyinfo,
my_off_t page, uchar *buff, uint sort_key, my_off_t page, uchar *buff, uint sort_key,
File new_file,my_bool update_index) File new_file,my_bool update_index)
{ {
...@@ -1493,7 +1498,8 @@ static int sort_record_index(MI_CHECK *param,MI_INFO *info, MI_KEYDEF *keyinfo, ...@@ -1493,7 +1498,8 @@ static int sort_record_index(MI_CHECK *param,MI_INFO *info, MI_KEYDEF *keyinfo,
my_off_t next_page,rec_pos; my_off_t next_page,rec_pos;
uchar lastkey[MI_MAX_KEY_BUFF]; uchar lastkey[MI_MAX_KEY_BUFF];
char llbuff[22]; char llbuff[22];
SORT_INFO *sort_info= &param->sort_info; SORT_INFO *sort_info= sort_param->sort_info;
MI_CHECK *param=sort_info->param;
DBUG_ENTER("sort_record_index"); DBUG_ENTER("sort_record_index");
nod_flag=mi_test_if_nod(buff); nod_flag=mi_test_if_nod(buff);
...@@ -1524,7 +1530,7 @@ static int sort_record_index(MI_CHECK *param,MI_INFO *info, MI_KEYDEF *keyinfo, ...@@ -1524,7 +1530,7 @@ static int sort_record_index(MI_CHECK *param,MI_INFO *info, MI_KEYDEF *keyinfo,
llstr(next_page,llbuff)); llstr(next_page,llbuff));
goto err; goto err;
} }
if (sort_record_index(param, info,keyinfo,next_page,temp_buff,sort_key, if (sort_record_index(sort_param, info,keyinfo,next_page,temp_buff,sort_key,
new_file, update_index)) new_file, update_index))
goto err; goto err;
} }
...@@ -1535,23 +1541,23 @@ static int sort_record_index(MI_CHECK *param,MI_INFO *info, MI_KEYDEF *keyinfo, ...@@ -1535,23 +1541,23 @@ static int sort_record_index(MI_CHECK *param,MI_INFO *info, MI_KEYDEF *keyinfo,
break; break;
rec_pos= _mi_dpos(info,0,lastkey+key_length); rec_pos= _mi_dpos(info,0,lastkey+key_length);
if ((*info->s->read_rnd)(info,sort_info->record,rec_pos,0)) if ((*info->s->read_rnd)(info,sort_param->record,rec_pos,0))
{ {
mi_check_print_error(param,"%d when reading datafile",my_errno); mi_check_print_error(param,"%d when reading datafile",my_errno);
goto err; goto err;
} }
if (rec_pos != sort_info->filepos && update_index) if (rec_pos != sort_param->filepos && update_index)
{ {
_mi_dpointer(info,keypos-nod_flag-info->s->rec_reflength, _mi_dpointer(info,keypos-nod_flag-info->s->rec_reflength,
sort_info->filepos); sort_param->filepos);
if (movepoint(info,sort_info->record,rec_pos,sort_info->filepos, if (movepoint(info,sort_param->record,rec_pos,sort_param->filepos,
sort_key)) sort_key))
{ {
mi_check_print_error(param,"%d when updating key-pointers",my_errno); mi_check_print_error(param,"%d when updating key-pointers",my_errno);
goto err; goto err;
} }
} }
if (sort_write_record(sort_info)) if (sort_write_record(sort_param))
goto err; goto err;
} }
/* Clear end of block to get better compression if the table is backuped */ /* Clear end of block to get better compression if the table is backuped */
......
...@@ -658,7 +658,7 @@ int _mi_init_bulk_insert(MI_INFO *info); ...@@ -658,7 +658,7 @@ int _mi_init_bulk_insert(MI_INFO *info);
void mi_check_print_error _VARARGS((MI_CHECK *param, const char *fmt,...)); void mi_check_print_error _VARARGS((MI_CHECK *param, const char *fmt,...));
void mi_check_print_warning _VARARGS((MI_CHECK *param, const char *fmt,...)); void mi_check_print_warning _VARARGS((MI_CHECK *param, const char *fmt,...));
void mi_check_print_info _VARARGS((MI_CHECK *param, const char *fmt,...)); void mi_check_print_info _VARARGS((MI_CHECK *param, const char *fmt,...));
int flush_pending_blocks(MI_CHECK *param); int flush_pending_blocks(MI_SORT_PARAM *param);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
This diff is collapsed.
...@@ -85,7 +85,11 @@ init_functions(IO_CACHE* info, enum cache_type type) ...@@ -85,7 +85,11 @@ init_functions(IO_CACHE* info, enum cache_type type)
info->write_function = 0; /* Force a core if used */ info->write_function = 0; /* Force a core if used */
break; break;
default: default:
info->read_function = _my_b_read; info->read_function =
#ifdef THREAD
info->share ? _my_b_read_r :
#endif
_my_b_read;
info->write_function = _my_b_write; info->write_function = _my_b_write;
} }
...@@ -127,6 +131,9 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize, ...@@ -127,6 +131,9 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
info->alloced_buffer = 0; info->alloced_buffer = 0;
info->buffer=0; info->buffer=0;
info->seek_not_done= test(file >= 0); info->seek_not_done= test(file >= 0);
#ifdef THREAD
info->share=0;
#endif
if (!cachesize) if (!cachesize)
if (! (cachesize= my_default_record_cache_size)) if (! (cachesize= my_default_record_cache_size))
...@@ -214,7 +221,6 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize, ...@@ -214,7 +221,6 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
DBUG_RETURN(0); DBUG_RETURN(0);
} /* init_io_cache */ } /* init_io_cache */
/* Wait until current request is ready */ /* Wait until current request is ready */
#ifdef HAVE_AIOWAIT #ifdef HAVE_AIOWAIT
...@@ -419,6 +425,90 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count) ...@@ -419,6 +425,90 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
DBUG_RETURN(0); DBUG_RETURN(0);
} }
#ifdef THREAD
int init_io_cache_share(IO_CACHE *info, IO_CACHE_SHARE *s, uint num_threads)
{
DBUG_ASSERT(info->type == READ_CACHE);
pthread_mutex_init(& s->mutex, MY_MUTEX_INIT_FAST);
pthread_cond_init (& s->cond, 0);
s->count=num_threads;
s->active=0; /* to catch errors */
info->share=s;
info->read_function=_my_b_read_r;
}
int remove_io_thread(IO_CACHE *info)
{
if (errno=pthread_mutex_lock(& info->share->mutex))
return -1;
if (! info->share->count--)
pthread_cond_signal(& info->share->cond);
pthread_mutex_unlock(& info->share->mutex);
return 0;
}
int _my_b_read_r(register IO_CACHE *info, byte *Buffer, uint Count)
{
my_off_t pos_in_file;
int length,diff_length,read_len;
DBUG_ENTER("_my_b_read_r");
if ((read_len=(uint) (info->read_end-info->read_pos)))
{
DBUG_ASSERT(Count >= read_len); /* User is not using my_b_read() */
memcpy(Buffer,info->read_pos, (size_t) (read_len));
Buffer+=read_len;
Count-=read_len;
}
#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
#define IO_ROUND_DN(X) ( (X) & ~(IO_SIZE-1))
while (Count) {
int cnt, len;
pos_in_file= info->pos_in_file + (uint)(info->read_end - info->buffer);
diff_length= pos_in_file & (IO_SIZE-1);
length=IO_ROUND_UP(Count+diff_length)-diff_length;
length=(length <= info->read_length) ?
length + IO_ROUND_DN(info->read_length - length) :
length - IO_ROUND_UP(length - info->read_length) ;
if (lock_io_cache(info))
{
info->share->active=info;
if (info->seek_not_done) /* File touched, do seek */
VOID(my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)));
len=my_read(info->file,info->buffer, length, info->myflags);
info->read_end=info->buffer + (len == -1 ? 0 : len);
info->error=(len == length ? 0 : len);
info->pos_in_file=pos_in_file;
unlock_io_cache(info);
}
else
{
info->error= info->share->active->error;
info->read_end= info->share->active->read_end;
info->pos_in_file= info->share->active->pos_in_file;
len= (info->error == -1 ? -1 : info->read_end-info->buffer);
}
info->read_pos=info->buffer;
info->seek_not_done=0;
if (info->error)
{
info->error=read_len;
DBUG_RETURN(1);
}
cnt=(len > Count) ? Count : len;
memcpy(Buffer,info->read_pos, (size_t)cnt);
Count -=cnt;
Buffer+=cnt;
read_len+=cnt;
info->read_pos+=cnt;
}
DBUG_RETURN(0);
}
#endif
/* /*
Do sequential read from the SEQ_READ_APPEND cache Do sequential read from the SEQ_READ_APPEND cache
we do this in three stages: we do this in three stages:
...@@ -454,7 +544,7 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) ...@@ -454,7 +544,7 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count)
*/ */
VOID(my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0))); VOID(my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)));
info->seek_not_done=0; info->seek_not_done=0;
diff_length=(uint) (pos_in_file & (IO_SIZE-1)); diff_length=(uint) (pos_in_file & (IO_SIZE-1));
/* now the second stage begins - read from file descriptor */ /* now the second stage begins - read from file descriptor */
...@@ -510,7 +600,7 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) ...@@ -510,7 +600,7 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count)
memcpy(Buffer,info->buffer,(size_t) length); memcpy(Buffer,info->buffer,(size_t) length);
Count -= length; Count -= length;
Buffer += length; Buffer += length;
/* /*
added the line below to make added the line below to make
DBUG_ASSERT(pos_in_file==info->end_of_file) pass. DBUG_ASSERT(pos_in_file==info->end_of_file) pass.
...@@ -544,7 +634,7 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) ...@@ -544,7 +634,7 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count)
/* /*
TODO: figure out if the assert below is needed or correct. TODO: figure out if the assert below is needed or correct.
*/ */
DBUG_ASSERT(pos_in_file == info->end_of_file); DBUG_ASSERT(pos_in_file == info->end_of_file);
copy_len=min(Count, len_in_buff); copy_len=min(Count, len_in_buff);
memcpy(Buffer, info->append_read_pos, copy_len); memcpy(Buffer, info->append_read_pos, copy_len);
info->append_read_pos += copy_len; info->append_read_pos += copy_len;
...@@ -910,7 +1000,7 @@ int _flush_io_cache(IO_CACHE *info, int need_append_buffer_lock) ...@@ -910,7 +1000,7 @@ int _flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
if (!(append_cache = (info->type == SEQ_READ_APPEND))) if (!(append_cache = (info->type == SEQ_READ_APPEND)))
need_append_buffer_lock=0; need_append_buffer_lock=0;
if (info->type == WRITE_CACHE || append_cache) if (info->type == WRITE_CACHE || append_cache)
{ {
if (info->file == -1) if (info->file == -1)
...@@ -919,7 +1009,7 @@ int _flush_io_cache(IO_CACHE *info, int need_append_buffer_lock) ...@@ -919,7 +1009,7 @@ int _flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
DBUG_RETURN((info->error= -1)); DBUG_RETURN((info->error= -1));
} }
LOCK_APPEND_BUFFER; LOCK_APPEND_BUFFER;
if ((length=(uint) (info->write_pos - info->write_buffer))) if ((length=(uint) (info->write_pos - info->write_buffer)))
{ {
pos_in_file=info->pos_in_file; pos_in_file=info->pos_in_file;
...@@ -956,7 +1046,7 @@ int _flush_io_cache(IO_CACHE *info, int need_append_buffer_lock) ...@@ -956,7 +1046,7 @@ int _flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
info->end_of_file+=(info->write_pos-info->append_read_pos); info->end_of_file+=(info->write_pos-info->append_read_pos);
DBUG_ASSERT(info->end_of_file == my_tell(info->file,MYF(0))); DBUG_ASSERT(info->end_of_file == my_tell(info->file,MYF(0)));
} }
info->append_read_pos=info->write_pos=info->write_buffer; info->append_read_pos=info->write_pos=info->write_buffer;
UNLOCK_APPEND_BUFFER; UNLOCK_APPEND_BUFFER;
DBUG_RETURN(info->error); DBUG_RETURN(info->error);
...@@ -980,6 +1070,18 @@ int end_io_cache(IO_CACHE *info) ...@@ -980,6 +1070,18 @@ int end_io_cache(IO_CACHE *info)
IO_CACHE_CALLBACK pre_close; IO_CACHE_CALLBACK pre_close;
DBUG_ENTER("end_io_cache"); DBUG_ENTER("end_io_cache");
#ifdef THREAD
/* simple protection against multi-close: destroying share first */
if (info->share)
if (pthread_cond_destroy (& info->share->cond) |
pthread_mutex_destroy(& info->share->mutex))
{
DBUG_RETURN(1);
}
else
info->share=0;
#endif
if ((pre_close=info->pre_close)) if ((pre_close=info->pre_close))
(*pre_close)(info); (*pre_close)(info);
if (info->alloced_buffer) if (info->alloced_buffer)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment