Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
mariadb
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
mariadb
Commits
5465dd08
Commit
5465dd08
authored
Jun 10, 2008
by
Guilhem Bichot
Browse files
Options
Browse Files
Download
Plain Diff
automerge
parents
a0aa81f9
7c87c307
Changes
5
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
323 additions
and
153 deletions
+323
-153
storage/maria/CMakeLists.txt
storage/maria/CMakeLists.txt
+1
-1
storage/maria/ma_loghandler.c
storage/maria/ma_loghandler.c
+237
-138
storage/maria/ma_state.c
storage/maria/ma_state.c
+2
-2
storage/maria/unittest/Makefile.am
storage/maria/unittest/Makefile.am
+3
-0
storage/maria/unittest/ma_test_loghandler_multithread-t.c
storage/maria/unittest/ma_test_loghandler_multithread-t.c
+80
-12
No files found.
storage/maria/CMakeLists.txt
View file @
5465dd08
...
...
@@ -24,7 +24,7 @@ INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/include ${CMAKE_SOURCE_DIR}/zlib
SET
(
MARIA_SOURCES ma_init.c ma_open.c ma_extra.c ma_info.c ma_rkey.c
ma_rnext.c ma_rnext_same.c
ma_search.c ma_page.c ma_key_recover.c ma_key.c
ma_locking.c
ma_locking.c
ma_state.c
ma_rrnd.c ma_scan.c ma_cache.c
ma_statrec.c ma_packrec.c ma_dynrec.c
ma_blockrec.c ma_bitmap.c
...
...
storage/maria/ma_loghandler.c
View file @
5465dd08
...
...
@@ -77,9 +77,12 @@ typedef union
normal circumstances (less then half of one and full other, or just
switched one and other), But if we met end of the file in the middle and
have to switch buffer it will be 3. + 1 buffer for flushing/writing.
We have a bigger number here for higher concurrency.
We have a bigger number here for higher concurrency and to make division
faster.
The number should be power of 2 to be fast.
*/
#define TRANSLOG_BUFFERS_NO
5
#define TRANSLOG_BUFFERS_NO
8
/* number of bytes (+ header) which can be unused on first page in sequence */
#define TRANSLOG_MINCHUNK_CONTENT 1
/* version of log file */
...
...
@@ -100,7 +103,13 @@ struct st_translog_buffer
pagecache_inject()
*/
uchar
buffer
[
TRANSLOG_WRITE_BUFFER
];
/*
Maximum LSN of records which ends in this buffer (or IMPOSSIBLE_LSN
if no LSNs ends here)
*/
LSN
last_lsn
;
/* last_lsn of previous buffer or IMPOSSIBLE_LSN if it is very first one */
LSN
prev_last_lsn
;
/* This buffer offset in the file */
TRANSLOG_ADDRESS
offset
;
/*
...
...
@@ -128,9 +137,7 @@ struct st_translog_buffer
content of page which present in both buffers)
*/
struct
st_translog_buffer
*
overlay
;
#ifndef DBUG_OFF
uint
buffer_no
;
#endif
/*
Lock for the buffer.
...
...
@@ -197,6 +204,8 @@ struct st_buffer_cursor
};
typedef
uint8
dirty_buffer_mask_t
;
struct
st_translog_descriptor
{
/* *** Parameters of the log handler *** */
...
...
@@ -245,6 +254,10 @@ struct st_translog_descriptor
File
directory_fd
;
/* buffers for log writing */
struct
st_translog_buffer
buffers
[
TRANSLOG_BUFFERS_NO
];
/* Mask where 1 in position N mean that buffer N is not flushed */
dirty_buffer_mask_t
dirty_buffer_mask
;
/* The above variable protection */
pthread_mutex_t
dirty_buffer_mask_lock
;
/*
horizon - visible end of the log (here is absolute end of the log:
position where next chunk can start
...
...
@@ -276,6 +289,7 @@ struct st_translog_descriptor
be removed in v1.5
*/
pthread_mutex_t
log_flush_lock
;
pthread_cond_t
log_flush_cond
;
/* Protects changing of headers of finished files (max_lsn) */
pthread_mutex_t
file_header_lock
;
...
...
@@ -303,6 +317,11 @@ struct st_translog_descriptor
is generated.
*/
my_bool
is_everything_flushed
;
/* True when flush pass is in progress */
my_bool
flush_in_progress
;
/* Next flush pass variables */
TRANSLOG_ADDRESS
next_pass_max_lsn
;
pthread_t
max_lsn_requester
;
};
static
struct
st_translog_descriptor
log_descriptor
;
...
...
@@ -785,6 +804,7 @@ void translog_stop_writing()
translog_status
=
(
translog_status
==
TRANSLOG_SHUTDOWN
?
TRANSLOG_UNINITED
:
TRANSLOG_READONLY
);
log_descriptor
.
is_everything_flushed
=
1
;
log_descriptor
.
open_flags
=
O_BINARY
|
O_RDONLY
;
DBUG_ASSERT
(
0
);
DBUG_VOID_RETURN
;
...
...
@@ -1373,7 +1393,9 @@ LSN translog_get_file_max_lsn_stored(uint32 file)
static
my_bool
translog_buffer_init
(
struct
st_translog_buffer
*
buffer
)
{
DBUG_ENTER
(
"translog_buffer_init"
);
buffer
->
last_lsn
=
LSN_IMPOSSIBLE
;
buffer
->
prev_last_lsn
=
buffer
->
last_lsn
=
LSN_IMPOSSIBLE
;
DBUG_PRINT
(
"info"
,
(
"last_lsn and prev_last_lsn set to 0 buffer: 0x%lx"
,
(
ulong
)
buffer
));
/* This Buffer File */
buffer
->
file
=
NULL
;
buffer
->
overlay
=
0
;
...
...
@@ -1972,7 +1994,9 @@ static void translog_start_buffer(struct st_translog_buffer *buffer,
(
ulong
)
LSN_OFFSET
(
log_descriptor
.
horizon
),
(
ulong
)
LSN_OFFSET
(
log_descriptor
.
horizon
)));
DBUG_ASSERT
(
buffer_no
==
buffer
->
buffer_no
);
buffer
->
last_lsn
=
LSN_IMPOSSIBLE
;
buffer
->
prev_last_lsn
=
buffer
->
last_lsn
=
LSN_IMPOSSIBLE
;
DBUG_PRINT
(
"info"
,
(
"last_lsn and prev_last_lsn set to 0 buffer: 0x%lx"
,
(
ulong
)
buffer
));
buffer
->
offset
=
log_descriptor
.
horizon
;
buffer
->
next_buffer_offset
=
LSN_IMPOSSIBLE
;
buffer
->
file
=
get_current_logfile
();
...
...
@@ -1987,6 +2011,10 @@ static void translog_start_buffer(struct st_translog_buffer *buffer,
cursor
->
chaser
,
(
ulong
)
cursor
->
buffer
->
size
,
(
ulong
)
(
cursor
->
ptr
-
cursor
->
buffer
->
buffer
)));
translog_check_cursor
(
cursor
);
pthread_mutex_lock
(
&
log_descriptor
.
dirty_buffer_mask_lock
);
log_descriptor
.
dirty_buffer_mask
|=
(
1
<<
buffer
->
buffer_no
);
pthread_mutex_unlock
(
&
log_descriptor
.
dirty_buffer_mask_lock
);
DBUG_VOID_RETURN
;
}
...
...
@@ -2046,7 +2074,6 @@ static my_bool translog_buffer_next(TRANSLOG_ADDRESS *horizon,
if
(
new_file
)
{
/* move the horizon to the next file and its header page */
(
*
horizon
)
+=
LSN_ONE_FILE
;
(
*
horizon
)
=
LSN_REPLACE_OFFSET
(
*
horizon
,
TRANSLOG_PAGE_SIZE
);
...
...
@@ -2065,6 +2092,13 @@ static my_bool translog_buffer_next(TRANSLOG_ADDRESS *horizon,
translog_start_buffer
(
new_buffer
,
cursor
,
new_buffer_no
);
}
log_descriptor
.
buffers
[
old_buffer_no
].
next_buffer_offset
=
new_buffer
->
offset
;
new_buffer
->
prev_last_lsn
=
((
log_descriptor
.
buffers
[
old_buffer_no
].
last_lsn
!=
LSN_IMPOSSIBLE
)
?
log_descriptor
.
buffers
[
old_buffer_no
].
last_lsn
:
log_descriptor
.
buffers
[
old_buffer_no
].
prev_last_lsn
);
DBUG_PRINT
(
"info"
,
(
"prev_last_lsn set to (%lu,0x%lx) buffer: 0x%lx"
,
LSN_IN_PARTS
(
new_buffer
->
prev_last_lsn
),
(
ulong
)
new_buffer
));
translog_new_page_header
(
horizon
,
cursor
);
DBUG_RETURN
(
0
);
}
...
...
@@ -2179,6 +2213,7 @@ static LSN translog_get_sent_to_disk()
DBUG_ENTER
(
"translog_get_sent_to_disk"
);
pthread_mutex_lock
(
&
log_descriptor
.
sent_to_disk_lock
);
lsn
=
log_descriptor
.
sent_to_disk
;
DBUG_PRINT
(
"info"
,
(
"sent to disk up to (%lu,0x%lx)"
,
LSN_IN_PARTS
(
lsn
)));
pthread_mutex_unlock
(
&
log_descriptor
.
sent_to_disk_lock
);
DBUG_RETURN
(
lsn
);
}
...
...
@@ -2392,7 +2427,6 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
TRANSLOG_FILE
*
file
=
buffer
->
file
;
uint8
ver
=
buffer
->
ver
;
DBUG_ENTER
(
"translog_buffer_flush"
);
DBUG_ASSERT
(
buffer
->
file
!=
NULL
);
DBUG_PRINT
(
"enter"
,
(
"Buffer: #%u 0x%lx file: %d offset: (%lu,0x%lx) size: %lu"
,
(
uint
)
buffer
->
buffer_no
,
(
ulong
)
buffer
,
...
...
@@ -2401,6 +2435,9 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
(
ulong
)
buffer
->
size
));
translog_buffer_lock_assert_owner
(
buffer
);
if
(
buffer
->
file
==
NULL
)
DBUG_RETURN
(
0
);
translog_wait_for_writers
(
buffer
);
if
(
buffer
->
file
!=
file
||
buffer
->
offset
!=
offset
||
buffer
->
ver
!=
ver
)
...
...
@@ -2460,6 +2497,11 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
{
TRANSLOG_ADDRESS
addr
=
(
buffer
->
offset
+
i
);
TRANSLOG_VALIDATOR_DATA
data
;
DBUG_PRINT
(
"info"
,
(
"send log form %lu till %lu address: (%lu,0x%lx) "
"page #: %lu buffer size: %lu buffer: 0x%lx"
,
(
ulong
)
i
,
(
ulong
)
(
i
+
TRANSLOG_PAGE_SIZE
),
LSN_IN_PARTS
(
addr
),
(
ulong
)
pg
,
(
ulong
)
buffer
->
size
,
(
ulong
)
buffer
));
data
.
addr
=
&
addr
;
DBUG_ASSERT
(
log_descriptor
.
pagecache
->
block_size
==
TRANSLOG_PAGE_SIZE
);
DBUG_ASSERT
(
i
+
TRANSLOG_PAGE_SIZE
<=
buffer
->
size
);
...
...
@@ -2511,6 +2553,9 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
buffer
->
file
=
NULL
;
buffer
->
overlay
=
0
;
buffer
->
ver
++
;
pthread_mutex_lock
(
&
log_descriptor
.
dirty_buffer_mask_lock
);
log_descriptor
.
dirty_buffer_mask
&=
~
(
1
<<
buffer
->
buffer_no
);
pthread_mutex_unlock
(
&
log_descriptor
.
dirty_buffer_mask_lock
);
pthread_cond_broadcast
(
&
buffer
->
waiting_filling_buffer
);
DBUG_RETURN
(
0
);
}
...
...
@@ -3352,9 +3397,12 @@ my_bool translog_init_with_table(const char *directory,
id_to_share
=
NULL
;
log_descriptor
.
directory_fd
=
-
1
;
log_descriptor
.
is_everything_flushed
=
1
;
log_descriptor
.
next_pass_max_lsn
=
LSN_IMPOSSIBLE
;
(
*
init_table_func
)();
compile_time_assert
(
sizeof
(
log_descriptor
.
dirty_buffer_mask
)
*
8
>=
TRANSLOG_BUFFERS_NO
);
log_descriptor
.
dirty_buffer_mask
=
0
;
if
(
readonly
)
log_descriptor
.
open_flags
=
O_BINARY
|
O_RDONLY
;
else
...
...
@@ -3369,6 +3417,9 @@ my_bool translog_init_with_table(const char *directory,
MY_MUTEX_INIT_FAST
)
||
pthread_mutex_init
(
&
log_descriptor
.
log_flush_lock
,
MY_MUTEX_INIT_FAST
)
||
pthread_mutex_init
(
&
log_descriptor
.
dirty_buffer_mask_lock
,
MY_MUTEX_INIT_FAST
)
||
pthread_cond_init
(
&
log_descriptor
.
log_flush_cond
,
0
)
||
my_rwlock_init
(
&
log_descriptor
.
open_files_lock
,
NULL
)
||
my_init_dynamic_array
(
&
log_descriptor
.
open_files
,
...
...
@@ -3441,9 +3492,7 @@ my_bool translog_init_with_table(const char *directory,
{
if
(
translog_buffer_init
(
log_descriptor
.
buffers
+
i
))
goto
err
;
#ifndef DBUG_OFF
log_descriptor
.
buffers
[
i
].
buffer_no
=
(
uint8
)
i
;
#endif
DBUG_PRINT
(
"info"
,
(
"translog_buffer buffer #%u: 0x%lx"
,
i
,
(
ulong
)
log_descriptor
.
buffers
+
i
));
}
...
...
@@ -4016,6 +4065,8 @@ void translog_destroy()
pthread_mutex_destroy
(
&
log_descriptor
.
unfinished_files_lock
);
pthread_mutex_destroy
(
&
log_descriptor
.
purger_lock
);
pthread_mutex_destroy
(
&
log_descriptor
.
log_flush_lock
);
pthread_mutex_destroy
(
&
log_descriptor
.
dirty_buffer_mask_lock
);
pthread_cond_destroy
(
&
log_descriptor
.
log_flush_cond
);
rwlock_destroy
(
&
log_descriptor
.
open_files_lock
);
delete_dynamic
(
&
log_descriptor
.
open_files
);
delete_dynamic
(
&
log_descriptor
.
unfinished_files
);
...
...
@@ -4686,10 +4737,13 @@ static translog_size_t translog_get_current_group_size()
static
inline
void
set_lsn
(
LSN
*
lsn
,
LSN
value
)
{
DBUG_ENTER
(
"set_lsn"
);
translog_lock_assert_owner
();
*
lsn
=
value
;
/* we generate LSN so something is not flushed in log */
log_descriptor
.
is_everything_flushed
=
0
;
DBUG_PRINT
(
"info"
,
(
"new LSN appeared: (%lu,0x%lx)"
,
LSN_IN_PARTS
(
value
)));
DBUG_VOID_RETURN
;
}
...
...
@@ -4779,6 +4833,9 @@ translog_write_variable_record_1group(LSN *lsn,
rc
|=
translog_advance_pointer
((
int
)(
full_pages
+
additional_chunk3_page
),
(
record_rest
?
record_rest
+
3
:
0
));
log_descriptor
.
bc
.
buffer
->
last_lsn
=
*
lsn
;
DBUG_PRINT
(
"info"
,
(
"last_lsn set to (%lu,0x%lx) buffer: 0x%lx"
,
LSN_IN_PARTS
(
log_descriptor
.
bc
.
buffer
->
last_lsn
),
(
ulong
)
log_descriptor
.
bc
.
buffer
));
translog_unlock
();
...
...
@@ -4902,6 +4959,9 @@ translog_write_variable_record_1chunk(LSN *lsn,
&
log_descriptor
.
bc
,
parts
->
total_record_length
,
parts
);
log_descriptor
.
bc
.
buffer
->
last_lsn
=
*
lsn
;
DBUG_PRINT
(
"info"
,
(
"last_lsn set to (%lu,0x%lx) buffer: 0x%lx"
,
LSN_IN_PARTS
(
log_descriptor
.
bc
.
buffer
->
last_lsn
),
(
ulong
)
log_descriptor
.
bc
.
buffer
));
translog_unlock
();
/*
...
...
@@ -5246,6 +5306,7 @@ translog_write_variable_record_mgroup(LSN *lsn,
uint
groups_per_page
=
(
page_capacity
-
header_fixed_part
)
/
(
7
+
1
);
uint
file_of_the_first_group
;
int
pages_to_skip
;
struct
st_translog_buffer
*
buffer_of_last_lsn
;
DBUG_ENTER
(
"translog_write_variable_record_mgroup"
);
translog_lock_assert_owner
();
...
...
@@ -5480,6 +5541,7 @@ translog_write_variable_record_mgroup(LSN *lsn,
((
page_capacity
-
header_fixed_part
)
/
(
7
+
1
))
*
(
chunk0_pages
-
1
))
*
(
7
+
1
));
buffer_of_last_lsn
=
log_descriptor
.
bc
.
buffer
;
translog_unlock
();
if
(
buffer_to_flush
!=
NULL
)
...
...
@@ -5587,6 +5649,10 @@ translog_write_variable_record_mgroup(LSN *lsn,
*/
translog_lock
();
set_lsn
(
lsn
,
horizon
);
buffer_of_last_lsn
->
last_lsn
=
*
lsn
;
DBUG_PRINT
(
"info"
,
(
"last_lsn set to (%lu,0x%lx) buffer: 0x%lx"
,
LSN_IN_PARTS
(
buffer_of_last_lsn
->
last_lsn
),
(
ulong
)
buffer_of_last_lsn
));
if
(
log_record_type_descriptor
[
type
].
inwrite_hook
&&
(
*
log_record_type_descriptor
[
type
].
inwrite_hook
)
(
type
,
trn
,
tbl_info
,
...
...
@@ -5642,8 +5708,6 @@ translog_write_variable_record_mgroup(LSN *lsn,
*
chunk0_header
=
(
uchar
)
(
TRANSLOG_CHUNK_LSN
|
TRANSLOG_CHUNK_0_CONT
);
}
while
(
chunk0_pages
!=
0
);
translog_buffer_lock
(
cursor
.
buffer
);
if
(
cmp_translog_addr
(
cursor
.
buffer
->
last_lsn
,
*
lsn
)
<
0
)
cursor
.
buffer
->
last_lsn
=
*
lsn
;
translog_buffer_decrease_writers
(
cursor
.
buffer
);
translog_buffer_unlock
(
cursor
.
buffer
);
rc
=
0
;
...
...
@@ -5896,6 +5960,9 @@ static my_bool translog_write_fixed_record(LSN *lsn,
parts
->
total_record_length
,
parts
);
log_descriptor
.
bc
.
buffer
->
last_lsn
=
*
lsn
;
DBUG_PRINT
(
"info"
,
(
"last_lsn set to (%lu,0x%lx) buffer: 0x%lx"
,
LSN_IN_PARTS
(
log_descriptor
.
bc
.
buffer
->
last_lsn
),
(
ulong
)
log_descriptor
.
bc
.
buffer
));
err:
translog_unlock
();
...
...
@@ -7301,7 +7368,8 @@ static void translog_force_current_buffer_to_finish()
#endif
/*
Now only one thread can flush log (buffer can flush many threads but
log flush is serialized) so no other thread can set is_closing_buffer
log flush log flush where this function is used can do only one thread)
so no other thread can set is_closing_buffer.
*/
DBUG_ASSERT
(
!
old_buffer
->
is_closing_buffer
);
old_buffer
->
is_closing_buffer
=
1
;
/* Other flushes will wait */
...
...
@@ -7363,170 +7431,195 @@ static void translog_force_current_buffer_to_finish()
/**
@brief Flush the log up to given LSN (included)
@brief Waits while given lsn will be flushed
@param lsn log record serial number up to which (inclusive)
the log has to be flushed
@param lsn log record serial number up to which (inclusive)
the log has to be flushed
*/
@return Operation status
@retval 0 OK
@retval 1 Error
void
translog_flush_wait_for_end
(
LSN
lsn
)
{
DBUG_ENTER
(
"translog_flush_wait_for_end"
);
DBUG_PRINT
(
"enter"
,
(
"LSN: (%lu,0x%lx)"
,
LSN_IN_PARTS
(
lsn
)));
safe_mutex_assert_owner
(
&
log_descriptor
.
log_flush_lock
);
while
(
cmp_translog_addr
(
log_descriptor
.
flushed
,
lsn
)
<
0
)
pthread_cond_wait
(
&
log_descriptor
.
log_flush_cond
,
&
log_descriptor
.
log_flush_lock
);
DBUG_VOID_RETURN
;
}
/**
@brief Sets goal for the next flush pass and waits for this pass end.
@param lsn log record serial number up to which (inclusive)
the log has to be flushed
*/
void
translog_flush_set_new_goal_and_wait
(
TRANSLOG_ADDRESS
lsn
)
{
DBUG_ENTER
(
"translog_flush_set_new_goal_and_wait"
);
DBUG_PRINT
(
"enter"
,
(
"LSN: (%lu,0x%lx)"
,
LSN_IN_PARTS
(
lsn
)));
safe_mutex_assert_owner
(
&
log_descriptor
.
log_flush_lock
);
if
(
cmp_translog_addr
(
lsn
,
log_descriptor
.
next_pass_max_lsn
)
>
0
)
{
log_descriptor
.
next_pass_max_lsn
=
lsn
;
log_descriptor
.
max_lsn_requester
=
pthread_self
();
}
while
(
log_descriptor
.
flush_in_progress
)
{
pthread_cond_wait
(
&
log_descriptor
.
log_flush_cond
,
&
log_descriptor
.
log_flush_lock
);
}
DBUG_VOID_RETURN
;
}
/**
@brief Flush the log up to given LSN (included)
@param lsn log record serial number up to which (inclusive)
the log has to be flushed
@return Operation status
@retval 0 OK
@retval 1 Error
@todo LOG: when a log write fails, we should not write to this log anymore
(if we add more log records to this log they will be unreadable: we will hit
the broken log record): all translog_flush() should be made to fail (because
translog_flush() is when a a transaction wants something durable and we
cannot make anything durable as log is corrupted). For that, a "my_bool
st_translog_descriptor::write_error" could be set to 1 when a
translog_write_record() or translog_flush() fails, and translog_flush()
would test this var (and translog_write_record() could also test this var if
it wants, though it's not absolutely needed).
Then, either shut Maria down immediately, or switch to a new log (but if we
get write error after write error, that would create too many logs).
A popular open-source transactional engine intentionally crashes as soon as
a log flush fails (we however don't want to crash the entire mysqld, but
stopping all engine's operations immediately would make sense).
Same applies to translog_write_record().
@todo: remove serialization and make group commit.
*/
my_bool
translog_flush
(
TRANSLOG_ADDRESS
lsn
)
{
LSN
old_flushed
,
sent_to_disk
;
LSN
sent_to_disk
=
LSN_IMPOSSIBLE
;
TRANSLOG_ADDRESS
flush_horizon
;
int
rc
=
0
;
/* We can't have more different files then buffers */
TRANSLOG_FILE
*
file_handlers
[
TRANSLOG_BUFFERS_NO
];
int
current_file_handler
=
-
1
;
uint32
prev_file
=
0
;
my_bool
full_circle
=
0
;
uint
fn
,
i
;
dirty_buffer_mask_t
dirty_buffer_mask
;
uint8
last_buffer_no
,
start_buffer_no
;
my_bool
rc
=
0
;
DBUG_ENTER
(
"translog_flush"
);
DBUG_PRINT
(
"enter"
,
(
"Flush up to LSN: (%lu,0x%lx)"
,
LSN_IN_PARTS
(
lsn
)));
DBUG_ASSERT
(
translog_status
==
TRANSLOG_OK
||
translog_status
==
TRANSLOG_READONLY
);
LINT_INIT
(
sent_to_disk
);
LINT_INIT
(
flush_horizon
);
pthread_mutex_lock
(
&
log_descriptor
.
log_flush_lock
);
DBUG_PRINT
(
"info"
,
(
"Everything is flushed up to (%lu,0x%lx)"
,
LSN_IN_PARTS
(
log_descriptor
.
flushed
)));
if
(
cmp_translog_addr
(
log_descriptor
.
flushed
,
lsn
)
>=
0
)
{
pthread_mutex_unlock
(
&
log_descriptor
.
log_flush_lock
);
DBUG_RETURN
(
0
);
}
if
(
log_descriptor
.
flush_in_progress
)
{
translog_flush_set_new_goal_and_wait
(
lsn
);
if
(
!
pthread_equal
(
log_descriptor
.
max_lsn_requester
,
pthread_self
()))
{
/* fix lsn if it was horizon */
if
(
cmp_translog_addr
(
lsn
,
log_descriptor
.
bc
.
buffer
->
last_lsn
)
>
0
)
lsn
=
log_descriptor
.
bc
.
buffer
->
last_lsn
;
translog_flush_wait_for_end
(
lsn
);
pthread_mutex_unlock
(
&
log_descriptor
.
log_flush_lock
);
DBUG_RETURN
(
0
);
}
log_descriptor
.
next_pass_max_lsn
=
LSN_IMPOSSIBLE
;
}
log_descriptor
.
flush_in_progress
=
1
;
DBUG_PRINT
(
"info"
,
(
"flush_in_progress is set"
));
pthread_mutex_unlock
(
&
log_descriptor
.
log_flush_lock
);
translog_lock
();
if
(
log_descriptor
.
is_everything_flushed
)
{
DBUG_PRINT
(
"info"
,
(
"everything is flushed"
));
rc
=
(
translog_status
==
TRANSLOG_READONLY
);
translog_unlock
();
goto
out
;
}
flush_horizon
=
LSN_IMPOSSIBLE
;
old_flushed
=
log_descriptor
.
flushed
;
for
(;;)
/*
We will recheck information when will lock buffers one by
one so we can use unprotected read here (this is just for
speed up buffers processing)
*/
dirty_buffer_mask
=
log_descriptor
.
dirty_buffer_mask
;
DBUG_PRINT
(
"info"
,
(
"Dirty buffer mask: %lx current buffer: %u"
,
(
ulong
)
dirty_buffer_mask
,
(
uint
)
log_descriptor
.
bc
.
buffer_no
));
for
(
i
=
(
log_descriptor
.
bc
.
buffer_no
+
1
)
%
TRANSLOG_BUFFERS_NO
;
i
!=
log_descriptor
.
bc
.
buffer_no
&&
!
(
dirty_buffer_mask
&
(
1
<<
i
));
i
=
(
i
+
1
)
%
TRANSLOG_BUFFERS_NO
)
{}
start_buffer_no
=
i
;
/* if we have to flush last buffer then we will finish it */
if
(
cmp_translog_addr
(
lsn
,
log_descriptor
.
bc
.
buffer
->
prev_last_lsn
)
>
0
)
{
uint16
buffer_no
=
log_descriptor
.
bc
.
buffer_no
;
uint16
buffer_start
=
buffer_no
;
struct
st_translog_buffer
*
buffer_unlock
=
log_descriptor
.
bc
.
buffer
;
struct
st_translog_buffer
*
buffer
=
log_descriptor
.
bc
.
buffer
;
if
(
cmp_translog_addr
(
log_descriptor
.
flushed
,
lsn
)
>=
0
)
{
DBUG_PRINT
(
"info"
,
(
"already flushed: (%lu,0x%lx)"
,
LSN_IN_PARTS
(
log_descriptor
.
flushed
)));
translog_unlock
();
goto
out
;
}
/* send to the file if it is not sent */
if
(
translog_status
!=
TRANSLOG_OK
)
{
rc
=
1
;
goto
out
;
}
sent_to_disk
=
translog_get_sent_to_disk
();
if
(
cmp_translog_addr
(
sent_to_disk
,
lsn
)
>=
0
||
full_circle
)
break
;
lsn
=
log_descriptor
.
bc
.
buffer
->
last_lsn
;
/* fix lsn if it was horizon */
last_buffer_no
=
log_descriptor
.
bc
.
buffer_no
;
log_descriptor
.
is_everything_flushed
=
1
;
translog_force_current_buffer_to_finish
();
translog_buffer_unlock
(
buffer
);
}
else
{
last_buffer_no
=
((
log_descriptor
.
bc
.
buffer_no
+
TRANSLOG_BUFFERS_NO
-
1
)
%
TRANSLOG_BUFFERS_NO
);
translog_unlock
();
}
sent_to_disk
=
translog_get_sent_to_disk
();
if
(
cmp_translog_addr
(
lsn
,
sent_to_disk
)
>
0
)
{
DBUG_PRINT
(
"info"
,
(
"Start buffer #: %u last buffer #: %u"
,
(
uint
)
start_buffer_no
,
(
uint
)
last_buffer_no
));
last_buffer_no
=
(
last_buffer_no
+
1
)
%
TRANSLOG_BUFFERS_NO
;
i
=
start_buffer_no
;
do
{
buffer_no
=
(
buffer_no
+
1
)
%
TRANSLOG_BUFFERS_NO
;
buffer
=
log_descriptor
.
buffers
+
buffer_no
;
struct
st_translog_buffer
*
buffer
=
log_descriptor
.
buffers
+
i
;
translog_buffer_lock
(
buffer
);
translog_buffer_unlock
(
buffer_unlock
);
buffer_unlock
=
buffer
;
if
(
buffer
->
file
!=
NULL
)
DBUG_PRINT
(
"info"
,
(
"Check buffer: 0x%lx #: %u "
"prev last LSN: (%lu,0x%lx) "
"last LSN: (%lu,0x%lx) status: %s"
,
(
ulong
)(
buffer
),
(
uint
)
i
,
LSN_IN_PARTS
(
buffer
->
prev_last_lsn
),
LSN_IN_PARTS
(
buffer
->
last_lsn
),
(
buffer
->
file
?
"dirty"
:
"closed"
)));
if
(
buffer
->
prev_last_lsn
<=
lsn
&&
buffer
->
file
!=
NULL
)
{
buffer_unlock
=
NULL
;
if
(
buffer_start
==
buffer_no
)
{
/* we made a circle */
full_circle
=
1
;
/*
If buffer from which we started still current we have to
finish it (we will not flush intentionally more records
then was at the moment of start flushing);
*/
if
(
buffer_start
==
log_descriptor
.
bc
.
buffer_no
)
{
translog_lock_assert_owner
();
/*
Here we have loghandler locked.
We are going to flush last buffer, and will not release
log_flush_lock until it happened, so we can set the flag here
and other process which going to flush will not be able read
it and return earlier then we finish the flush process. (But
other process can drop the flag if new LSN appeared (only
after translog_force_current_buffer_to_finish() call and
transaction log unlock of course))
*/
log_descriptor
.
is_everything_flushed
=
1
;
translog_force_current_buffer_to_finish
();
}
}
break
;
DBUG_ASSERT
(
flush_horizon
<=
buffer
->
offset
+
buffer
->
size
);
flush_horizon
=
buffer
->
offset
+
buffer
->
size
;
translog_buffer_flush
(
buffer
);
}
}
while
((
buffer_start
!=
buffer_no
)
&&
cmp_translog_addr
(
log_descriptor
.
flushed
,
lsn
)
<
0
);
if
(
buffer_unlock
!=
NULL
&&
buffer_unlock
!=
buffer
)
translog_buffer_unlock
(
buffer_unlock
);
if
(
prev_file
!=
LSN_FILE_NO
(
buffer
->
offset
))
{
TRANSLOG_FILE
*
file
;
uint32
fn
=
LSN_FILE_NO
(
buffer
->
offset
);
prev_file
=
fn
;
file
=
get_logfile_by_number
(
fn
);
DBUG_ASSERT
(
file
!=
NULL
);
if
(
!
file
->
is_sync
)
{
current_file_handler
++
;
file_handlers
[
current_file_handler
]
=
file
;
}
/* We sync file when we are closing it => do nothing if file closed */
}
DBUG_ASSERT
(
flush_horizon
<=
buffer
->
offset
+
buffer
->
size
);
flush_horizon
=
buffer
->
offset
+
buffer
->
size
;
rc
=
translog_buffer_flush
(
buffer
);
translog_buffer_unlock
(
buffer
);
if
(
rc
)
goto
out
;
/* rc is 1 */
translog_lock
();
translog_buffer_unlock
(
buffer
);
i
=
(
i
+
1
)
%
TRANSLOG_BUFFERS_NO
;
}
while
(
i
!=
last_buffer_no
);
sent_to_disk
=
translog_get_sent_to_disk
();
}
translog_unlock
();
/* sync files from previous flush till current one */
for
(
fn
=
LSN_FILE_NO
(
log_descriptor
.
flushed
);
fn
<=
LSN_FILE_NO
(
lsn
);
fn
++
)
{
TRANSLOG_FILE
*
*
cur
=
file_handlers
;
TRANSLOG_FILE
**
end
=
file_handlers
+
current_file_handler
;
for
(;
cur
<=
end
;
cur
++
)
TRANSLOG_FILE
*
file
=
get_logfile_by_number
(
fn
)
;
DBUG_ASSERT
(
file
!=
NULL
)
;
if
(
!
file
->
is_sync
)
{
(
*
cur
)
->
is_sync
=
1
;
if
(
my_sync
((
*
cur
)
->
handler
.
file
,
MYF
(
MY_WME
)))
if
(
my_sync
(
file
->
handler
.
file
,
MYF
(
MY_WME
)))
{
rc
=
1
;
translog_stop_writing
();
sent_to_disk
=
LSN_IMPOSSIBLE
;
goto
out
;
}
file
->
is_sync
=
1
;
}
}
log_descriptor
.
flushed
=
sent_to_disk
;
/*
If we should flush (due to directory flush mode) and
previous flush horizon was not within one page border with this one.
*/
if
(
sync_log_dir
>=
TRANSLOG_SYNC_DIR_ALWAYS
&&
(
LSN_FILE_NO
(
log_descriptor
.
previous_flush_horizon
)
!=
LSN_FILE_NO
(
flush_horizon
)
||
...
...
@@ -7536,7 +7629,13 @@ my_bool translog_flush(TRANSLOG_ADDRESS lsn)
rc
|=
sync_dir
(
log_descriptor
.
directory_fd
,
MYF
(
MY_WME
|
MY_IGNORE_BADFD
));
log_descriptor
.
previous_flush_horizon
=
flush_horizon
;
out:
pthread_mutex_unlock
(
&
log_descriptor
.
log_flush_lock
);
pthread_mutex_lock
(
&
log_descriptor
.
log_flush_lock
);
if
(
sent_to_disk
!=
LSN_IMPOSSIBLE
)
log_descriptor
.
flushed
=
sent_to_disk
;
log_descriptor
.
flush_in_progress
=
0
;
DBUG_PRINT
(
"info"
,
(
"flush_in_progress is dropped"
));
pthread_mutex_unlock
(
&
log_descriptor
.
log_flush_lock
);
\
pthread_cond_broadcast
(
&
log_descriptor
.
log_flush_cond
);
DBUG_RETURN
(
rc
);
}
...
...
storage/maria/ma_state.c
View file @
5465dd08
...
...
@@ -28,9 +28,9 @@
10 ended transactions since last time it was called.
*/
#include
<maria_def.h>
#include
"maria_def.h"
#include "trnman.h"
#include
<ma_blockrec.h>
#include
"ma_blockrec.h"
/**
@brief Setup initial start-of-transaction state for a table
...
...
storage/maria/unittest/Makefile.am
View file @
5465dd08
...
...
@@ -42,6 +42,7 @@ noinst_PROGRAMS = ma_control_file-t trnman-t lockman2-t \
ma_test_loghandler-t
\
ma_test_loghandler_multigroup-t
\
ma_test_loghandler_multithread-t
\
ma_test_loghandler_multiflush-t
\
ma_test_loghandler_pagecache-t
\
ma_test_loghandler_long-t
\
ma_test_loghandler_noflush-t
\
...
...
@@ -54,6 +55,8 @@ noinst_PROGRAMS = ma_control_file-t trnman-t lockman2-t \
ma_test_loghandler_t_SOURCES
=
ma_test_loghandler-t.c ma_maria_log_cleanup.c ma_loghandler_examples.c
ma_test_loghandler_multigroup_t_SOURCES
=
ma_test_loghandler_multigroup-t.c ma_maria_log_cleanup.c ma_loghandler_examples.c sequence_storage.c sequence_storage.h
ma_test_loghandler_multithread_t_SOURCES
=
ma_test_loghandler_multithread-t.c ma_maria_log_cleanup.c ma_loghandler_examples.c
ma_test_loghandler_multiflush_t_SOURCES
=
ma_test_loghandler_multithread-t.c ma_maria_log_cleanup.c ma_loghandler_examples.c
ma_test_loghandler_multiflush_t_CPPFLAGS
=
-DMULTIFLUSH_TEST
ma_test_loghandler_pagecache_t_SOURCES
=
ma_test_loghandler_pagecache-t.c ma_maria_log_cleanup.c ma_loghandler_examples.c
ma_test_loghandler_long_t_SOURCES
=
ma_test_loghandler-t.c ma_maria_log_cleanup.c ma_loghandler_examples.c
ma_test_loghandler_long_t_CPPFLAGS
=
-DLONG_LOG_TEST
...
...
storage/maria/unittest/ma_test_loghandler_multithread-t.c
View file @
5465dd08
...
...
@@ -28,16 +28,35 @@ static const char *default_dbug_option;
#define PCACHE_SIZE (1024*1024*10)
#define LOG_FILE_SIZE (1024L*1024L*1024L + 1024L*1024L*512)
/*#define LOG_FLAGS TRANSLOG_SECTOR_PROTECTION | TRANSLOG_PAGE_CRC */
#define LOG_FLAGS 0
/*#define LONG_BUFFER_SIZE (1024L*1024L*1024L + 1024L*1024L*512)*/
#ifdef MULTIFLUSH_TEST
#define LONG_BUFFER_SIZE (16384L)
#define MIN_REC_LENGTH 10
#define SHOW_DIVIDER 20
#define ITERATIONS 10000
#define FLUSH_ITERATIONS 1000
#define WRITERS 2
#define FLUSHERS 10
#else
#define LONG_BUFFER_SIZE (512L*1024L*1024L)
#define MIN_REC_LENGTH 30
#define SHOW_DIVIDER 10
#define LOG_FILE_SIZE (1024L*1024L*1024L + 1024L*1024L*512)
#define ITERATIONS 3
#define FLUSH_ITERATIONS 0
#define WRITERS 3
#define FLUSHERS 0
#endif
static
uint
number_of_writers
=
WRITERS
;
static
uint
number_of_flushers
=
FLUSHERS
;
static
pthread_cond_t
COND_thread_count
;
static
pthread_mutex_t
LOCK_thread_count
;
...
...
@@ -48,6 +67,9 @@ static LSN lsns1[WRITERS][ITERATIONS];
static
LSN
lsns2
[
WRITERS
][
ITERATIONS
];
static
uchar
*
long_buffer
;
static
LSN
last_lsn
;
/* For test purposes the variable allow dirty read/write */
/*
Get pseudo-random length of the field in
limits [MIN_REC_LENGTH..LONG_BUFFER_SIZE]
...
...
@@ -177,6 +199,7 @@ void writer(int num)
return
;
}
lsns2
[
num
][
i
]
=
lsn
;
last_lsn
=
lsn
;
pthread_mutex_lock
(
&
LOCK_thread_count
);
ok
(
1
,
"write records"
);
pthread_mutex_unlock
(
&
LOCK_thread_count
);
...
...
@@ -205,6 +228,33 @@ static void *test_thread_writer(void *arg)
}
static
void
*
test_thread_flusher
(
void
*
arg
)
{
int
param
=
*
((
int
*
)
arg
);
int
i
;
my_thread_init
();
for
(
i
=
0
;
i
<
FLUSH_ITERATIONS
;
i
++
)
{
translog_flush
(
last_lsn
);
pthread_mutex_lock
(
&
LOCK_thread_count
);
ok
(
1
,
"-- flush %d"
,
param
);
pthread_mutex_unlock
(
&
LOCK_thread_count
);
}
pthread_mutex_lock
(
&
LOCK_thread_count
);
thread_count
--
;
ok
(
1
,
"flusher finished"
);
/* just to show progress */
VOID
(
pthread_cond_signal
(
&
COND_thread_count
));
/* Tell main we are
ready */
pthread_mutex_unlock
(
&
LOCK_thread_count
);
free
((
uchar
*
)
arg
);
my_thread_end
();
return
(
0
);
}
int
main
(
int
argc
__attribute__
((
unused
)),
char
**
argv
__attribute__
((
unused
)))
{
...
...
@@ -219,7 +269,8 @@ int main(int argc __attribute__((unused)),
int
*
param
,
error
;
int
rc
;
plan
(
WRITERS
+
ITERATIONS
*
WRITERS
*
3
);
plan
(
WRITERS
+
FLUSHERS
+
ITERATIONS
*
WRITERS
*
3
+
FLUSH_ITERATIONS
*
FLUSHERS
);
bzero
(
&
pagecache
,
sizeof
(
pagecache
));
maria_data_root
=
(
char
*
)
"."
;
...
...
@@ -329,19 +380,36 @@ int main(int argc __attribute__((unused)),
pthread_mutex_lock
(
&
LOCK_thread_count
);
while
(
number_of_writers
!=
0
)
while
(
number_of_writers
!=
0
||
number_of_flushers
!=
0
)
{
param
=
(
int
*
)
malloc
(
sizeof
(
int
));
*
param
=
number_of_writers
-
1
;
if
((
error
=
pthread_create
(
&
tid
,
&
thr_attr
,
test_thread_writer
,
(
void
*
)
param
)))
if
(
number_of_writers
)
{
fprintf
(
stderr
,
"Got error: %d from pthread_create (errno: %d)
\n
"
,
error
,
errno
);
exit
(
1
);
param
=
(
int
*
)
malloc
(
sizeof
(
int
));
*
param
=
number_of_writers
-
1
;
if
((
error
=
pthread_create
(
&
tid
,
&
thr_attr
,
test_thread_writer
,
(
void
*
)
param
)))
{
fprintf
(
stderr
,
"Got error: %d from pthread_create (errno: %d)
\n
"
,
error
,
errno
);
exit
(
1
);
}
thread_count
++
;
number_of_writers
--
;
}
if
(
number_of_flushers
)
{
param
=
(
int
*
)
malloc
(
sizeof
(
int
));
*
param
=
number_of_flushers
-
1
;
if
((
error
=
pthread_create
(
&
tid
,
&
thr_attr
,
test_thread_flusher
,
(
void
*
)
param
)))
{
fprintf
(
stderr
,
"Got error: %d from pthread_create (errno: %d)
\n
"
,
error
,
errno
);
exit
(
1
);
}
thread_count
++
;
number_of_flushers
--
;
}
thread_count
++
;
number_of_writers
--
;
}
pthread_mutex_unlock
(
&
LOCK_thread_count
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment