Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
MariaDB
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
nexedi
MariaDB
Commits
8bb2170d
Commit
8bb2170d
authored
Jul 31, 2020
by
Marko Mäkelä
Browse files
Options
Browse Files
Download
Plain Diff
Merge 10.2 into 10.3
parents
66ec3a77
879ba197
Changes
4
Show whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
163 additions
and
113 deletions
+163
-113
storage/innobase/buf/buf0dblwr.cc
storage/innobase/buf/buf0dblwr.cc
+48
-73
storage/innobase/fsp/fsp0file.cc
storage/innobase/fsp/fsp0file.cc
+6
-11
storage/innobase/include/log0recv.h
storage/innobase/include/log0recv.h
+29
-17
storage/innobase/log/log0recv.cc
storage/innobase/log/log0recv.cc
+80
-12
No files found.
storage/innobase/buf/buf0dblwr.cc
View file @
8bb2170d
...
@@ -529,30 +529,54 @@ buf_dblwr_init_or_load_pages(
...
@@ -529,30 +529,54 @@ buf_dblwr_init_or_load_pages(
void
void
buf_dblwr_process
()
buf_dblwr_process
()
{
{
ut_ad
(
recv_sys
->
parse_start_lsn
);
ulint
page_no_dblwr
=
0
;
ulint
page_no_dblwr
=
0
;
byte
*
read_buf
;
byte
*
read_buf
;
byte
*
unaligned_read_buf
;
recv_dblwr_t
&
recv_dblwr
=
recv_sys
->
dblwr
;
recv_dblwr_t
&
recv_dblwr
=
recv_sys
->
dblwr
;
if
(
!
buf_dblwr
)
{
if
(
!
buf_dblwr
)
{
return
;
return
;
}
}
unaligned_read_buf
=
static_cast
<
byte
*>
(
ut_malloc_nokey
(
3U
<<
srv_page_size_shift
));
read_buf
=
static_cast
<
byte
*>
(
read_buf
=
static_cast
<
byte
*>
(
ut_align
(
unaligned_read_buf
,
srv_page_size
));
aligned_malloc
(
3
*
srv_page_size
,
srv_page_size
));
byte
*
const
buf
=
read_buf
+
srv_page_size
;
byte
*
const
buf
=
read_buf
+
srv_page_size
;
for
(
recv_dblwr_t
::
list
::
iterator
i
=
recv_dblwr
.
pages
.
begin
();
for
(
recv_dblwr_t
::
list
::
iterator
i
=
recv_dblwr
.
pages
.
begin
();
i
!=
recv_dblwr
.
pages
.
end
();
i
!=
recv_dblwr
.
pages
.
end
();
++
i
,
++
page_no_dblwr
)
{
++
i
,
++
page_no_dblwr
)
{
byte
*
page
=
*
i
;
byte
*
page
=
*
i
;
ulint
space_id
=
page_get_space_id
(
page
);
const
ulint
page_no
=
page_get_page_no
(
page
);
fil_space_t
*
space
=
fil_space_get
(
space_id
);
if
(
!
page_no
)
{
/* page 0 should have been recovered
already via Datafile::restore_from_doublewrite() */
continue
;
}
const
ulint
space_id
=
page_get_space_id
(
page
);
const
lsn_t
lsn
=
mach_read_from_8
(
page
+
FIL_PAGE_LSN
);
if
(
recv_sys
->
parse_start_lsn
>
lsn
)
{
/* Pages written before the checkpoint are
not useful for recovery. */
continue
;
}
const
page_id_t
page_id
(
space_id
,
page_no
);
if
(
recv_sys
->
scanned_lsn
<
lsn
)
{
ib
::
warn
()
<<
"Ignoring a doublewrite copy of page "
<<
page_id
<<
" with future log sequence number "
<<
lsn
;
continue
;
}
if
(
space
==
NULL
)
{
fil_space_t
*
space
=
fil_space_acquire_for_io
(
space_id
);
if
(
!
space
)
{
/* Maybe we have dropped the tablespace
/* Maybe we have dropped the tablespace
and this page once belonged to it: do nothing */
and this page once belonged to it: do nothing */
continue
;
continue
;
...
@@ -560,9 +584,6 @@ buf_dblwr_process()
...
@@ -560,9 +584,6 @@ buf_dblwr_process()
fil_space_open_if_needed
(
space
);
fil_space_open_if_needed
(
space
);
const
ulint
page_no
=
page_get_page_no
(
page
);
const
page_id_t
page_id
(
space_id
,
page_no
);
if
(
UNIV_UNLIKELY
(
page_no
>=
space
->
size
))
{
if
(
UNIV_UNLIKELY
(
page_no
>=
space
->
size
))
{
/* Do not report the warning if the tablespace
/* Do not report the warning if the tablespace
...
@@ -578,6 +599,8 @@ buf_dblwr_process()
...
@@ -578,6 +599,8 @@ buf_dblwr_process()
<<
space
->
name
<<
space
->
name
<<
" ("
<<
space
->
size
<<
" pages)"
;
<<
" ("
<<
space
->
size
<<
" pages)"
;
}
}
next_page:
space
->
release_for_io
();
continue
;
continue
;
}
}
...
@@ -606,76 +629,27 @@ buf_dblwr_process()
...
@@ -606,76 +629,27 @@ buf_dblwr_process()
<<
"error: "
<<
err
;
<<
"error: "
<<
err
;
}
}
const
bool
is_all_zero
=
buf_is_zeroes
(
if
(
buf_is_zeroes
(
span
<
const
byte
>
(
read_buf
,
span
<
const
byte
>
(
read_buf
,
page_size
.
physical
()));
page_size
.
physical
())))
{
const
bool
expect_encrypted
=
space
->
crypt_data
&&
space
->
crypt_data
->
type
!=
CRYPT_SCHEME_UNENCRYPTED
;
if
(
is_all_zero
)
{
/* We will check if the copy in the
/* We will check if the copy in the
doublewrite buffer is valid. If not, we will
doublewrite buffer is valid. If not, we will
ignore this page (there should be redo log
ignore this page (there should be redo log
records to initialize it). */
records to initialize it). */
}
else
if
(
recv_dblwr
.
validate_page
(
page_id
,
read_buf
,
space
,
buf
))
{
goto
next_page
;
}
else
{
}
else
{
/* Decompress the page before
validating the checksum. */
ulint
decomp
=
fil_page_decompress
(
buf
,
read_buf
);
if
(
!
decomp
||
(
decomp
!=
srv_page_size
&&
page_size
.
is_compressed
()))
{
goto
bad
;
}
if
(
expect_encrypted
&&
mach_read_from_4
(
read_buf
+
FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
)
?
fil_space_verify_crypt_checksum
(
read_buf
,
page_size
)
:
!
buf_page_is_corrupted
(
true
,
read_buf
,
page_size
,
space
))
{
/* The page is good; there is no need
to consult the doublewrite buffer. */
continue
;
}
bad:
/* We intentionally skip this message for
/* We intentionally skip this message for
is_all_
zero pages. */
all-
zero pages. */
ib
::
info
()
ib
::
info
()
<<
"Trying to recover page "
<<
page_id
<<
"Trying to recover page "
<<
page_id
<<
" from the doublewrite buffer."
;
<<
" from the doublewrite buffer."
;
}
}
ulint
decomp
=
fil_page_decompress
(
buf
,
page
);
page
=
recv_dblwr
.
find_page
(
page_id
,
space
,
buf
);
if
(
!
decomp
||
(
decomp
!=
srv_page_size
&&
page_size
.
is_compressed
()))
{
continue
;
}
if
(
expect_encrypted
&&
mach_read_from_4
(
if
(
!
page
)
{
page
+
FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
)
goto
next_page
;
?
!
fil_space_verify_crypt_checksum
(
page
,
page_size
)
:
buf_page_is_corrupted
(
true
,
page
,
page_size
,
space
))
{
/* Theoretically we could have another good
copy for this page in the doublewrite
buffer. If not, we will report a fatal error
for a corrupted page somewhere else if that
page was truly needed. */
continue
;
}
if
(
page_no
==
0
)
{
/* Check the FSP_SPACE_FLAGS. */
ulint
flags
=
fsp_header_get_flags
(
page
);
if
(
!
fsp_flags_is_valid
(
flags
,
space_id
)
&&
fsp_flags_convert_from_101
(
flags
)
==
ULINT_UNDEFINED
)
{
ib
::
warn
()
<<
"Ignoring a doublewrite copy"
" of page "
<<
page_id
<<
" due to invalid flags "
<<
ib
::
hex
(
flags
);
continue
;
}
/* The flags on the page should be converted later. */
}
}
/* Write the good page from the doublewrite buffer to
/* Write the good page from the doublewrite buffer to
...
@@ -684,17 +658,18 @@ buf_dblwr_process()
...
@@ -684,17 +658,18 @@ buf_dblwr_process()
IORequest
write_request
(
IORequest
::
WRITE
);
IORequest
write_request
(
IORequest
::
WRITE
);
fil_io
(
write_request
,
true
,
page_id
,
page_size
,
fil_io
(
write_request
,
true
,
page_id
,
page_size
,
0
,
page_size
.
physical
(),
0
,
page_size
.
physical
(),
page
,
NULL
);
const_cast
<
byte
*>
(
page
),
NULL
);
ib
::
info
()
<<
"Recovered page "
<<
page_id
ib
::
info
()
<<
"Recovered page "
<<
page_id
<<
" from the doublewrite buffer."
;
<<
" from the doublewrite buffer."
;
goto
next_page
;
}
}
recv_dblwr
.
pages
.
clear
();
recv_dblwr
.
pages
.
clear
();
fil_flush_file_spaces
(
FIL_TYPE_TABLESPACE
);
fil_flush_file_spaces
(
FIL_TYPE_TABLESPACE
);
ut_free
(
unaligned_
read_buf
);
aligned_free
(
read_buf
);
}
}
/****************************************************************//**
/****************************************************************//**
...
...
storage/innobase/fsp/fsp0file.cc
View file @
8bb2170d
...
@@ -768,10 +768,10 @@ Datafile::restore_from_doublewrite()
...
@@ -768,10 +768,10 @@ Datafile::restore_from_doublewrite()
}
}
/* Find if double write buffer contains page_no of given space id. */
/* Find if double write buffer contains page_no of given space id. */
const
byte
*
page
=
recv_sys
->
dblwr
.
find_page
(
m_space_id
,
0
);
const
page_id_t
page_id
(
m_space_id
,
0
);
const
page_id_t
page_id
(
m_space_id
,
0
);
const
byte
*
page
=
recv_sys
->
dblwr
.
find_page
(
page_id
);
if
(
page
==
NULL
)
{
if
(
!
page
)
{
/* If the first page of the given user tablespace is not there
/* If the first page of the given user tablespace is not there
in the doublewrite buffer, then the recovery is going to fail
in the doublewrite buffer, then the recovery is going to fail
now. Hence this is treated as an error. */
now. Hence this is treated as an error. */
...
@@ -788,15 +788,10 @@ Datafile::restore_from_doublewrite()
...
@@ -788,15 +788,10 @@ Datafile::restore_from_doublewrite()
FSP_HEADER_OFFSET
+
FSP_SPACE_FLAGS
+
page
);
FSP_HEADER_OFFSET
+
FSP_SPACE_FLAGS
+
page
);
if
(
!
fsp_flags_is_valid
(
flags
,
m_space_id
))
{
if
(
!
fsp_flags_is_valid
(
flags
,
m_space_id
))
{
ulint
cflags
=
fsp_flags_convert_from_101
(
flags
);
flags
=
fsp_flags_convert_from_101
(
flags
);
if
(
cflags
==
ULINT_UNDEFINED
)
{
/* recv_dblwr_t::validate_page() inside find_page()
ib
::
warn
()
checked this already. */
<<
"Ignoring a doublewrite copy of page "
ut_ad
(
flags
!=
ULINT_UNDEFINED
);
<<
page_id
<<
" due to invalid flags "
<<
ib
::
hex
(
flags
);
return
(
true
);
}
flags
=
cflags
;
/* The flags on the page should be converted later. */
/* The flags on the page should be converted later. */
}
}
...
...
storage/innobase/include/log0recv.h
View file @
8bb2170d
...
@@ -181,18 +181,30 @@ struct recv_t{
...
@@ -181,18 +181,30 @@ struct recv_t{
rec_list
;
/*!< list of log records for this page */
rec_list
;
/*!< list of log records for this page */
};
};
struct
recv_dblwr_t
{
struct
recv_dblwr_t
{
/** Add a page frame to the doublewrite recovery buffer. */
/** Add a page frame to the doublewrite recovery buffer. */
void
add
(
byte
*
page
)
{
void
add
(
byte
*
page
)
{
pages
.
push_back
(
page
);
}
pages
.
push_back
(
page
);
}
/** Validate the page.
@param page_id page identifier
@param page page contents
@param space the tablespace of the page (not available for page 0)
@param tmp_buf 2*srv_page_size for decrypting and decompressing any
page_compressed or encrypted pages
@return whether the page is valid */
bool
validate_page
(
const
page_id_t
page_id
,
const
byte
*
page
,
const
fil_space_t
*
space
,
byte
*
tmp_buf
);
/** Find a doublewrite copy of a page.
/** Find a doublewrite copy of a page.
@param[in] space_id tablespace identifier
@param page_id page identifier
@param[in] page_no page number
@param space tablespace (not available for page_id.page_no()==0)
@param tmp_buf 2*srv_page_size for decrypting and decompressing any
page_compressed or encrypted pages
@return page frame
@return page frame
@retval NULL if no page was found */
@retval NULL if no valid page for page_id was found */
const
byte
*
find_page
(
ulint
space_id
,
ulint
page_no
);
byte
*
find_page
(
const
page_id_t
page_id
,
const
fil_space_t
*
space
=
NULL
,
byte
*
tmp_buf
=
NULL
);
typedef
std
::
list
<
byte
*
,
ut_allocator
<
byte
*>
>
list
;
typedef
std
::
list
<
byte
*
,
ut_allocator
<
byte
*>
>
list
;
...
...
storage/innobase/log/log0recv.cc
View file @
8bb2170d
...
@@ -57,6 +57,7 @@ Created 9/20/1997 Heikki Tuuri
...
@@ -57,6 +57,7 @@ Created 9/20/1997 Heikki Tuuri
#include "srv0start.h"
#include "srv0start.h"
#include "trx0roll.h"
#include "trx0roll.h"
#include "row0merge.h"
#include "row0merge.h"
#include "fil0pagecompress.h"
/** Log records are stored in the hash table in chunks at most of this size;
/** Log records are stored in the hash table in chunks at most of this size;
this must be less than srv_page_size as it is stored in the buffer pool */
this must be less than srv_page_size as it is stored in the buffer pool */
...
@@ -3843,6 +3844,8 @@ recv_recovery_from_checkpoint_start(lsn_t flush_lsn)
...
@@ -3843,6 +3844,8 @@ recv_recovery_from_checkpoint_start(lsn_t flush_lsn)
rescan
=
true
;
rescan
=
true
;
}
}
recv_sys
->
parse_start_lsn
=
checkpoint_lsn
;
if
(
srv_operation
==
SRV_OPERATION_NORMAL
)
{
if
(
srv_operation
==
SRV_OPERATION_NORMAL
)
{
buf_dblwr_process
();
buf_dblwr_process
();
}
}
...
@@ -4012,26 +4015,91 @@ recv_recovery_rollback_active(void)
...
@@ -4012,26 +4015,91 @@ recv_recovery_rollback_active(void)
}
}
}
}
/** Find a doublewrite copy of a page.
bool
recv_dblwr_t
::
validate_page
(
const
page_id_t
page_id
,
@param[in] space_id tablespace identifier
const
byte
*
page
,
@param[in] page_no page number
const
fil_space_t
*
space
,
@return page frame
byte
*
tmp_buf
)
@retval NULL if no page was found */
const
byte
*
recv_dblwr_t
::
find_page
(
ulint
space_id
,
ulint
page_no
)
{
{
const
byte
*
result
=
NULL
;
if
(
page_id
.
page_no
()
==
0
)
{
ulint
flags
=
fsp_header_get_flags
(
page
);
if
(
!
fsp_flags_is_valid
(
flags
,
page_id
.
space
()))
{
ulint
cflags
=
fsp_flags_convert_from_101
(
flags
);
if
(
cflags
==
ULINT_UNDEFINED
)
{
ib
::
warn
()
<<
"Ignoring a doublewrite copy of page "
<<
page_id
<<
"due to invalid flags "
<<
ib
::
hex
(
flags
);
return
false
;
}
flags
=
cflags
;
}
/* Page 0 is never page_compressed or encrypted. */
return
!
buf_page_is_corrupted
(
true
,
page
,
page_size_t
(
flags
));
}
ut_ad
(
tmp_buf
);
byte
*
tmp_frame
=
tmp_buf
;
byte
*
tmp_page
=
tmp_buf
+
srv_page_size
;
const
uint16_t
page_type
=
mach_read_from_2
(
page
+
FIL_PAGE_TYPE
);
const
page_size_t
page_size
(
space
->
flags
);
const
bool
expect_encrypted
=
space
->
crypt_data
&&
space
->
crypt_data
->
type
!=
CRYPT_SCHEME_UNENCRYPTED
;
if
(
expect_encrypted
&&
mach_read_from_4
(
page
+
FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
))
{
if
(
!
fil_space_verify_crypt_checksum
(
page
,
page_size
))
return
false
;
if
(
page_type
!=
FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED
)
return
true
;
if
(
page_size
.
is_compressed
())
return
false
;
memcpy
(
tmp_page
,
page
,
page_size
.
physical
());
if
(
!
fil_space_decrypt
(
space
,
tmp_frame
,
tmp_page
))
return
false
;
}
switch
(
page_type
)
{
case
FIL_PAGE_PAGE_COMPRESSED
:
memcpy
(
tmp_page
,
page
,
page_size
.
physical
());
/* fall through */
case
FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED
:
if
(
page_size
.
is_compressed
())
return
false
;
/* ROW_FORMAT=COMPRESSED cannot be page_compressed */
ulint
decomp
=
fil_page_decompress
(
tmp_frame
,
tmp_page
);
if
(
!
decomp
)
return
false
;
/* decompression failed */
if
(
decomp
==
srv_page_size
)
return
false
;
/* the page was not compressed (invalid page type) */
return
!
buf_page_is_corrupted
(
true
,
tmp_page
,
page_size
,
space
);
}
return
!
buf_page_is_corrupted
(
true
,
page
,
page_size
,
space
);
}
byte
*
recv_dblwr_t
::
find_page
(
const
page_id_t
page_id
,
const
fil_space_t
*
space
,
byte
*
tmp_buf
)
{
byte
*
result
=
NULL
;
lsn_t
max_lsn
=
0
;
lsn_t
max_lsn
=
0
;
for
(
list
::
const_iterator
i
=
pages
.
begin
();
i
!=
pages
.
end
();
++
i
)
for
(
list
::
const_iterator
i
=
pages
.
begin
();
i
!=
pages
.
end
();
++
i
)
{
{
const
byte
*
page
=
*
i
;
byte
*
page
=
*
i
;
if
(
page_get_page_no
(
page
)
!=
page_
no
||
if
(
page_get_page_no
(
page
)
!=
page_
id
.
page_no
()
||
page_get_space_id
(
page
)
!=
space_id
)
page_get_space_id
(
page
)
!=
page_id
.
space
()
)
continue
;
continue
;
const
lsn_t
lsn
=
mach_read_from_8
(
page
+
FIL_PAGE_LSN
);
const
lsn_t
lsn
=
mach_read_from_8
(
page
+
FIL_PAGE_LSN
);
if
(
lsn
<=
max_lsn
)
if
(
lsn
<=
max_lsn
||
!
validate_page
(
page_id
,
page
,
space
,
tmp_buf
))
{
/* Mark processed for subsequent iterations in buf_dblwr_process() */
memset
(
page
+
FIL_PAGE_LSN
,
0
,
8
);
continue
;
continue
;
}
max_lsn
=
lsn
;
max_lsn
=
lsn
;
result
=
page
;
result
=
page
;
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment