Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
MariaDB
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
nexedi
MariaDB
Commits
ef26fe22
Commit
ef26fe22
authored
Jun 30, 2002
by
unknown
Browse files
Options
Browse Files
Download
Plain Diff
merged
include/myisam.h: Auto merged myisam/myisamchk.c: Auto merged
parents
52b4a3a2
cc1142bd
Changes
6
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
149 additions
and
75 deletions
+149
-75
include/my_sys.h
include/my_sys.h
+0
-2
include/myisam.h
include/myisam.h
+3
-1
myisam/mi_check.c
myisam/mi_check.c
+31
-17
myisam/myisamchk.c
myisam/myisamchk.c
+23
-7
myisam/sort.c
myisam/sort.c
+24
-17
mysys/mf_iocache.c
mysys/mf_iocache.c
+68
-31
No files found.
include/my_sys.h
View file @
ef26fe22
...
...
@@ -666,8 +666,6 @@ extern int _my_b_read_r(IO_CACHE *info,byte *Buffer,uint Count);
extern
void
init_io_cache_share
(
IO_CACHE
*
info
,
IO_CACHE_SHARE
*
s
,
uint
num_threads
);
extern
void
remove_io_thread
(
IO_CACHE
*
info
);
int
lock_io_cache
(
IO_CACHE
*
);
void
unlock_io_cache
(
IO_CACHE
*
);
#endif
extern
int
_my_b_seq_read
(
IO_CACHE
*
info
,
byte
*
Buffer
,
uint
Count
);
extern
int
_my_b_net_read
(
IO_CACHE
*
info
,
byte
*
Buffer
,
uint
Count
);
...
...
include/myisam.h
View file @
ef26fe22
...
...
@@ -384,7 +384,7 @@ typedef struct st_mi_sort_param
IO_CACHE
tempfile
,
tempfile_for_exceptions
;
DYNAMIC_ARRAY
buffpek
;
my_off_t
pos
,
max_pos
,
filepos
,
start_recpos
;
my_bool
fix_datafile
;
my_bool
fix_datafile
,
master
;
char
*
record
;
char
*
tmpdir
;
int
(
*
key_cmp
)(
struct
st_mi_sort_param
*
,
const
void
*
,
const
void
*
);
...
...
@@ -406,6 +406,8 @@ int mi_repair(MI_CHECK *param, register MI_INFO *info,
int
mi_sort_index
(
MI_CHECK
*
param
,
register
MI_INFO
*
info
,
my_string
name
);
int
mi_repair_by_sort
(
MI_CHECK
*
param
,
register
MI_INFO
*
info
,
const
char
*
name
,
int
rep_quick
);
int
mi_repair_parallel
(
MI_CHECK
*
param
,
register
MI_INFO
*
info
,
const
char
*
name
,
int
rep_quick
);
int
change_to_newfile
(
const
char
*
filename
,
const
char
*
old_ext
,
const
char
*
new_ext
,
uint
raid_chunks
,
myf
myflags
);
...
...
myisam/mi_check.c
View file @
ef26fe22
...
...
@@ -14,7 +14,7 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/* Descri
pt
, check and repair of MyISAM tables */
/* Descri
be
, check and repair of MyISAM tables */
#include "ftdefs.h"
#include <m_ctype.h>
...
...
@@ -1187,6 +1187,7 @@ int mi_repair(MI_CHECK *param, register MI_INFO *info,
my_seek
(
info
->
dfile
,
0L
,
MY_SEEK_END
,
MYF
(
0
));
sort_info
.
dupp
=
0
;
sort_param
.
fix_datafile
=
(
my_bool
)
(
!
rep_quick
);
sort_param
.
master
=
1
;
sort_info
.
max_records
=
~
(
ha_rows
)
0
;
set_data_file_type
(
&
sort_info
,
share
);
...
...
@@ -1888,6 +1889,7 @@ int mi_repair_by_sort(MI_CHECK *param, register MI_INFO *info,
sort_param
.
tmpdir
=
param
->
tmpdir
;
sort_param
.
sort_info
=&
sort_info
;
sort_param
.
fix_datafile
=
(
my_bool
)
(
!
rep_quick
);
sort_param
.
master
=
1
;
del
=
info
->
state
->
del
;
param
->
glob_crc
=
0
;
...
...
@@ -2107,13 +2109,14 @@ int mi_repair_by_sort(MI_CHECK *param, register MI_INFO *info,
DESCRIPTION
Same as mi_repair_by_sort but do it multithreaded
Each key is handled by a separate thread.
TODO: make a number of thread a parameter
RESULT
0 ok
<>0 Error
*/
int
mi_repair_
by_sort_r
(
MI_CHECK
*
param
,
register
MI_INFO
*
info
,
int
mi_repair_
parallel
(
MI_CHECK
*
param
,
register
MI_INFO
*
info
,
const
char
*
name
,
int
rep_quick
)
{
int
got_error
;
...
...
@@ -2131,7 +2134,7 @@ int mi_repair_by_sort_r(MI_CHECK *param, register MI_INFO *info,
SORT_INFO
sort_info
;
ulonglong
key_map
=
share
->
state
.
key_map
;
pthread_attr_t
thr_attr
;
DBUG_ENTER
(
"mi_repair_
by_sort_r
"
);
DBUG_ENTER
(
"mi_repair_
parallel
"
);
start_records
=
info
->
state
->
records
;
got_error
=
1
;
...
...
@@ -2267,6 +2270,8 @@ int mi_repair_by_sort_r(MI_CHECK *param, register MI_INFO *info,
i
--
;
continue
;
}
if
((
!
(
param
->
testflag
&
T_SILENT
)))
printf
(
"- Fixing index %d
\n
"
,
key
+
1
);
sort_param
[
i
].
key_read
=
((
sort_param
[
i
].
keyinfo
->
flag
&
HA_FULLTEXT
)
?
sort_ft_key_read
:
sort_key_read
);
sort_param
[
i
].
key_cmp
=
sort_key_cmp
;
...
...
@@ -2274,6 +2279,7 @@ int mi_repair_by_sort_r(MI_CHECK *param, register MI_INFO *info,
sort_param
[
i
].
lock_in_memory
=
lock_memory
;
sort_param
[
i
].
tmpdir
=
param
->
tmpdir
;
sort_param
[
i
].
sort_info
=&
sort_info
;
sort_param
[
i
].
master
=
0
;
sort_param
[
i
].
fix_datafile
=
0
;
sort_param
[
i
].
filepos
=
new_header_length
;
...
...
@@ -2300,7 +2306,8 @@ int mi_repair_by_sort_r(MI_CHECK *param, register MI_INFO *info,
sort_param
[
i
].
key_length
+=
ft_max_word_len_for_sort
-
ft_max_word_len
;
}
sort_info
.
total_keys
=
i
;
sort_param
[
0
].
fix_datafile
=
!
rep_quick
;
sort_param
[
0
].
master
=
1
;
sort_param
[
0
].
fix_datafile
=
(
my_bool
)(
!
rep_quick
);
sort_info
.
got_error
=
0
;
pthread_mutex_init
(
&
sort_info
.
mutex
,
MY_MUTEX_INIT_FAST
);
...
...
@@ -2321,10 +2328,10 @@ int mi_repair_by_sort_r(MI_CHECK *param, register MI_INFO *info,
In the second one all the threads will fill their sort_buffers
(and call write_keys) at the same time, putting more stress on i/o.
*/
#if
1
#if
ndef USING_SECOND_APPROACH
param
->
sort_buffer_length
/
sort_info
.
total_keys
;
#else
param
->
sort_buffer_length
*
sort_param
[
i
].
key_length
/
total_key_length
;
param
->
sort_buffer_length
*
sort_param
[
i
].
key_length
/
total_key_length
;
#endif
if
(
pthread_create
(
&
sort_param
[
i
].
thr
,
&
thr_attr
,
thr_find_all_keys
,
...
...
@@ -2488,7 +2495,8 @@ static int sort_key_read(MI_SORT_PARAM *sort_param, void *key)
if
(
info
->
state
->
records
==
sort_info
->
max_records
)
{
mi_check_print_error
(
sort_info
->
param
,
"Found too many records; Can`t continue"
);
"Key %d - Found too many records; Can't continue"
,
sort_param
->
key
+
1
);
DBUG_RETURN
(
1
);
}
sort_param
->
real_key_length
=
...
...
@@ -2578,7 +2586,8 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
if
(
!
sort_param
->
fix_datafile
)
{
sort_param
->
filepos
=
sort_param
->
pos
;
share
->
state
.
split
++
;
if
(
sort_param
->
master
)
share
->
state
.
split
++
;
}
sort_param
->
max_pos
=
(
sort_param
->
pos
+=
share
->
base
.
pack_reclength
);
if
(
*
sort_param
->
record
)
...
...
@@ -2588,7 +2597,7 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
mi_static_checksum
(
info
,
sort_param
->
record
));
DBUG_RETURN
(
0
);
}
if
(
!
sort_param
->
fix_datafile
)
if
(
!
sort_param
->
fix_datafile
&&
sort_param
->
master
)
{
info
->
state
->
del
++
;
info
->
state
->
empty
+=
share
->
base
.
pack_reclength
;
...
...
@@ -2734,7 +2743,8 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
}
if
(
b_type
&
(
BLOCK_DELETED
|
BLOCK_SYNC_ERROR
))
{
if
(
!
sort_param
->
fix_datafile
&&
(
b_type
&
BLOCK_DELETED
))
if
(
!
sort_param
->
fix_datafile
&&
sort_param
->
master
&&
(
b_type
&
BLOCK_DELETED
))
{
info
->
state
->
empty
+=
block_info
.
block_len
;
info
->
state
->
del
++
;
...
...
@@ -2753,7 +2763,7 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
continue
;
}
if
(
!
sort_param
->
fix_datafile
)
if
(
!
sort_param
->
fix_datafile
&&
sort_param
->
master
)
share
->
state
.
split
++
;
if
(
!
found_record
++
)
{
...
...
@@ -2895,7 +2905,8 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
if
(
!
sort_param
->
fix_datafile
)
{
sort_param
->
filepos
=
sort_param
->
pos
;
share
->
state
.
split
++
;
if
(
sort_param
->
master
)
share
->
state
.
split
++
;
}
sort_param
->
max_pos
=
(
sort_param
->
pos
=
block_info
.
filepos
+
block_info
.
rec_len
);
...
...
@@ -3003,12 +3014,15 @@ int sort_write_record(MI_SORT_PARAM *sort_param)
break
;
}
}
info
->
state
->
records
++
;
if
((
param
->
testflag
&
T_WRITE_LOOP
)
&&
(
info
->
state
->
records
%
WRITE_COUNT
)
==
0
)
if
(
sort_param
->
master
)
{
char
llbuff
[
22
];
printf
(
"%s
\r
"
,
llstr
(
info
->
state
->
records
,
llbuff
));
VOID
(
fflush
(
stdout
));
info
->
state
->
records
++
;
if
((
param
->
testflag
&
T_WRITE_LOOP
)
&&
(
info
->
state
->
records
%
WRITE_COUNT
)
==
0
)
{
char
llbuff
[
22
];
printf
(
"%s
\r
"
,
llstr
(
info
->
state
->
records
,
llbuff
));
VOID
(
fflush
(
stdout
));
}
}
DBUG_RETURN
(
0
);
}
/* sort_write_record */
...
...
myisam/myisamchk.c
View file @
ef26fe22
...
...
@@ -194,6 +194,9 @@ static struct my_option my_long_options[] =
{
"force"
,
'f'
,
"Restart with -r if there are any errors in the table. States will be updated as with --update-state."
,
0
,
0
,
0
,
GET_NO_ARG
,
NO_ARG
,
0
,
0
,
0
,
0
,
0
,
0
},
{
"HELP"
,
'H'
,
"Display this help and exit."
,
0
,
0
,
0
,
GET_NO_ARG
,
NO_ARG
,
0
,
0
,
0
,
0
,
0
,
0
},
{
"help"
,
'?'
,
"Display this help and exit."
,
0
,
0
,
0
,
GET_NO_ARG
,
NO_ARG
,
0
,
0
,
0
,
0
,
0
,
0
},
...
...
@@ -216,9 +219,15 @@ static struct my_option my_long_options[] =
{
"recover"
,
'r'
,
"Can fix almost anything except unique keys that aren't unique."
,
0
,
0
,
0
,
GET_NO_ARG
,
NO_ARG
,
0
,
0
,
0
,
0
,
0
,
0
},
{
"paraller-recover"
,
'p'
,
"Same as '-r' but creates all the keys in parallel"
,
0
,
0
,
0
,
GET_NO_ARG
,
NO_ARG
,
0
,
0
,
0
,
0
,
0
,
0
},
{
"safe-recover"
,
'o'
,
"Uses old recovery method; Slower than '-r' but can handle a couple of cases where '-r' reports that it can't fix the data file."
,
0
,
0
,
0
,
GET_NO_ARG
,
NO_ARG
,
0
,
0
,
0
,
0
,
0
,
0
},
{
"sort-recover"
,
'n'
,
"Force recovering with sorting even if the temporary file was very big."
,
0
,
0
,
0
,
GET_NO_ARG
,
NO_ARG
,
0
,
0
,
0
,
0
,
0
,
0
},
{
"start-check-pos"
,
OPT_START_CHECK_POS
,
"No help available."
,
0
,
0
,
0
,
GET_ULL
,
REQUIRED_ARG
,
0
,
0
,
0
,
0
,
0
,
0
},
...
...
@@ -244,9 +253,6 @@ static struct my_option my_long_options[] =
(
gptr
*
)
&
check_param
.
opt_sort_key
,
(
gptr
*
)
&
check_param
.
opt_sort_key
,
0
,
GET_UINT
,
REQUIRED_ARG
,
0
,
0
,
0
,
0
,
0
,
0
},
{
"sort-recover"
,
'n'
,
"Force recovering with sorting even if the temporary file was very big."
,
0
,
0
,
0
,
GET_NO_ARG
,
NO_ARG
,
0
,
0
,
0
,
0
,
0
,
0
},
{
"tmpdir"
,
't'
,
"Path for temporary files."
,
(
gptr
*
)
&
check_param
.
tmpdir
,
...
...
@@ -514,6 +520,11 @@ get_one_option(int optid,
if
(
argument
!=
disabled_my_option
)
check_param
.
testflag
|=
T_REP_BY_SORT
;
break
;
case
'p'
:
check_param
.
testflag
&=
~
T_REP_ANY
;
if
(
argument
!=
disabled_my_option
)
check_param
.
testflag
|=
T_REP_PARALLEL
;
break
;
case
'o'
:
check_param
.
testflag
&=
~
T_REP_ANY
;
check_param
.
force_sort
=
0
;
...
...
@@ -616,6 +627,9 @@ get_one_option(int optid,
check_param
.
start_check_pos
=
strtoull
(
argument
,
NULL
,
0
);
break
;
#endif
case
'H'
:
my_print_help
(
my_long_options
);
exit
(
0
);
case
'?'
:
usage
();
exit
(
0
);
...
...
@@ -864,8 +878,7 @@ static int myisamchk(MI_CHECK *param, my_string filename)
if
(
tmp
!=
share
->
state
.
key_map
)
info
->
update
|=
HA_STATE_CHANGED
;
}
if
(
rep_quick
&&
chk_del
(
param
,
info
,
param
->
testflag
&
~
T_VERBOSE
))
if
(
rep_quick
&&
chk_del
(
param
,
info
,
param
->
testflag
&
~
T_VERBOSE
))
{
if
(
param
->
testflag
&
T_FORCE_CREATE
)
{
...
...
@@ -881,14 +894,17 @@ static int myisamchk(MI_CHECK *param, my_string filename)
}
if
(
!
error
)
{
if
((
param
->
testflag
&
T_REP_BY_SORT
)
&&
if
((
param
->
testflag
&
(
T_REP_BY_SORT
|
T_REP_PARALLEL
)
)
&&
(
share
->
state
.
key_map
||
(
rep_quick
&&
!
param
->
keys_in_use
&&
!
recreate
))
&&
mi_test_if_sort_rep
(
info
,
info
->
state
->
records
,
info
->
s
->
state
.
key_map
,
param
->
force_sort
))
{
error
=
mi_repair_by_sort
(
param
,
info
,
filename
,
rep_quick
);
if
(
param
->
testflag
&
T_REP_BY_SORT
)
error
=
mi_repair_by_sort
(
param
,
info
,
filename
,
rep_quick
);
else
error
=
mi_repair_parallel
(
param
,
info
,
filename
,
rep_quick
);
state_updated
=
1
;
}
else
if
(
param
->
testflag
&
T_REP_ANY
)
...
...
myisam/sort.c
View file @
ef26fe22
...
...
@@ -284,21 +284,25 @@ pthread_handler_decl(thr_find_all_keys,arg)
uint
memavl
,
old_memavl
,
keys
,
sort_length
;
uint
idx
,
maxbuffer
;
uchar
**
sort_keys
;
error
=
1
;
if
(
my_thread_init
())
goto
err
;
if
(
info
->
sort_info
->
got_error
)
goto
err
;
my_b_clear
(
&
info
->
tempfile
);
my_b_clear
(
&
info
->
tempfile_for_exceptions
);
bzero
((
char
*
)
&
info
->
buffpek
,
sizeof
(
info
->
buffpek
));
bzero
((
char
*
)
&
info
->
unique
,
sizeof
(
info
->
unique
));
sort_keys
=
(
uchar
**
)
NULL
;
error
=
1
;
if
(
info
->
sort_info
->
got_error
)
goto
err
;
memavl
=
max
(
info
->
sortbuff_size
,
MIN_SORT_MEMORY
);
idx
=
info
->
sort_info
->
max_records
;
sort_length
=
info
->
key_length
;
maxbuffer
=
1
;
maxbuffer
=
1
;
while
(
memavl
>=
MIN_SORT_MEMORY
)
{
if
((
my_off_t
)
(
idx
+
1
)
*
(
sort_length
+
sizeof
(
char
*
))
<=
...
...
@@ -340,6 +344,7 @@ pthread_handler_decl(thr_find_all_keys,arg)
mi_check_print_error
(
info
->
sort_info
->
param
,
"Sort buffer to small"
);
/* purecov: tested */
goto
err
;
/* purecov: tested */
}
// (*info->lock_in_memory)(info->sort_info->param);/* Everything is allocated */
if
(
info
->
sort_info
->
param
->
testflag
&
T_VERBOSE
)
printf
(
"Key %d - Allocating buffer for %d keys
\n
"
,
info
->
key
+
1
,
keys
);
...
...
@@ -348,8 +353,8 @@ pthread_handler_decl(thr_find_all_keys,arg)
idx
=
error
=
0
;
sort_keys
[
0
]
=
(
uchar
*
)
(
sort_keys
+
keys
);
while
(
!
(
error
=
info
->
sort_info
->
got_error
)
||
!
(
error
=
(
*
info
->
key_read
)(
info
,
sort_keys
[
idx
])))
while
(
!
(
error
=
info
->
sort_info
->
got_error
)
&&
!
(
error
=
(
*
info
->
key_read
)(
info
,
sort_keys
[
idx
])))
{
if
(
info
->
real_key_length
>
info
->
key_length
)
{
...
...
@@ -364,7 +369,6 @@ pthread_handler_decl(thr_find_all_keys,arg)
(
BUFFPEK
*
)
alloc_dynamic
(
&
info
->
buffpek
),
&
info
->
tempfile
))
goto
err
;
sort_keys
[
0
]
=
(
uchar
*
)
(
sort_keys
+
keys
);
memcpy
(
sort_keys
[
0
],
sort_keys
[
idx
-
1
],(
size_t
)
info
->
key_length
);
idx
=
1
;
...
...
@@ -401,6 +405,7 @@ pthread_handler_decl(thr_find_all_keys,arg)
info
->
sort_info
->
threads_running
--
;
pthread_cond_signal
(
&
info
->
sort_info
->
cond
);
pthread_mutex_unlock
(
&
info
->
sort_info
->
mutex
);
my_thread_end
();
return
NULL
;
}
...
...
@@ -414,14 +419,14 @@ int thr_write_keys(MI_SORT_PARAM *sort_param)
int
got_error
=
sort_info
->
got_error
;
uint
i
;
MI_INFO
*
info
=
sort_info
->
info
;
MYISAM_SHARE
*
share
=
info
->
s
;
MYISAM_SHARE
*
share
=
info
->
s
;
MI_SORT_PARAM
*
sinfo
;
byte
*
mergebuf
=
0
;
LINT_INIT
(
length
);
for
(
i
=
0
,
sinfo
=
sort_param
;
i
<
sort_info
->
total_keys
;
i
++
,
sinfo
++
,
rec_per_key_part
+=
sinfo
->
keyinfo
->
keysegs
)
for
(
i
=
0
,
sinfo
=
sort_param
;
i
<
sort_info
->
total_keys
;
i
++
,
rec_per_key_part
+=
sinfo
->
keyinfo
->
keysegs
,
sinfo
++
)
{
if
(
!
sinfo
->
sort_keys
)
{
...
...
@@ -447,11 +452,11 @@ int thr_write_keys(MI_SORT_PARAM *sort_param)
sinfo
->
sort_keys
=
0
;
}
for
(
i
=
0
,
sinfo
=
sort_param
;
i
<
sort_info
->
total_keys
;
i
++
,
sinfo
++
,
delete_dynamic
(
&
sinfo
->
buffpek
),
close_cached_file
(
&
sinfo
->
tempfile
),
close_cached_file
(
&
sinfo
->
tempfile_for_exceptions
)
)
for
(
i
=
0
,
sinfo
=
sort_param
;
i
<
sort_info
->
total_keys
;
i
++
,
delete_dynamic
(
&
sinfo
->
buffpek
),
close_cached_file
(
&
sinfo
->
tempfile
),
close_cached_file
(
&
sinfo
->
tempfile_for_exceptions
),
sinfo
++
)
{
if
(
got_error
)
continue
;
...
...
@@ -552,8 +557,10 @@ static int NEAR_F write_keys(MI_SORT_PARAM *info, register uchar **sort_keys,
buffpek
->
count
=
count
;
for
(
end
=
sort_keys
+
count
;
sort_keys
!=
end
;
sort_keys
++
)
{
if
(
my_b_write
(
tempfile
,(
byte
*
)
*
sort_keys
,(
uint
)
sort_length
))
DBUG_RETURN
(
1
);
/* purecov: inspected */
}
DBUG_RETURN
(
0
);
}
/* write_keys */
...
...
@@ -576,7 +583,7 @@ static int NEAR_F write_key(MI_SORT_PARAM *info, uchar *key,
}
/* write_key */
/* Write index */
/* Write index */
static
int
NEAR_F
write_index
(
MI_SORT_PARAM
*
info
,
register
uchar
**
sort_keys
,
register
uint
count
)
...
...
mysys/mf_iocache.c
View file @
ef26fe22
...
...
@@ -68,6 +68,9 @@ static void my_aiowait(my_aio_result *result);
#define unlock_append_buffer(info)
#endif
#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
#define IO_ROUND_DN(X) ( (X) & ~(IO_SIZE-1))
static
void
init_functions
(
IO_CACHE
*
info
,
enum
cache_type
type
)
{
...
...
@@ -424,22 +427,24 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
DBUG_RETURN
(
0
);
}
#ifdef THREAD
/* Initialzie multi-thread usage of the IO cache */
/* Prepare IO_CACHE for shared use */
void
init_io_cache_share
(
IO_CACHE
*
info
,
IO_CACHE_SHARE
*
s
,
uint
num_threads
)
{
DBUG_ASSERT
(
info
->
type
==
READ_CACHE
);
pthread_mutex_init
(
&
s
->
mutex
,
MY_MUTEX_INIT_FAST
);
pthread_cond_init
(
&
s
->
cond
,
0
);
s
->
count
=
num_threads
;
pthread_cond_init
(
&
s
->
cond
,
0
);
s
->
count
=
num_threads
-
1
;
s
->
active
=
0
;
/* to catch errors */
info
->
share
=
s
;
info
->
read_function
=
_my_b_read_r
;
}
/*
Remove a thread from shared access to IO_CACHE
Every thread should do that on exit for not
to deadlock other threads
*/
void
remove_io_thread
(
IO_CACHE
*
info
)
{
pthread_mutex_lock
(
&
info
->
share
->
mutex
);
...
...
@@ -448,34 +453,41 @@ void remove_io_thread(IO_CACHE *info)
pthread_mutex_unlock
(
&
info
->
share
->
mutex
);
}
int
lock_io_cache
(
IO_CACHE
*
info
)
static
int
lock_io_cache
(
IO_CACHE
*
info
)
{
pthread_mutex_lock
(
&
info
->
share
->
mutex
);
if
(
!
info
->
share
->
count
)
return
1
;
info
->
share
->
count
--
;
pthread_cond_wait
(
&
((
info
)
->
share
->
cond
),
&
((
info
)
->
share
->
mutex
));
if
(
!++
info
->
share
->
count
)
--
(
info
->
share
->
count
);
pthread_cond_wait
(
&
info
->
share
->
cond
,
&
info
->
share
->
mutex
);
/*
count can be -1 here, if one thread was removed (remove_io_thread)
while all others were locked (lock_io_cache).
If this is the case, this thread behaves as if count was 0 from the
very beginning, that is returns 1 and does not unlock the mutex.
*/
if
(
++
(
info
->
share
->
count
))
return
pthread_mutex_unlock
(
&
info
->
share
->
mutex
);
else
return
1
;
pthread_mutex_unlock
(
&
info
->
share
->
mutex
);
return
0
;
}
void
unlock_io_cache
(
IO_CACHE
*
info
)
static
void
unlock_io_cache
(
IO_CACHE
*
info
)
{
pthread_cond_broadcast
(
&
info
->
share
->
cond
);
pthread_mutex_unlock
(
&
info
->
share
->
mutex
);
}
/*
Read from the io cache in a thread safe manner
Read from IO_CACHE when it is shared between several threads.
It works as follows: when a thread tries to read from a file
(that is, after using all the data from the (shared) buffer),
it just hangs on lock_io_cache(), wating for other threads.
When the very last thread attempts a read, lock_io_cache()
returns 1, the thread does actual IO and unlock_io_cache(),
which signals all the waiting threads that data is in the buffer.
*/
#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
#define IO_ROUND_DN(X) ( (X) & ~(IO_SIZE-1))
int
_my_b_read_r
(
register
IO_CACHE
*
info
,
byte
*
Buffer
,
uint
Count
)
{
my_off_t
pos_in_file
;
...
...
@@ -491,24 +503,40 @@ int _my_b_read_r(register IO_CACHE *info, byte *Buffer, uint Count)
}
while
(
Count
)
{
u
int
cnt
,
len
;
int
cnt
,
len
;
pos_in_file
=
info
->
pos_in_file
+
(
uint
)(
info
->
read_end
-
info
->
buffer
);
diff_length
=
(
uint
)
(
pos_in_file
&
(
IO_SIZE
-
1
));
length
=
IO_ROUND_UP
(
Count
+
diff_length
)
-
diff_length
;
length
=
((
length
<=
info
->
read_length
)
?
length
+
IO_ROUND_DN
(
info
->
read_length
-
length
)
:
length
-
IO_ROUND_UP
(
length
-
info
->
read_length
))
;
length
=
IO_ROUND_UP
(
Count
+
diff_length
)
-
diff_length
;
length
=
(
length
<=
info
->
read_length
)
?
length
+
IO_ROUND_DN
(
info
->
read_length
-
length
)
:
length
-
IO_ROUND_UP
(
length
-
info
->
read_length
)
;
if
(
info
->
type
!=
READ_FIFO
&&
(
length
>
info
->
end_of_file
-
pos_in_file
))
length
=
info
->
end_of_file
-
pos_in_file
;
if
(
length
==
0
)
{
info
->
error
=
(
int
)
read_len
;
DBUG_RETURN
(
1
);
}
if
(
lock_io_cache
(
info
))
{
#if 0 && SAFE_MUTEX
#define PRINT_LOCK(M) printf("Thread %d: mutex is %s\n", my_thread_id(), \
(((safe_mutex_t *)(M))->count ? "Locked" : "Unlocked"))
#else
#define PRINT_LOCK(M)
#endif
PRINT_LOCK
(
&
info
->
share
->
mutex
);
info
->
share
->
active
=
info
;
if
(
info
->
seek_not_done
)
/* File touched, do seek */
VOID
(
my_seek
(
info
->
file
,
pos_in_file
,
MY_SEEK_SET
,
MYF
(
0
)));
len
=
my_read
(
info
->
file
,
info
->
buffer
,
length
,
info
->
myflags
);
info
->
read_end
=
info
->
buffer
+
(
len
==
(
uint
)
-
1
?
0
:
len
);
info
->
error
=
(
len
==
length
?
0
:
len
);
len
=
(
int
)
my_read
(
info
->
file
,
info
->
buffer
,
length
,
info
->
myflags
);
info
->
read_end
=
info
->
buffer
+
(
len
==
-
1
?
0
:
len
);
info
->
error
=
(
len
==
(
int
)
length
?
0
:
len
);
info
->
pos_in_file
=
pos_in_file
;
unlock_io_cache
(
info
);
PRINT_LOCK
(
&
info
->
share
->
mutex
);
}
else
{
...
...
@@ -516,15 +544,16 @@ int _my_b_read_r(register IO_CACHE *info, byte *Buffer, uint Count)
info
->
read_end
=
info
->
share
->
active
->
read_end
;
info
->
pos_in_file
=
info
->
share
->
active
->
pos_in_file
;
len
=
(
info
->
error
==
-
1
?
-
1
:
info
->
read_end
-
info
->
buffer
);
PRINT_LOCK
(
&
info
->
share
->
mutex
);
}
info
->
read_pos
=
info
->
buffer
;
info
->
seek_not_done
=
0
;
if
(
info
->
error
)
if
(
len
<=
0
)
{
info
->
error
=
(
int
)
read_len
;
DBUG_RETURN
(
1
);
}
cnt
=
(
len
>
Count
)
?
Count
:
len
;
cnt
=
(
len
>
Count
)
?
(
int
)
Count
:
len
;
memcpy
(
Buffer
,
info
->
read_pos
,
(
size_t
)
cnt
);
Count
-=
cnt
;
Buffer
+=
cnt
;
...
...
@@ -1098,11 +1127,19 @@ int end_io_cache(IO_CACHE *info)
DBUG_ENTER
(
"end_io_cache"
);
#ifdef THREAD
/* simple protection against multi-close: destroying share first */
if
(
info
->
share
)
{
pthread_cond_destroy
(
&
info
->
share
->
cond
);
#ifdef SAFE_MUTEX
/* simple protection against multi-close: destroying share first */
if
(
pthread_cond_destroy
(
&
info
->
share
->
cond
)
|
pthread_mutex_destroy
(
&
info
->
share
->
mutex
))
{
DBUG_RETURN
(
1
);
}
#else
pthread_cond_destroy
(
&
info
->
share
->
cond
);
pthread_mutex_destroy
(
&
info
->
share
->
mutex
);
#endif
info
->
share
=
0
;
}
#endif
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment