Commit 4434fb4a authored by Marko Mäkelä's avatar Marko Mäkelä

Merge 10.7 into 10.8

parents ea948953 92a4e76a
connection node_2;
connection node_1;
CREATE TABLE t3 (c INT) PARTITION BY RANGE (c) (PARTITION p1 VALUES LESS THAN (1000));
CREATE TABLE tp2 (c INT);
ALTER TABLE t3 CONVERT TABLE tp2 TO PARTITION p2 VALUES LESS THAN (2000);
DROP TABLE t3;
--partition=ON
\ No newline at end of file
--source include/galera_cluster.inc
--source include/have_innodb.inc
CREATE TABLE t3 (c INT) PARTITION BY RANGE (c) (PARTITION p1 VALUES LESS THAN (1000));
CREATE TABLE tp2 (c INT);
ALTER TABLE t3 CONVERT TABLE tp2 TO PARTITION p2 VALUES LESS THAN (2000);
DROP TABLE t3;
\ No newline at end of file
......@@ -1603,11 +1603,18 @@ wsrep_append_fk_parent_table(THD* thd, TABLE_LIST* tables, wsrep::key_array* key
{
bool fail= false;
TABLE_LIST *table;
TABLE_LIST *table_last_in_list;
thd->release_transactional_locks();
uint counter;
MDL_savepoint mdl_savepoint= thd->mdl_context.mdl_savepoint();
for (table_last_in_list= tables;;table_last_in_list= table_last_in_list->next_local) {
if (!table_last_in_list->next_local) {
break;
}
}
if (thd->open_temporary_tables(tables) ||
open_tables(thd, &tables, &counter, MYSQL_OPEN_FORCE_SHARED_HIGH_PRIO_MDL))
{
......@@ -1639,11 +1646,19 @@ wsrep_append_fk_parent_table(THD* thd, TABLE_LIST* tables, wsrep::key_array* key
/* close the table and release MDL locks */
close_thread_tables(thd);
thd->mdl_context.rollback_to_savepoint(mdl_savepoint);
bool invalidate_next_global= false;
for (table= tables; table; table= table->next_local)
{
table->table= NULL;
table->next_global= NULL;
table->mdl_request.ticket= NULL;
// We should invalidate `next_global` only for entries that are added
// in this function
if (table == table_last_in_list) {
invalidate_next_global= true;
}
if (invalidate_next_global) {
table->next_global= NULL;
}
}
return fail;
......
......@@ -2544,8 +2544,7 @@ fil_ibd_load(uint32_t space_id, const char *filename, fil_space_t *&space)
/* Read and validate the first page of the tablespace.
Assign a tablespace name based on the tablespace type. */
switch (file.validate_for_recovery(
static_cast<uint32_t>(space_id))) {
switch (file.validate_for_recovery()) {
os_offset_t minimum_size;
case DB_SUCCESS:
deferred_space = file.m_defer;
......
......@@ -386,11 +386,10 @@ exist and be successfully opened. We initially open it in read-only mode
because we just want to read the SpaceID. However, if the first page is
corrupt and needs to be restored from the doublewrite buffer, we will
reopen it in write mode and ry to restore that page.
@param space_id space id to validate for recovery
@retval DB_SUCCESS if tablespace is valid, DB_ERROR if not.
m_is_valid is also set true on success, else false. */
dberr_t
Datafile::validate_for_recovery(uint32_t space_id)
Datafile::validate_for_recovery()
{
dberr_t err;
......@@ -433,23 +432,12 @@ Datafile::validate_for_recovery(uint32_t space_id)
}
}
const bool empty_tablespace = (m_space_id == UINT32_MAX);
if (empty_tablespace && space_id) {
/* Set space id to find out whether
the page exist in double write buffer */
m_space_id = space_id;
if (m_space_id == UINT32_MAX) {
return DB_SUCCESS; /* empty file */
}
if (restore_from_doublewrite()) {
if (!m_defer) {
return DB_CORRUPTION;
}
if (!empty_tablespace) {
return err;
}
/* InnoDB may rebuild the file from redo log */
m_space_id = UINT32_MAX;
return DB_SUCCESS; /* empty file */
return m_defer ? err : DB_CORRUPTION;
}
/* Free the previously read first page and then re-validate. */
......@@ -777,14 +765,10 @@ Datafile::restore_from_doublewrite()
in the doublewrite buffer, then the recovery is going to fail
now. Hence this is treated as an error. */
if (!m_defer) {
ib::error()
<< "Corrupted page " << page_id
<< " of datafile '" << m_filepath
<< "' could not be found in the "
<< "doublewrite buffer.";
}
ib::error()
<< "Corrupted page " << page_id
<< " of datafile '" << m_filepath
<< "' could not be found in the doublewrite buffer.";
return(true);
}
......
......@@ -207,10 +207,9 @@ class Datafile {
However, if the first page is corrupt and needs to be restored
from the doublewrite buffer, we will reopen it in write mode and
ry to restore that page.
@param space_id space id to validate for recovery
@retval DB_SUCCESS if tablespace is valid, DB_ERROR if not.
m_is_valid is also set true on success, else false. */
dberr_t validate_for_recovery(uint32_t space_id=0)
dberr_t validate_for_recovery()
MY_ATTRIBUTE((warn_unused_result));
/** Checks the consistency of the first page of a datafile when the
......
......@@ -661,7 +661,14 @@ static struct
return;
}
else if (d->second.lsn < lsn)
{
/* Reset the old tablespace name in recovered spaces list */
recv_spaces_t::iterator it{recv_spaces.find(d->first)};
if (it != recv_spaces.end() &&
it->second.name == d->second.file_name)
it->second.name = "";
defers.erase(d++);
}
else
{
ut_ad(d->second.lsn != lsn);
......@@ -675,6 +682,10 @@ static struct
p.first->second.lsn= lsn;
p.first->second.file_name= defer.file_name;
}
/* Add the newly added defered space and change the file name */
recv_spaces_t::iterator it{recv_spaces.find(space)};
if (it != recv_spaces.end())
it->second.name = defer.file_name;
}
void remove(uint32_t space)
......@@ -787,6 +798,62 @@ static struct
space->size_in_header= size;
return space;
}
/** Attempt to recover pages from the doublewrite buffer.
This is invoked if we found neither a valid first page in the
data file nor redo log records that would initialize the first
page. */
void deferred_dblwr()
{
for (auto d= defers.begin(); d != defers.end(); )
{
if (d->second.deleted)
{
next_item:
d++;
continue;
}
const page_id_t page_id{d->first, 0};
const byte *page= recv_sys.dblwr.find_page(page_id);
if (!page)
goto next_item;
const uint32_t space_id= mach_read_from_4(page + FIL_PAGE_SPACE_ID);
const uint32_t flags= fsp_header_get_flags(page);
const uint32_t page_no= mach_read_from_4(page + FIL_PAGE_OFFSET);
const uint32_t size= fsp_header_get_field(page, FSP_SIZE);
if (page_no == 0 && space_id == d->first && size >= 4 &&
fil_space_t::is_valid_flags(flags, space_id) &&
fil_space_t::logical_size(flags) == srv_page_size)
{
recv_spaces_t::iterator it {recv_spaces.find(d->first)};
ut_ad(it != recv_spaces.end());
fil_space_t *space= create(
it, d->second.file_name.c_str(), flags,
fil_space_read_crypt_data(fil_space_t::zip_size(flags), page),
size);
space->free_limit= fsp_header_get_field(page, FSP_FREE_LIMIT);
space->free_len= flst_get_len(FSP_HEADER_OFFSET + FSP_FREE + page);
fil_node_t *node= UT_LIST_GET_FIRST(space->chain);
if (!space->acquire())
goto next_item;
if (os_file_write(IORequestWrite, node->name, node->handle,
page, 0, fil_space_t::physical_size(flags) !=
DB_SUCCESS))
{
space->release();
goto next_item;
}
space->release();
it->second.space= space;
defers.erase(d++);
continue;
}
goto next_item;
}
}
}
deferred_spaces;
......@@ -860,6 +927,7 @@ bool recv_sys_t::recover_deferred(recv_sys_t::map::iterator &p,
}
node->deferred= false;
space->release();
it->second.space= space;
return false;
}
......@@ -4107,6 +4175,7 @@ recv_recovery_from_checkpoint_start(lsn_t flush_lsn)
recv_sys.parse_start_lsn = checkpoint_lsn;
if (srv_operation == SRV_OPERATION_NORMAL) {
deferred_spaces.deferred_dblwr();
buf_dblwr.recover();
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment