Commit 3d7915f0 authored by Marko Mäkelä

Merge 10.1 into 10.2

parents 82aeb6b5 4629db0d
......@@ -35,6 +35,8 @@
# define MEM_CHECK_DEFINED(a,len) VALGRIND_CHECK_MEM_IS_DEFINED(a,len)
#elif defined(__SANITIZE_ADDRESS__)
# include <sanitizer/asan_interface.h>
/* How to do manual poisoning:
https://github.com/google/sanitizers/wiki/AddressSanitizerManualPoisoning */
# define MEM_UNDEFINED(a,len) ASAN_UNPOISON_MEMORY_REGION(a,len)
# define MEM_NOACCESS(a,len) ASAN_POISON_MEMORY_REGION(a,len)
# define MEM_CHECK_ADDRESSABLE(a,len) ((void) 0)
......
......@@ -721,6 +721,20 @@ SELECT * FROM t1 JOIN t2 ON c1 = c2 HAVING c2 > 'a' ORDER BY c2 LIMIT 1;
c1 c2
x x
DROP TABLE t1,t2;
#
# MDEV-6736: Valgrind warnings 'Invalid read' in subselect_engine::calc_const_tables with SQ
# in WHERE and HAVING, ORDER BY, materialization+semijoin
#
CREATE TABLE t1 (a INT) ENGINE=MyISAM;
INSERT INTO t1 VALUES (3),(8);
CREATE TABLE t2 (b INT) ENGINE=MyISAM;
INSERT INTO t2 VALUES (2),(1);
SELECT a FROM t1
WHERE 9 IN ( SELECT MIN( a ) FROM t1 )
HAVING a <> ( SELECT COUNT(*) FROM t2 )
ORDER BY a;
a
DROP TABLE t1,t2;
End of 10.0 tests
#
# MDEV-10716: Assertion `real_type() != FIELD_ITEM' failed in
......
......@@ -1215,7 +1215,7 @@ c1 c2
838:59:59 838:59:59
UPDATE IGNORE t1 SET t1.c2='99999.99999' WHERE c1 BETWEEN 080000 AND 100000;
Warnings:
Warning 1265 Data truncated for column 'c2' at row 1
Warning 1265 Data truncated for column 'c2' at row N
SELECT * FROM t1;
c1 c2
-12:12:12 12:12:12
......
......@@ -172,6 +172,7 @@ SELECT * FROM t1;
# Update using range
# EXPLAIN SELECT * FROM t1 WHERE c1 BETWEEN 080000 AND 100000;
--replace_regex /(Data truncated for column 'c2' at row) [1-9][0-9]*/\1 N/
UPDATE IGNORE t1 SET t1.c2='99999.99999' WHERE c1 BETWEEN 080000 AND 100000;
--sorted_result
SELECT * FROM t1;
......
......@@ -56,3 +56,4 @@ galera_ist_progress: MDEV-15236 galera_ist_progress fails when trying to read tr
galera_gtid : MDEV-13549 Galera test failures 10.1
galera_gtid_slave : MDEV-13549 Galera test failures 10.1
galera_unicode_identifiers : MDEV-13549 Galera test failures 10.1
galera.galera_gcs_fc_limit : MDEV-13549 Galera test failures 10.1
......@@ -23,7 +23,7 @@ SELECT * FROM t1;
SET GLOBAL wsrep_reject_queries = ALL_KILL;
--connection node_1a
--error ER_CONNECTION_KILLED,2013
--error ER_CONNECTION_KILLED,2013,2006
SELECT * FROM t1;
--connect node_1b, 127.0.0.1, root, , test, $NODE_MYPORT_1
......
......@@ -41,6 +41,9 @@ SHOW TABLE STATUS LIKE 'tab';
Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment
tab InnoDB # Compact # # # # # # NULL # NULL NULL latin1_swedish_ci NULL
ALTER TABLE tab DISCARD TABLESPACE;
call mtr.add_suppression("InnoDB: Tried to read .* bytes at offset 0");
ALTER TABLE tab IMPORT TABLESPACE;
ERROR HY000: Internal error: Cannot reset LSNs in table `test`.`tab` : I/O error
ALTER TABLE tab IMPORT TABLESPACE;
SELECT * FROM tab;
a
......
......@@ -81,7 +81,14 @@ SHOW TABLE STATUS LIKE 'tab';
ALTER TABLE tab DISCARD TABLESPACE;
# Move the *.ibd, *.cfg files into the original location
--copy_file $MYSQLD_DATADIR/tab.cfg $MYSQLD_DATADIR/test/tab.ibd
--move_file $MYSQLD_DATADIR/tab.cfg $MYSQLD_DATADIR/test/tab.cfg
call mtr.add_suppression("InnoDB: Tried to read .* bytes at offset 0");
--error ER_INTERNAL_ERROR
ALTER TABLE tab IMPORT TABLESPACE;
--remove_file $MYSQLD_DATADIR/test/tab.ibd
--move_file $MYSQLD_DATADIR/tab.ibd $MYSQLD_DATADIR/test/tab.ibd
# Check import is successful (because same row_format)
......
install plugin DISKS soname 'disks';
show create table information_schema.disks;
Table Create Table
DISKS CREATE TEMPORARY TABLE `DISKS` (
`Disk` varchar(4096) NOT NULL DEFAULT '',
`Path` varchar(4096) NOT NULL DEFAULT '',
`Total` int(32) NOT NULL DEFAULT 0,
`Used` int(32) NOT NULL DEFAULT 0,
`Available` int(32) NOT NULL DEFAULT 0
) ENGINE=MEMORY DEFAULT CHARSET=utf8
select sum(Total) > sum(Available), sum(Total)>sum(Used) from information_schema.disks;
sum(Total) > sum(Available) sum(Total)>sum(Used)
1 1
uninstall plugin DISKS;
--source include/not_windows.inc
if (!$DISKS_SO) {
skip No DISKS plugin;
}
install plugin DISKS soname 'disks';
show create table information_schema.disks;
select sum(Total) > sum(Available), sum(Total)>sum(Used) from information_schema.disks;
uninstall plugin DISKS;
......@@ -759,6 +759,24 @@ SELECT * FROM t1 JOIN t2 ON c1 = c2 HAVING c2 > 'a' ORDER BY c2 LIMIT 1;
DROP TABLE t1,t2;
--echo #
--echo # MDEV-6736: Valgrind warnings 'Invalid read' in subselect_engine::calc_const_tables with SQ
--echo # in WHERE and HAVING, ORDER BY, materialization+semijoin
--echo #
CREATE TABLE t1 (a INT) ENGINE=MyISAM;
INSERT INTO t1 VALUES (3),(8);
CREATE TABLE t2 (b INT) ENGINE=MyISAM;
INSERT INTO t2 VALUES (2),(1);
SELECT a FROM t1
WHERE 9 IN ( SELECT MIN( a ) FROM t1 )
HAVING a <> ( SELECT COUNT(*) FROM t2 )
ORDER BY a;
DROP TABLE t1,t2;
--echo End of 10.0 tests
--echo #
......
IF(NOT WIN32)
INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/sql)
MYSQL_ADD_PLUGIN(DISKS information_schema_disks.cc MODULE_ONLY RECOMPILE_FOR_EMBEDDED)
ENDIF()
Information Schema Disks
------------------------
This is a proof-of-concept information schema plugin that allows the
disk space situation to be monitored. When installed, it can be used
as follows:
> select * from information_schema.disks;
+-----------+-----------------------+-----------+----------+-----------+
| Disk | Path | Total | Used | Available |
+-----------+-----------------------+-----------+----------+-----------+
| /dev/sda3 | / | 47929956 | 30666304 | 14805864 |
| /dev/sda1 | /boot/efi | 191551 | 3461 | 188090 |
| /dev/sda4 | /home | 174679768 | 80335392 | 85448120 |
| /dev/sdb1 | /mnt/hdd | 961301832 | 83764 | 912363644 |
| /dev/sdb1 | /home/wikman/Music | 961301832 | 83764 | 912363644 |
| /dev/sdb1 | /home/wikman/Videos | 961301832 | 83764 | 912363644 |
| /dev/sdb1 | /home/wikman/hdd | 961301832 | 83764 | 912363644 |
| /dev/sdb1 | /home/wikman/Pictures | 961301832 | 83764 | 912363644 |
| /dev/sda3 | /var/lib/docker/aufs | 47929956 | 30666304 | 14805864 |
+-----------+-----------------------+-----------+----------+-----------+
9 rows in set (0.00 sec)
- 'Disk' is the name of the disk itself.
- 'Path' is the mount point of the disk.
- 'Total' is the total space in KiB.
- 'Used' is the used amount of space in KiB, and
- 'Available' is the amount of space in KiB available to non-root users.
Note that as the amount of space available to root may be more than what
is available to non-root users, 'available' + 'used' may be less than 'total'.
All paths to which a particular disk has been mounted are reported. The
rationale is that someone might want to take different action e.g. depending
on which disk is relevant for a particular path. This leads to the same disk
being reported multiple times. An alternative to this would be to have two
tables; disks and mounts.
> select * from information_schema.disks;
+-----------+-----------+----------+-----------+
| Disk | Total | Used | Available |
+-----------+-----------+----------+-----------+
| /dev/sda3 | 47929956 | 30666304 | 14805864 |
| /dev/sda1 | 191551 | 3461 | 188090 |
| /dev/sda4 | 174679768 | 80335392 | 85448120 |
| /dev/sdb1 | 961301832 | 83764 | 912363644 |
+-----------+-----------+----------+-----------+
> select * from information_schema.mounts;
+-----------------------+-----------+
| Path | Disk |
+-----------------------+-----------+
| / | /dev/sda3 |
| /boot/efi | /dev/sda1 |
| /home | /dev/sda4 |
| /mnt/hdd | /dev/sdb1 |
| /home/wikman/Music | /dev/sdb1 |
...
Building
--------
- Ensure that the directory information_schema_disks is in the top-level
directory of the server.
- Add
ADD_SUBDIRECTORY(information_schema_disks)
to the top-level CMakeLists.txt
- Invoke make
$ make
Installation
------------
- Copy information_schema_disks/libinformation_schema_disks.so to the plugin
directory of the server:
$ cd information_schema_disks
$ sudo cp libinformation_schema_disks.so plugin-directory-of-server
- Using mysql, install the plugin:
MariaDB [(none)]> install plugin disks soname 'libinformation_schema_disks.so';
Usage
-----
The plugin appears as the table 'disks' in 'information_schema'.
MariaDB [(none)]> select * from information_schema.disks;
+-----------+-----------------------+-----------+----------+-----------+
| Disk | Path | Total | Used | Available |
+-----------+-----------------------+-----------+----------+-----------+
| /dev/sda3 | / | 47929956 | 30666308 | 14805860 |
| /dev/sda1 | /boot/efi | 191551 | 3461 | 188090 |
| /dev/sda4 | /home | 174679768 | 80348148 | 85435364 |
| /dev/sdb1 | /mnt/hdd | 961301832 | 83764 | 912363644 |
| /dev/sdb1 | /home/wikman/Music | 961301832 | 83764 | 912363644 |
| /dev/sdb1 | /home/wikman/Videos | 961301832 | 83764 | 912363644 |
...
/*
Copyright (c) 2017, MariaDB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
#include <sys/statvfs.h>
#include <sys/types.h>
#include <mntent.h>
#include <sql_class.h>
#include <table.h>
#include <innodb_priv.h>
namespace
{
/* Plugin descriptor for an INFORMATION_SCHEMA table plugin. */
struct st_mysql_information_schema disks_table_info = { MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION };

/* Column definitions of information_schema.DISKS.
   NOTE(review): the numeric columns are declared as 32-bit LONG; very
   large file systems could overflow them — confirm against Field::store(). */
ST_FIELD_INFO disks_table_fields[]=
{
    { "Disk", PATH_MAX, MYSQL_TYPE_STRING, 0, 0 ,0, 0 }, // Device name, e.g. /dev/sda1
    { "Path", PATH_MAX, MYSQL_TYPE_STRING, 0, 0 ,0, 0 }, // Mount point
    { "Total", 32, MYSQL_TYPE_LONG, 0, 0 ,0 ,0 }, // Total amount available
    { "Used", 32, MYSQL_TYPE_LONG, 0, 0 ,0 ,0 }, // Amount of space used
    { "Available", 32, MYSQL_TYPE_LONG, 0, 0 ,0 ,0 }, // Amount available to users other than root.
    { 0, 0, MYSQL_TYPE_NULL, 0, 0, 0, 0 } // End-of-list terminator
};
/**
  Store one row into the DISKS table.

  @param pThd   current thread
  @param pTable the information_schema.disks table being filled
  @param zDisk  device name, e.g. "/dev/sda1"
  @param zPath  mount point of the device
  @param info   statvfs() result for zPath
  @return 0 on success, 1 if the row could not be stored
*/
int disks_table_add_row(THD* pThd,
                        TABLE* pTable,
                        const char* zDisk,
                        const char* zPath,
                        const struct statvfs& info)
{
    // statvfs(3) reports block counts in units of f_frsize:
    //   f_blocks  total blocks on the file system
    //   f_bfree   free blocks (including root-reserved ones)
    //   f_bavail  free blocks available to unprivileged processes
    // Everything is converted to KiB here.
    const size_t total_kib = (info.f_frsize * info.f_blocks) / 1024;
    const size_t used_kib  = (info.f_frsize * (info.f_blocks - info.f_bfree)) / 1024;
    const size_t avail_kib = (info.f_frsize * info.f_bavail) / 1024;

    Field** ppField = pTable->field;
    ppField[0]->store(zDisk, strlen(zDisk), system_charset_info);
    ppField[1]->store(zPath, strlen(zPath), system_charset_info);
    ppField[2]->store(total_kib);
    ppField[3]->store(used_kib);
    ppField[4]->store(avail_kib);

    // schema_table_store_record() returns 0 on success.
    return schema_table_store_record(pThd, pTable) ? 1 : 0;
}
/**
  Stat the mount point and, if that succeeds, store a row for it.
  statvfs() failures (e.g. stale or unreadable mounts) are silently skipped.

  @return 0 on success or skip, 1 if storing the row failed
*/
int disks_table_add_row(THD* pThd, TABLE* pTable, const char* zDisk, const char* zPath)
{
    struct statvfs info;

    if (statvfs(zPath, &info) != 0) // We ignore failures.
    {
        return 0;
    }

    return disks_table_add_row(pThd, pTable, zDisk, zPath, info);
}
/**
  Fill information_schema.disks from /etc/mtab.

  Only entries whose fs name starts with '/' (physical disks) are reported.

  @param pThd    current thread
  @param pTables table list; pTables->table is the table to fill
  @param pCond   pushed-down condition (unused)
  @return 0 on success, 1 on failure (mtab unreadable, OOM, or store error)
*/
int disks_fill_table(THD* pThd, TABLE_LIST* pTables, Item* pCond)
{
    FILE* pFile = setmntent("/etc/mtab", "r");

    if (!pFile)
    {
        return 1;
    }

    int rv = 1;
    const size_t BUFFER_SIZE = 4096; // 4K should be sufficient.
    char* pBuffer = new (std::nothrow) char[BUFFER_SIZE];

    if (pBuffer)
    {
        rv = 0;

        TABLE* pTable = pTables->table;
        struct mntent ent;
        struct mntent* pEnt;

        // Stop early if a row ever fails to store (rv != 0).
        while (rv == 0 && (pEnt = getmntent_r(pFile, &ent, pBuffer, BUFFER_SIZE)) != NULL)
        {
            // We only report the ones that refer to physical disks.
            if (*pEnt->mnt_fsname == '/')
            {
                rv = disks_table_add_row(pThd, pTable, pEnt->mnt_fsname, pEnt->mnt_dir);
            }
        }

        delete [] pBuffer;
    }

    endmntent(pFile);

    return rv;
}
int disks_table_init(void *ptr)
{
ST_SCHEMA_TABLE* pSchema_table = (ST_SCHEMA_TABLE*)ptr;
pSchema_table->fields_info = disks_table_fields;
pSchema_table->fill_table = disks_fill_table;
return 0;
}
}
extern "C"
{
/* Plugin registration record: exposes the DISKS information schema table. */
mysql_declare_plugin(disks_library)
{
    MYSQL_INFORMATION_SCHEMA_PLUGIN,
    &disks_table_info,                 /* type-specific descriptor */
    "DISKS",                           /* table name */
    "MariaDB",                         /* author */
    "Disk space information",          /* description */
    PLUGIN_LICENSE_GPL,                /* license type */
    disks_table_init,                  /* init function */
    NULL,                              /* deinit function (none) */
    0x0100,                            /* version = 1.0 */
    NULL,                              /* no status variables */
    NULL,                              /* no system variables */
    NULL,                              /* no reserved information */
    0                                  /* no flags */
}
mysql_declare_plugin_end;
}
......@@ -24,9 +24,7 @@ The tablespace memory cache
Created 10/25/1995 Heikki Tuuri
*******************************************************/
#include "ha_prototypes.h"
#include "fil0pagecompress.h"
#include "fsp0pagecompress.h"
#include "fil0fil.h"
#include "fil0crypt.h"
#include "btr0btr.h"
......@@ -5681,496 +5679,6 @@ fil_close(void)
}
}
/********************************************************************//**
Initializes a buffer control block when the buf_pool is created.
Used by fil_tablespace_iterate() to set up a stand-alone block that is
not part of the buffer pool. */
static
void
fil_buf_block_init(
/*===============*/
	buf_block_t*	block,	/*!< in: pointer to control block */
	byte*		frame)	/*!< in: pointer to buffer frame */
{
	/* NOTE(review): UNIV_MEM_DESC appears to be Valgrind/ASAN
	bookkeeping for the frame — confirm against univ.i. */
	UNIV_MEM_DESC(frame, UNIV_PAGE_SIZE);

	block->frame = frame;

	block->page.io_fix = BUF_IO_NONE;
	/* There are assertions that check for this. */
	block->page.buf_fix_count = 1;
	block->page.state = BUF_BLOCK_READY_FOR_USE;

	page_zip_des_init(&block->page.zip);
}
/** State shared between fil_tablespace_iterate() (which fills it in)
and fil_iterate() (which walks the file using it). */
struct fil_iterator_t {
	pfs_os_file_t	file;		/*!< File handle */
	const char*	filepath;	/*!< File path name */
	os_offset_t	start;		/*!< From where to start */
	os_offset_t	end;		/*!< Where to stop */
	os_offset_t	file_size;	/*!< File size in bytes */
	ulint		page_size;	/*!< Page size */
	ulint		n_io_buffers;	/*!< Number of pages to use
					for IO */
	byte*		io_buffer;	/*!< Buffer to use for IO */
	fil_space_crypt_t *crypt_data;	/*!< MariaDB Crypt data (if encrypted) */
	byte*		crypt_io_buffer; /*!< MariaDB IO buffer when
					encrypted */
	dict_table_t*	table;		/*!< Imported table */
};
/********************************************************************//**
TODO: This can be made parallel trivially by chunking up the file and creating
a callback per thread. Main benefit will be to use multiple CPUs for
checksums and compressed tables. We have to do compressed tables block by
block right now. Secondly we need to decompress/compress and copy too much
of data. These are CPU intensive.

Iterate over all the pages in the tablespace.
@param iter Tablespace iterator
@param block block to use for IO
@param callback Callback to inspect and update page contents
@retval DB_SUCCESS or error code */
static
dberr_t
fil_iterate(
/*========*/
	const fil_iterator_t&	iter,
	buf_block_t*		block,
	PageCallback&		callback)
{
	os_offset_t		offset;
	ulint			page_no = 0;
	ulint			space_id = callback.get_space_id();
	ulint			n_bytes = iter.n_io_buffers * iter.page_size;

	ut_ad(!srv_read_only_mode);

	/* TODO: For compressed tables we do a lot of useless
	copying for non-index pages. Unfortunately, it is
	required by buf_zip_decompress() */
	const bool	row_compressed
		= callback.get_page_size().is_compressed();

	/* Process the file in chunks of up to n_io_buffers pages. */
	for (offset = iter.start; offset < iter.end; offset += n_bytes) {
		byte*		io_buffer = iter.io_buffer;

		block->frame = io_buffer;

		if (row_compressed) {
			page_zip_des_init(&block->page.zip);
			page_zip_set_size(&block->page.zip, iter.page_size);

			block->page.size.copy_from(
				page_size_t(iter.page_size,
					    univ_page_size.logical(),
					    true));

			block->page.zip.data = block->frame + UNIV_PAGE_SIZE;
			ut_d(block->page.zip.m_external = true);
			ut_ad(iter.page_size
			      == callback.get_page_size().physical());

			/* Zip IO is done in the compressed page buffer. */
			io_buffer = block->page.zip.data;
		}

		/* We have to read the exact number of bytes. Otherwise the
		InnoDB IO functions croak on failed reads. */

		n_bytes = static_cast<ulint>(
			ut_min(static_cast<os_offset_t>(n_bytes),
			       iter.end - offset));

		ut_ad(n_bytes > 0);
		ut_ad(!(n_bytes % iter.page_size));

		const bool encrypted = iter.crypt_data != NULL
			&& iter.crypt_data->should_encrypt();
		/* Use additional crypt io buffer if tablespace is encrypted */
		byte* const readptr = encrypted
			? iter.crypt_io_buffer : io_buffer;
		byte* const writeptr = readptr;

		IORequest	read_request(IORequest::READ);

		dberr_t	err = os_file_read(
			read_request, iter.file, readptr, offset,
			(ulint) n_bytes);

		if (err != DB_SUCCESS) {

			ib::error() << "os_file_read() failed";

			return(err);
		}

		bool		updated = false;
		os_offset_t	page_off = offset;
		ulint		n_pages_read = (ulint) n_bytes / iter.page_size;
		bool		decrypted = false;

		/* Per-page pass: decrypt, decompress, apply the callback,
		then re-compress/re-encrypt into the write buffer. */
		for (ulint i = 0; i < n_pages_read; ++i) {
			ulint 	size	= iter.page_size;
			dberr_t	err	= DB_SUCCESS;
			byte*	src	= readptr + (i * size);
			byte*	dst	= io_buffer + (i * size);
			bool	frame_changed = false;

			ulint page_type = mach_read_from_2(src+FIL_PAGE_TYPE);

			const bool page_compressed
				= page_type == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED
				|| page_type == FIL_PAGE_PAGE_COMPRESSED;

			/* If tablespace is encrypted, we need to decrypt
			the page. Note that tablespaces are not in
			fil_system during import. */
			if (encrypted) {
				decrypted = fil_space_decrypt(
					iter.crypt_data,
					dst, //dst
					callback.get_page_size(),
					src, // src
					&err); // err out

				if (err != DB_SUCCESS) {
					return(err);
				}

				if (decrypted) {
					updated = true;
				} else if (!page_compressed
					   && !row_compressed) {
					/* Page was not encrypted after all:
					read the source buffer directly. */
					block->frame = src;
					frame_changed = true;
				} else {
					memcpy(dst, src, size);
				}
			}

			/* If the original page is page_compressed, we need
			to decompress page before we can update it. */
			if (page_compressed) {
				fil_decompress_page(NULL, dst, ulong(size),
						    NULL);
				updated = true;
			}

			buf_block_set_file_page(
				block, page_id_t(space_id, page_no++));

			if ((err = callback(page_off, block)) != DB_SUCCESS) {
				return(err);
			} else if (!updated) {
				updated = buf_block_get_state(block)
					== BUF_BLOCK_FILE_PAGE;
			}

			buf_block_set_state(block, BUF_BLOCK_NOT_USED);
			buf_block_set_state(block, BUF_BLOCK_READY_FOR_USE);

			/* If tablespace is encrypted we use additional
			temporary scratch area where pages are read
			for decrypting readptr == crypt_io_buffer != io_buffer.

			Destination for decryption is a buffer pool block
			block->frame == dst == io_buffer that is updated.
			Pages that did not require decryption even when
			tablespace is marked as encrypted are not copied
			instead block->frame is set to src == readptr.

			For encryption we again use temporary scratch area
			writeptr != io_buffer == dst
			that is then written to the tablespace

			(1) For normal tables io_buffer == dst == writeptr
			(2) For only page compressed tables
			io_buffer == dst == writeptr
			(3) For encrypted (and page compressed)
			readptr != io_buffer == dst != writeptr
			*/

			ut_ad(!encrypted && !page_compressed ?
			      src == dst && dst == writeptr + (i * size):1);
			ut_ad(page_compressed && !encrypted ?
			      src == dst && dst == writeptr + (i * size):1);
			ut_ad(encrypted ?
			      src != dst && dst != writeptr + (i * size):1);

			if (encrypted) {
				memcpy(writeptr + (i * size),
				       row_compressed ? block->page.zip.data :
				       block->frame, size);
			}

			if (frame_changed) {
				block->frame = dst;
			}

			src =  io_buffer + (i * size);

			if (page_compressed) {
				ulint len = 0;
				byte * res = fil_compress_page(
					NULL,
					src,
					NULL,
					size,
					dict_table_page_compression_level(iter.table),
					512,/* FIXME: use proper block size */
					encrypted,
					&len);

				if (len != size) {
					/* Zero-pad the tail of the
					compressed page. */
					memset(res+len, 0, size-len);
				}

				updated = true;
			}

			/* If tablespace is encrypted, encrypt page before we
			write it back. Note that we should not encrypt the
			buffer that is in buffer pool. */
			/* NOTE: At this stage of IMPORT the
			buffer pool is not being used at all! */
			if (decrypted && encrypted) {
				byte *dest = writeptr + (i * size);
				ulint space = mach_read_from_4(
					src + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID);
				ulint offset = mach_read_from_4(src + FIL_PAGE_OFFSET);
				ib_uint64_t lsn = mach_read_from_8(src + FIL_PAGE_LSN);

				byte* tmp = fil_encrypt_buf(
					iter.crypt_data,
					space,
					offset,
					lsn,
					src,
					callback.get_page_size(),
					dest);

				if (tmp == src) {
					/* TODO: remove unnecessary memcpy's */
					memcpy(dest, src, iter.page_size);
				}

				updated = true;
			}

			page_off += iter.page_size;
			block->frame += iter.page_size;
		}

		IORequest	write_request(IORequest::WRITE);

		/* A page was updated in the set, write back to disk.
		Note: We don't have the compression algorithm, we write
		out the imported file as uncompressed. */

		if (updated
		    && (err = os_file_write(
				write_request,
				iter.filepath, iter.file, writeptr,
				offset, (ulint) n_bytes)) != DB_SUCCESS) {

			ib::error() << "os_file_write() failed";

			return(err);
		}

		/* Clean up the temporary buffer. */
		memset(writeptr, 0, n_bytes);
	}

	return(DB_SUCCESS);
}
/********************************************************************//**
Iterate over all the pages in the tablespace.
@param table the table definition in the server
@param n_io_buffers number of blocks to read and write together
@param callback functor that will do the page updates
@return DB_SUCCESS or error code */
dberr_t
fil_tablespace_iterate(
/*===================*/
	dict_table_t*	table,
	ulint		n_io_buffers,
	PageCallback&	callback)
{
	dberr_t		err;
	pfs_os_file_t	file;
	char*		filepath;
	bool		success;

	ut_a(n_io_buffers > 0);
	ut_ad(!srv_read_only_mode);

	DBUG_EXECUTE_IF("ib_import_trigger_corruption_1",
			return(DB_CORRUPTION););

	/* Make sure the data_dir_path is set. */
	dict_get_and_save_data_dir_path(table, false);

	if (DICT_TF_HAS_DATA_DIR(table->flags)) {
		ut_a(table->data_dir_path);

		filepath = fil_make_filepath(
			table->data_dir_path, table->name.m_name, IBD, true);
	} else {
		filepath = fil_make_filepath(
			NULL, table->name.m_name, IBD, false);
	}

	if (filepath == NULL) {
		return(DB_OUT_OF_MEMORY);
	}

	file = os_file_create_simple_no_error_handling(
		innodb_data_file_key, filepath,
		OS_FILE_OPEN, OS_FILE_READ_WRITE, srv_read_only_mode, &success);

	DBUG_EXECUTE_IF("fil_tablespace_iterate_failure",
	{
		static bool	once;

		if (!once || ut_rnd_interval(0, 10) == 5) {
			once = true;
			success = false;
			os_file_close(file);
		}
	});

	if (!success) {
		/* The following call prints an error message */
		os_file_get_last_error(true);

		ib::error() << "Trying to import a tablespace, but could not"
			" open the tablespace file " << filepath;

		ut_free(filepath);

		return(DB_TABLESPACE_NOT_FOUND);

	} else {
		err = DB_SUCCESS;
	}

	callback.set_file(filepath, file);

	os_offset_t	file_size = os_file_get_size(file);
	ut_a(file_size != (os_offset_t) -1);

	/* The block we will use for every physical page */
	buf_block_t*	block;

	block = reinterpret_cast<buf_block_t*>(ut_zalloc_nokey(sizeof(*block)));

	mutex_create(LATCH_ID_BUF_BLOCK_MUTEX, &block->mutex);

	/* Allocate a page to read in the tablespace header, so that we
	can determine the page size and zip size (if it is compressed).
	We allocate an extra page in case it is a compressed table. One
	page is to ensure alignement. */

	void*	page_ptr = ut_malloc_nokey(3 * UNIV_PAGE_SIZE);
	byte*	page = static_cast<byte*>(ut_align(page_ptr, UNIV_PAGE_SIZE));

	fil_buf_block_init(block, page);

	/* Read the first page and determine the page and zip size. */

	IORequest	request(IORequest::READ);

	err = os_file_read(request, file, page, 0, UNIV_PAGE_SIZE);

	if (err != DB_SUCCESS) {

		err = DB_IO_ERROR;

	} else if ((err = callback.init(file_size, block)) == DB_SUCCESS) {
		fil_iterator_t	iter;

		iter.file = file;
		iter.start = 0;
		iter.end = file_size;
		iter.filepath = filepath;
		iter.file_size = file_size;
		iter.n_io_buffers = n_io_buffers;
		iter.page_size = callback.get_page_size().physical();
		iter.table = table;

		/* read (optional) crypt data */
		iter.crypt_data = fil_space_read_crypt_data(
			callback.get_page_size(), page);

		/* err is DB_SUCCESS here (guaranteed by the else-if above);
		the check is kept as a guard around the buffer setup. */
		if (err == DB_SUCCESS) {

			/* Compressed pages can't be optimised for block IO
			for now.  We do the IMPORT page by page. */

			if (callback.get_page_size().is_compressed()) {
				iter.n_io_buffers = 1;
				ut_a(iter.page_size
				     == callback.get_page_size().physical());
			}

			/** Add an extra page for compressed page scratch
			area. */
			void*	io_buffer = ut_malloc_nokey(
				(2 + iter.n_io_buffers) * UNIV_PAGE_SIZE);

			iter.io_buffer = static_cast<byte*>(
				ut_align(io_buffer, UNIV_PAGE_SIZE));

			/* NOTE(review): iter.crypt_io_buffer stays unset when
			there is no crypt data; fil_iterate() only reads it
			when iter.crypt_data requests encryption — confirm if
			that invariant ever changes. */
			void*	crypt_io_buffer;
			if (iter.crypt_data) {
				crypt_io_buffer = static_cast<byte*>(
					ut_malloc_nokey((2 + iter.n_io_buffers)
							* UNIV_PAGE_SIZE));
				iter.crypt_io_buffer = static_cast<byte*>(
					ut_align(crypt_io_buffer,
						 UNIV_PAGE_SIZE));
			} else {
				crypt_io_buffer = NULL;
			}

			err = fil_iterate(iter, block, callback);

			if (iter.crypt_data) {
				fil_space_destroy_crypt_data(&iter.crypt_data);
			}

			/* ut_free(NULL) is a no-op for the crypt buffer. */
			ut_free(io_buffer);
			ut_free(crypt_io_buffer);
		}
	}

	if (err == DB_SUCCESS) {
		ib::info() << "Sync to disk";

		if (!os_file_flush(file)) {
			ib::info() << "os_file_flush() failed!";
			err = DB_IO_ERROR;
		} else {
			ib::info() << "Sync to disk - done!";
		}
	}

	os_file_close(file);

	ut_free(page_ptr);
	ut_free(filepath);

	mutex_free(&block->mutex);

	ut_free(block);

	return(err);
}
/********************************************************************//**
Delete the tablespace file and any related files like .cfg.
This should not be called for temporary tables.
......
......@@ -1391,90 +1391,6 @@ fil_delete_file(
/*============*/
const char* path); /*!< in: filepath of the ibd tablespace */
/** Callback functor. Implemented by import code and invoked by
fil_tablespace_iterate() once per page of the tablespace file. */
struct PageCallback {

	/** Default constructor */
	PageCallback()
		:
		m_page_size(0, 0, false),
		m_filepath() UNIV_NOTHROW {}

	virtual ~PageCallback() UNIV_NOTHROW {}

	/** Called for page 0 in the tablespace file at the start.
	@param file_size size of the file in bytes
	@param block contents of the first page in the tablespace file
	@retval DB_SUCCESS or error code. */
	virtual dberr_t init(
		os_offset_t		file_size,
		const buf_block_t*	block) UNIV_NOTHROW = 0;

	/** Called for every page in the tablespace. If the page was not
	updated then its state must be set to BUF_PAGE_NOT_USED. For
	compressed tables the page descriptor memory will be at offset:
		block->frame + UNIV_PAGE_SIZE;
	@param offset physical offset within the file
	@param block block read from file, note it is not from the buffer pool
	@retval DB_SUCCESS or error code. */
	virtual dberr_t operator()(
		os_offset_t	offset,
		buf_block_t*	block) UNIV_NOTHROW = 0;

	/** Set the name of the physical file and the file handle that is used
	to open it for the file that is being iterated over.
	@param filename the name of the tablespace file
	@param file OS file handle */
	void set_file(const char* filename, pfs_os_file_t file) UNIV_NOTHROW
	{
		m_file = file;
		m_filepath = filename;
	}

	/**
	@return the space id of the tablespace */
	virtual ulint get_space_id() const UNIV_NOTHROW = 0;

	/**
	@retval the space flags of the tablespace being iterated over */
	virtual ulint get_space_flags() const UNIV_NOTHROW = 0;

	/** The compressed page size
	@return the compressed page size */
	const page_size_t& get_page_size() const
	{
		return(m_page_size);
	}

	/** The tablespace page size. */
	page_size_t		m_page_size;

	/** File handle to the tablespace */
	pfs_os_file_t		m_file;

	/** Physical file path. */
	const char*		m_filepath;

protected:
	// Disable copying
	PageCallback(const PageCallback&);
	PageCallback& operator=(const PageCallback&);
};
/********************************************************************//**
Iterate over all the pages in the tablespace.
@param table the table definition in the server
@param n_io_buffers number of blocks to read and write together
@param callback functor that will do the page updates
@return DB_SUCCESS or error code */
dberr_t
fil_tablespace_iterate(
/*===================*/
dict_table_t* table,
ulint n_io_buffers,
PageCallback& callback)
MY_ATTRIBUTE((warn_unused_result));
/********************************************************************//**
Looks for a pre-existing fil_space_t with the given tablespace ID
and, if found, returns the name and filepath in newly allocated buffers that the caller must free.
......
......@@ -348,6 +348,11 @@ mem_heap_create_block_func(
heap->total_size += len;
}
/* Poison all available memory. Individual chunks will be unpoisoned on
every mem_heap_alloc() call. */
compile_time_assert(MEM_BLOCK_HEADER_SIZE >= sizeof *block);
UNIV_MEM_FREE(block + 1, len - sizeof *block);
ut_ad((ulint)MEM_BLOCK_HEADER_SIZE < len);
return(block);
......
......@@ -701,28 +701,50 @@ static
bool
os_aio_validate();
/** Handle errors for file operations.
@param[in] name name of a file or NULL
@param[in] operation operation
@param[in] should_abort whether to abort on an unknown error
@param[in] on_error_silent whether to suppress reports of non-fatal errors
@return true if we should retry the operation */
static MY_ATTRIBUTE((warn_unused_result))
bool
os_file_handle_error_cond_exit(
const char* name,
const char* operation,
bool should_abort,
bool on_error_silent);
/** Does error handling when a file operation fails.
@param[in] name File name or NULL
@param[in] operation Name of operation e.g., "read", "write"
@param[in] name name of a file or NULL
@param[in] operation operation name that failed
@return true if we should retry the operation */
static
bool
os_file_handle_error(
const char* name,
const char* operation);
const char* operation)
{
/* Exit in case of unknown error */
return(os_file_handle_error_cond_exit(name, operation, true, false));
}
/**
Does error handling when a file operation fails.
@param[in] name File name or NULL
@param[in] operation Name of operation e.g., "read", "write"
@param[in] silent if true then don't print any message to the log.
/** Does error handling when a file operation fails.
@param[in] name name of a file or NULL
@param[in] operation operation name that failed
@param[in] on_error_silent if true then don't print any message to the log.
@return true if we should retry the operation */
static
bool
os_file_handle_error_no_exit(
const char* name,
const char* operation,
bool silent);
bool on_error_silent)
{
/* Don't exit in case of unknown error */
return(os_file_handle_error_cond_exit(
name, operation, false, on_error_silent));
}
/** Does simulated AIO. This function should be called by an i/o-handler
thread.
......@@ -5077,52 +5099,31 @@ os_file_read_page(
ut_ad(type.validate());
ut_ad(n > 0);
for (;;) {
ssize_t n_bytes;
ssize_t n_bytes = os_file_pread(type, file, buf, n, offset, &err);
n_bytes = os_file_pread(type, file, buf, n, offset, &err);
if (o != NULL) {
if (o) {
*o = n_bytes;
}
if (err != DB_SUCCESS && !exit_on_err) {
return(err);
} else if ((ulint) n_bytes == n) {
return(DB_SUCCESS);
}
ib::error() << "Tried to read " << n
<< " bytes at offset " << offset
<< ", but was only able to read " << n_bytes;
if (exit_on_err) {
if (!os_file_handle_error(NULL, "read")) {
/* Hard error */
break;
}
} else if (!os_file_handle_error_no_exit(NULL, "read", false)) {
/* Hard error */
break;
if (ulint(n_bytes) == n || (err != DB_SUCCESS && !exit_on_err)) {
return err;
}
if (n_bytes > 0 && (ulint) n_bytes < n) {
n -= (ulint) n_bytes;
offset += (ulint) n_bytes;
buf = reinterpret_cast<uchar*>(buf) + (ulint) n_bytes;
}
}
ib::error() << "Tried to read " << n << " bytes at offset "
<< offset << ", but was only able to read " << n_bytes;
if (!os_file_handle_error_cond_exit(
NULL, "read", exit_on_err, false)) {
ib::fatal()
<< "Cannot read from file. OS error number "
<< errno << ".";
}
return(err);
if (err == DB_SUCCESS) {
err = DB_IO_ERROR;
}
return err;
}
/** Retrieves the last error number if an error occurs in a file io function.
......@@ -5228,37 +5229,6 @@ os_file_handle_error_cond_exit(
return(false);
}
/** Does error handling when a file operation fails.
Thin wrapper over os_file_handle_error_cond_exit() with
should_abort=true and on_error_silent=false: unknown errors abort.
@param[in]	name		name of a file or NULL
@param[in]	operation	operation name that failed
@return true if we should retry the operation */
static
bool
os_file_handle_error(
	const char*	name,
	const char*	operation)
{
	/* Exit in case of unknown error */
	return(os_file_handle_error_cond_exit(name, operation, true, false));
}
/** Does error handling when a file operation fails.
Thin wrapper over os_file_handle_error_cond_exit() with
should_abort=false: unknown errors do not abort the server.
@param[in]	name		name of a file or NULL
@param[in]	operation	operation name that failed
@param[in]	on_error_silent if true then don't print any message to the log.
@return true if we should retry the operation */
static
bool
os_file_handle_error_no_exit(
	const char*	name,
	const char*	operation,
	bool		on_error_silent)
{
	/* Don't exit in case of unknown error */
	return(os_file_handle_error_cond_exit(
			name, operation, false, on_error_silent));
}
#ifndef _WIN32
/** Tries to disable OS caching on an opened file descriptor.
@param[in] fd file descriptor to alter
......
......@@ -37,6 +37,7 @@ Created 2012-02-08 by Sunny Bains.
#include "row0mysql.h"
#include "srv0start.h"
#include "row0quiesce.h"
#include "fil0pagecompress.h"
#include "ut0new.h"
#include <vector>
......@@ -45,12 +46,10 @@ Created 2012-02-08 by Sunny Bains.
#include <my_aes.h>
#endif
/** The size of the buffer to use for IO. Note: os_file_read() doesn't expect
reads to fail. If you set the buffer size to be greater than a multiple of the
file size then it will assert. TODO: Fix this limitation of the IO functions.
@param n page size of the tablespace.
@retval number of pages */
#define IO_BUFFER_SIZE(m, n) ((m) / (n))
/** The size of the buffer to use for IO.
@param n physical page size
@return number of pages */
#define IO_BUFFER_SIZE(n) ((1024 * 1024) / n)
/** For gathering stats on records during phase I */
struct row_stats_t {
......@@ -346,12 +345,14 @@ class IndexPurge {
/** Functor that is called for each physical page that is read from the
tablespace file. */
class AbstractCallback : public PageCallback {
class AbstractCallback
{
public:
/** Constructor
@param trx covering transaction */
AbstractCallback(trx_t* trx)
:
m_page_size(0, 0, false),
m_trx(trx),
m_space(ULINT_UNDEFINED),
m_xdes(),
......@@ -384,31 +385,50 @@ class AbstractCallback : public PageCallback {
return(m_space_flags);
}
protected:
/** Get the data page depending on the table type, compressed or not.
@param block block read from disk
@retval the buffer frame */
buf_frame_t* get_frame(buf_block_t* block) const UNIV_NOTHROW
/**
Set the name of the physical file and the file handle that is used
to open it for the file that is being iterated over.
@param filename the physical name of the tablespace file
@param file OS file handle */
void set_file(const char* filename, pfs_os_file_t file) UNIV_NOTHROW
{
if (is_compressed_table()) {
return(block->page.zip.data);
m_file = file;
m_filepath = filename;
}
return(buf_block_get_frame(block));
}
const page_size_t& get_page_size() const { return m_page_size; }
/** Check for session interrupt. If required we could
even flush to disk here every N pages.
@retval DB_SUCCESS or error code */
dberr_t periodic_check() UNIV_NOTHROW
{
if (trx_is_interrupted(m_trx)) {
return(DB_INTERRUPTED);
}
const char* filename() const { return m_filepath; }
return(DB_SUCCESS);
/**
Called for every page in the tablespace. If the page was not
updated then its state must be set to BUF_PAGE_NOT_USED. For
compressed tables the page descriptor memory will be at offset:
block->frame + UNIV_PAGE_SIZE;
@param offset - physical offset within the file
@param block - block read from file, note it is not from the buffer pool
@retval DB_SUCCESS or error code. */
virtual dberr_t operator()(
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW = 0;
/**
@return the space id of the tablespace */
virtual ulint get_space_id() const UNIV_NOTHROW = 0;
bool is_interrupted() const { return trx_is_interrupted(m_trx); }
/**
Get the data page depending on the table type, compressed or not.
@param block - block read from disk
@retval the buffer frame */
static byte* get_frame(const buf_block_t* block)
{
return block->page.zip.data
? block->page.zip.data : block->frame;
}
protected:
/** Get the physical offset of the extent descriptor within the page.
@param page_no page number of the extent descriptor
@param page contents of the page containing the extent descriptor.
......@@ -488,6 +508,15 @@ class AbstractCallback : public PageCallback {
}
protected:
/** The tablespace page size. */
page_size_t m_page_size;
/** File handle to the tablespace */
pfs_os_file_t m_file;
/** Physical file path. */
const char* m_filepath;
/** Covering transaction. */
trx_t* m_trx;
......@@ -564,9 +593,7 @@ AbstractCallback::init(
m_free_limit = mach_read_from_4(page + FSP_FREE_LIMIT);
m_space = mach_read_from_4(page + FSP_HEADER_OFFSET + FSP_SPACE_ID);
dberr_t err = set_current_xdes(0, page);
return(err);
return set_current_xdes(0, page);
}
/**
......@@ -637,11 +664,7 @@ FetchIndexRootPages::operator() (
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW
{
dberr_t err;
if ((err = periodic_check()) != DB_SUCCESS) {
return(err);
}
if (is_interrupted()) return DB_INTERRUPTED;
const page_t* page = get_frame(block);
......@@ -654,9 +677,9 @@ FetchIndexRootPages::operator() (
<< ", file offset: "
<< (offset / m_page_size.physical());
err = DB_CORRUPTION;
return DB_CORRUPTION;
} else if (page_type == FIL_PAGE_TYPE_XDES) {
err = set_current_xdes(block->page.id.page_no(), page);
return set_current_xdes(block->page.id.page_no(), page);
} else if (fil_page_index_page_check(page)
&& !is_free(block->page.id.page_no())
&& page_is_root(page)) {
......@@ -680,7 +703,7 @@ FetchIndexRootPages::operator() (
}
}
return(err);
return DB_SUCCESS;
}
/**
......@@ -808,14 +831,6 @@ class PageConverter : public AbstractCallback {
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW;
private:
/** Status returned by PageConverter::validate() */
enum import_page_status_t {
IMPORT_PAGE_STATUS_OK, /*!< Page is OK */
IMPORT_PAGE_STATUS_ALL_ZERO, /*!< Page is all zeros */
IMPORT_PAGE_STATUS_CORRUPTED /*!< Page is corrupted */
};
/** Update the page, set the space id, max trx id and index id.
@param block block read from file
@param page_type type of the page
......@@ -824,17 +839,6 @@ class PageConverter : public AbstractCallback {
buf_block_t* block,
ulint& page_type) UNIV_NOTHROW;
#ifdef UNIV_DEBUG
/**
@return true error condition is enabled. */
bool trigger_corruption() UNIV_NOTHROW
{
return(false);
}
#else
#define trigger_corruption() (false)
#endif /* UNIV_DEBUG */
/** Update the space, index id, trx id.
@param block block to convert
@return DB_SUCCESS or error code */
......@@ -846,14 +850,6 @@ class PageConverter : public AbstractCallback {
@retval DB_SUCCESS or error code */
dberr_t update_records(buf_block_t* block) UNIV_NOTHROW;
/** Validate the page, check for corruption.
@param offset physical offset within file.
@param page page read from file.
@return 0 on success, 1 if all zero, 2 if corrupted */
import_page_status_t validate(
os_offset_t offset,
buf_block_t* page) UNIV_NOTHROW;
/** Validate the space flags and update tablespace header page.
@param block block read from file, not from the buffer pool.
@retval DB_SUCCESS or error code */
......@@ -1530,6 +1526,7 @@ IndexPurge::purge() UNIV_NOTHROW
/** Constructor
@param cfg config of table being imported.
@param trx transaction covering the import */
inline
PageConverter::PageConverter(
row_import* cfg,
trx_t* trx)
......@@ -1553,6 +1550,7 @@ PageConverter::PageConverter(
@param offsets column offsets for the record
@param i column ordinal value
@return DB_SUCCESS or error code */
inline
dberr_t
PageConverter::adjust_cluster_index_blob_column(
rec_t* rec,
......@@ -1581,13 +1579,11 @@ PageConverter::adjust_cluster_index_blob_column(
field += BTR_EXTERN_SPACE_ID - BTR_EXTERN_FIELD_REF_SIZE + len;
if (is_compressed_table()) {
mach_write_to_4(field, get_space_id());
if (m_page_zip_ptr) {
page_zip_write_blob_ptr(
m_page_zip_ptr, rec, m_cluster_index, offsets, i, 0);
} else {
mlog_write_ulint(field, get_space_id(), MLOG_4BYTES, 0);
}
return(DB_SUCCESS);
......@@ -1598,6 +1594,7 @@ stored columns.
@param rec record to update
@param offsets column offsets for the record
@return DB_SUCCESS or error code */
inline
dberr_t
PageConverter::adjust_cluster_index_blob_columns(
rec_t* rec,
......@@ -1630,6 +1627,7 @@ BLOB reference, write the new space id.
@param rec record to update
@param offsets column offsets for the record
@return DB_SUCCESS or error code */
inline
dberr_t
PageConverter::adjust_cluster_index_blob_ref(
rec_t* rec,
......@@ -1652,6 +1650,7 @@ PageConverter::adjust_cluster_index_blob_ref(
re-organising the B+tree.
@param offsets current row offsets.
@return true if purge succeeded */
inline
bool
PageConverter::purge(const ulint* offsets) UNIV_NOTHROW
{
......@@ -1674,6 +1673,7 @@ PageConverter::purge(const ulint* offsets) UNIV_NOTHROW
@param rec record to update
@param offsets column offsets for the record
@return DB_SUCCESS or error code. */
inline
dberr_t
PageConverter::adjust_cluster_record(
const dict_index_t* index,
......@@ -1700,6 +1700,7 @@ PageConverter::adjust_cluster_record(
rows that can't be purged optimistically.
@param block block to update
@retval DB_SUCCESS or error code */
inline
dberr_t
PageConverter::update_records(
buf_block_t* block) UNIV_NOTHROW
......@@ -1763,6 +1764,7 @@ PageConverter::update_records(
/** Update the space, index id, trx id.
@return DB_SUCCESS or error code */
inline
dberr_t
PageConverter::update_index_page(
buf_block_t* block) UNIV_NOTHROW
......@@ -1843,6 +1845,7 @@ PageConverter::update_index_page(
/** Validate the space flags and update tablespace header page.
@param block block read from file, not from the buffer pool.
@retval DB_SUCCESS or error code */
inline
dberr_t
PageConverter::update_header(
buf_block_t* block) UNIV_NOTHROW
......@@ -1879,6 +1882,7 @@ PageConverter::update_header(
/** Update the page, set the space id, max trx id and index id.
@param block block read from file
@retval DB_SUCCESS or error code */
inline
dberr_t
PageConverter::update_page(
buf_block_t* block,
......@@ -1886,6 +1890,14 @@ PageConverter::update_page(
{
dberr_t err = DB_SUCCESS;
ut_ad(!block->page.zip.data == !is_compressed_table());
if (block->page.zip.data) {
m_page_zip_ptr = &block->page.zip;
} else {
ut_ad(!m_page_zip_ptr);
}
switch (page_type = fil_page_get_type(get_frame(block))) {
case FIL_PAGE_TYPE_FSP_HDR:
ut_a(block->page.id.page_no() == 0);
......@@ -1940,117 +1952,41 @@ PageConverter::update_page(
return(DB_CORRUPTION);
}
/** Validate the page
@param offset physical offset within file.
@param page page read from file.
@return status */
PageConverter::import_page_status_t
PageConverter::validate(
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW
{
buf_frame_t* page = get_frame(block);
/* Check that the page number corresponds to the offset in
the file. Flag as corrupt if it doesn't. Disable the check
for LSN in buf_page_is_corrupted() */
if (buf_page_is_corrupted(
false, page, get_page_size(), NULL)
|| (page_get_page_no(page) != offset / m_page_size.physical()
&& page_get_page_no(page) != 0)) {
return(IMPORT_PAGE_STATUS_CORRUPTED);
} else if (offset > 0 && page_get_page_no(page) == 0) {
/* The page is all zero: do nothing. We already checked
for all NULs in buf_page_is_corrupted() */
return(IMPORT_PAGE_STATUS_ALL_ZERO);
}
return(IMPORT_PAGE_STATUS_OK);
}
/** Called for every page in the tablespace. If the page was not
updated then its state must be set to BUF_PAGE_NOT_USED.
@param offset physical offset within the file
@param block block read from file, note it is not from the buffer pool
@retval DB_SUCCESS or error code. */
dberr_t
PageConverter::operator() (
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW
PageConverter::operator() (os_offset_t, buf_block_t* block) UNIV_NOTHROW
{
ulint page_type;
dberr_t err = DB_SUCCESS;
if ((err = periodic_check()) != DB_SUCCESS) {
return(err);
}
if (is_compressed_table()) {
m_page_zip_ptr = &block->page.zip;
} else {
ut_ad(m_page_zip_ptr == 0);
}
switch (validate(offset, block)) {
case IMPORT_PAGE_STATUS_OK:
/* We have to decompress the compressed pages before
we can work on them */
if ((err = update_page(block, page_type)) != DB_SUCCESS) {
break;
}
/* If we already had an old page with matching number
in the buffer pool, evict it now, because
we no longer evict the pages on DISCARD TABLESPACE. */
buf_page_get_gen(block->page.id, get_page_size(),
RW_NO_LATCH, NULL, BUF_EVICT_IF_IN_POOL,
__FILE__, __LINE__, NULL, NULL);
/* Note: For compressed pages this function will write to the
zip descriptor and for uncompressed pages it will write to
page (ie. the block->frame). Therefore the caller should write
out the descriptor contents and not block->frame for compressed
pages. */
ulint page_type;
if (!is_compressed_table()
|| fil_page_type_is_index(page_type)) {
dberr_t err = update_page(block, page_type);
if (err != DB_SUCCESS) return err;
if (!block->page.zip.data) {
buf_flush_init_for_writing(
NULL, block->frame, NULL, m_current_lsn);
} else if (fil_page_type_is_index(page_type)) {
buf_flush_init_for_writing(
!is_compressed_table() ? block : NULL,
!is_compressed_table()
? block->frame : block->page.zip.data,
!is_compressed_table() ? 0 : m_page_zip_ptr,
NULL, block->page.zip.data, &block->page.zip,
m_current_lsn);
} else {
/* Calculate and update the checksum of non-btree
pages for compressed tables explicitly here. */
/* Calculate and update the checksum of non-index
pages for ROW_FORMAT=COMPRESSED tables. */
buf_flush_update_zip_checksum(
get_frame(block), get_page_size().physical(),
block->page.zip.data, get_page_size().physical(),
m_current_lsn);
}
break;
case IMPORT_PAGE_STATUS_ALL_ZERO:
/* The page is all zero: leave it as is. */
break;
case IMPORT_PAGE_STATUS_CORRUPTED:
ib::warn() << "Page " << (offset / m_page_size.physical())
<< " at offset " << offset
<< " looks corrupted in file " << m_filepath;
err = DB_CORRUPTION;
}
/* If we already had and old page with matching number
in the buffer pool, evict it now, because
we no longer evict the pages on DISCARD TABLESPACE. */
buf_page_get_gen(block->page.id, get_page_size(),
RW_NO_LATCH, NULL, BUF_EVICT_IF_IN_POOL,
__FILE__, __LINE__, NULL, NULL);
return(err);
return DB_SUCCESS;
}
/*****************************************************************//**
......@@ -3328,6 +3264,441 @@ row_import_update_discarded_flag(
return(err);
}
struct fil_iterator_t {
pfs_os_file_t file; /*!< File handle */
const char* filepath; /*!< File path name */
os_offset_t start; /*!< From where to start */
os_offset_t end; /*!< Where to stop */
os_offset_t file_size; /*!< File size in bytes */
ulint n_io_buffers; /*!< Number of pages to use
for IO */
byte* io_buffer; /*!< Buffer to use for IO */
fil_space_crypt_t *crypt_data; /*!< Crypt data (if encrypted) */
byte* crypt_io_buffer; /*!< IO buffer when encrypted */
};
/********************************************************************//**
TODO: This can be made parallel trivially by chunking up the file and creating
a callback per thread. . Main benefit will be to use multiple CPUs for
checksums and compressed tables. We have to do compressed tables block by
block right now. Secondly we need to decompress/compress and copy too much
of data. These are CPU intensive.
Iterate over all the pages in the tablespace.
@param iter - Tablespace iterator
@param block - block to use for IO
@param callback - Callback to inspect and update page contents
@retval DB_SUCCESS or error code */
static
dberr_t
fil_iterate(
/*========*/
const fil_iterator_t& iter,
buf_block_t* block,
AbstractCallback& callback)
{
os_offset_t offset;
const ulint size = callback.get_page_size().physical();
ulint n_bytes = iter.n_io_buffers * size;
ut_ad(!srv_read_only_mode);
/* TODO: For ROW_FORMAT=COMPRESSED tables we do a lot of useless
copying for non-index pages. Unfortunately, it is
required by buf_zip_decompress() */
for (offset = iter.start; offset < iter.end; offset += n_bytes) {
if (callback.is_interrupted()) {
return DB_INTERRUPTED;
}
byte* io_buffer = iter.io_buffer;
block->frame = io_buffer;
if (block->page.zip.data) {
/* Zip IO is done in the compressed page buffer. */
io_buffer = block->page.zip.data;
}
/* We have to read the exact number of bytes. Otherwise the
InnoDB IO functions croak on failed reads. */
n_bytes = ulint(ut_min(os_offset_t(n_bytes),
iter.end - offset));
ut_ad(n_bytes > 0);
ut_ad(!(n_bytes % size));
const bool encrypted = iter.crypt_data != NULL
&& iter.crypt_data->should_encrypt();
/* Use additional crypt io buffer if tablespace is encrypted */
byte* const readptr = encrypted
? iter.crypt_io_buffer : io_buffer;
byte* const writeptr = readptr;
IORequest read_request(IORequest::READ);
read_request.disable_partial_io_warnings();
dberr_t err = os_file_read_no_error_handling(
read_request, iter.file, readptr, offset, n_bytes, 0);
if (err != DB_SUCCESS) {
ib::error() << iter.filepath
<< ": os_file_read() failed";
}
bool updated = false;
os_offset_t page_off = offset;
ulint n_pages_read = n_bytes / size;
bool decrypted = false;
block->page.id.set_page_no(ulint(page_off / size));
for (ulint i = 0; i < n_pages_read;
block->page.id.set_page_no(block->page.id.page_no() + 1),
++i, page_off += size, block->frame += size) {
err = DB_SUCCESS;
byte* src = readptr + i * size;
byte* dst = io_buffer + i * size;
bool frame_changed = false;
ulint page_type = mach_read_from_2(src+FIL_PAGE_TYPE);
const bool page_compressed
= page_type
== FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED
|| page_type == FIL_PAGE_PAGE_COMPRESSED;
const ulint page_no = page_get_page_no(src);
if (!page_no && page_off) {
const ulint* b = reinterpret_cast<const ulint*>
(src);
const ulint* const e = b + size / sizeof *b;
do {
if (*b++) {
goto page_corrupted;
}
} while (b != e);
/* Proceed to the next page,
because this one is all zero. */
continue;
}
if (page_no != page_off / size) {
goto page_corrupted;
}
if (encrypted) {
decrypted = fil_space_decrypt(
iter.crypt_data, dst,
callback.get_page_size(), src, &err);
if (err != DB_SUCCESS) {
return err;
}
if (decrypted) {
updated = true;
} else {
if (!page_compressed
&& !block->page.zip.data) {
block->frame = src;
frame_changed = true;
} else {
memcpy(dst, src, size);
}
}
}
/* If the original page is page_compressed, we need
to decompress it before adjusting further. */
if (page_compressed) {
fil_decompress_page(NULL, dst, ulong(size),
NULL);
updated = true;
} else if (buf_page_is_corrupted(
false,
encrypted && !frame_changed
? dst : src,
callback.get_page_size(), NULL)) {
page_corrupted:
ib::warn() << callback.filename()
<< ": Page " << (offset / size)
<< " at offset " << offset
<< " looks corrupted.";
return DB_CORRUPTION;
}
if ((err = callback(page_off, block)) != DB_SUCCESS) {
return err;
} else if (!updated) {
updated = buf_block_get_state(block)
== BUF_BLOCK_FILE_PAGE;
}
/* If tablespace is encrypted we use additional
temporary scratch area where pages are read
for decrypting readptr == crypt_io_buffer != io_buffer.
Destination for decryption is a buffer pool block
block->frame == dst == io_buffer that is updated.
Pages that did not require decryption even when
tablespace is marked as encrypted are not copied
instead block->frame is set to src == readptr.
For encryption we again use temporary scratch area
writeptr != io_buffer == dst
that is then written to the tablespace
(1) For normal tables io_buffer == dst == writeptr
(2) For only page compressed tables
io_buffer == dst == writeptr
(3) For encrypted (and page compressed)
readptr != io_buffer == dst != writeptr
*/
ut_ad(!encrypted && !page_compressed ?
src == dst && dst == writeptr + (i * size):1);
ut_ad(page_compressed && !encrypted ?
src == dst && dst == writeptr + (i * size):1);
ut_ad(encrypted ?
src != dst && dst != writeptr + (i * size):1);
if (encrypted) {
memcpy(writeptr + (i * size),
callback.get_frame(block), size);
}
if (frame_changed) {
block->frame = dst;
}
src = io_buffer + (i * size);
if (page_compressed) {
ulint len = 0;
fil_compress_page(
NULL,
src,
NULL,
size,
0,/* FIXME: compression level */
512,/* FIXME: use proper block size */
encrypted,
&len);
ut_ad(len <= size);
memset(src + len, 0, size - len);
updated = true;
}
/* Encrypt the page if encryption was used. */
if (encrypted && decrypted) {
byte *dest = writeptr + i * size;
byte* tmp = fil_encrypt_buf(
iter.crypt_data,
block->page.id.space(),
block->page.id.page_no(),
mach_read_from_8(src + FIL_PAGE_LSN),
src, callback.get_page_size(), dest);
if (tmp == src) {
/* TODO: remove unnecessary memcpy's */
memcpy(dest, src, size);
}
updated = true;
}
}
/* A page was updated in the set, write back to disk. */
if (updated) {
IORequest write_request(IORequest::WRITE);
err = os_file_write(write_request,
iter.filepath, iter.file,
writeptr, offset, n_bytes);
if (err != DB_SUCCESS) {
return err;
}
}
}
return DB_SUCCESS;
}
/********************************************************************//**
Iterate over all the pages in the tablespace.
@param table - the table definiton in the server
@param n_io_buffers - number of blocks to read and write together
@param callback - functor that will do the page updates
@return DB_SUCCESS or error code */
static
dberr_t
fil_tablespace_iterate(
/*===================*/
dict_table_t* table,
ulint n_io_buffers,
AbstractCallback& callback)
{
dberr_t err;
pfs_os_file_t file;
char* filepath;
ut_a(n_io_buffers > 0);
ut_ad(!srv_read_only_mode);
DBUG_EXECUTE_IF("ib_import_trigger_corruption_1",
return(DB_CORRUPTION););
/* Make sure the data_dir_path is set. */
dict_get_and_save_data_dir_path(table, false);
if (DICT_TF_HAS_DATA_DIR(table->flags)) {
ut_a(table->data_dir_path);
filepath = fil_make_filepath(
table->data_dir_path, table->name.m_name, IBD, true);
} else {
filepath = fil_make_filepath(
NULL, table->name.m_name, IBD, false);
}
if (!filepath) {
return(DB_OUT_OF_MEMORY);
} else {
bool success;
file = os_file_create_simple_no_error_handling(
innodb_data_file_key, filepath,
OS_FILE_OPEN, OS_FILE_READ_WRITE, false, &success);
if (!success) {
/* The following call prints an error message */
os_file_get_last_error(true);
ib::error() << "Trying to import a tablespace,"
" but could not open the tablespace file "
<< filepath;
ut_free(filepath);
return DB_TABLESPACE_NOT_FOUND;
} else {
err = DB_SUCCESS;
}
}
callback.set_file(filepath, file);
os_offset_t file_size = os_file_get_size(file);
ut_a(file_size != (os_offset_t) -1);
/* Allocate a page to read in the tablespace header, so that we
can determine the page size and zip_size (if it is compressed).
We allocate an extra page in case it is a compressed table. One
page is to ensure alignement. */
void* page_ptr = ut_malloc_nokey(3 * UNIV_PAGE_SIZE);
byte* page = static_cast<byte*>(ut_align(page_ptr, UNIV_PAGE_SIZE));
buf_block_t* block = reinterpret_cast<buf_block_t*>
(ut_zalloc_nokey(sizeof *block));
block->frame = page;
block->page.id.copy_from(page_id_t(0, 0));
block->page.io_fix = BUF_IO_NONE;
block->page.buf_fix_count = 1;
block->page.state = BUF_BLOCK_FILE_PAGE;
/* Read the first page and determine the page and zip size. */
IORequest request(IORequest::READ);
request.disable_partial_io_warnings();
err = os_file_read_no_error_handling(request, file, page, 0,
UNIV_PAGE_SIZE, 0);
if (err == DB_SUCCESS) {
err = callback.init(file_size, block);
}
if (err == DB_SUCCESS) {
block->page.id.copy_from(
page_id_t(callback.get_space_id(), 0));
block->page.size.copy_from(callback.get_page_size());
if (block->page.size.is_compressed()) {
page_zip_set_size(&block->page.zip,
callback.get_page_size().physical());
/* ROW_FORMAT=COMPRESSED is not optimised for block IO
for now. We do the IMPORT page by page. */
n_io_buffers = 1;
}
fil_iterator_t iter;
/* read (optional) crypt data */
iter.crypt_data = fil_space_read_crypt_data(
callback.get_page_size(), page);
/* If tablespace is encrypted, it needs extra buffers */
if (iter.crypt_data && n_io_buffers > 1) {
/* decrease io buffers so that memory
consumption will not double */
n_io_buffers /= 2;
}
iter.file = file;
iter.start = 0;
iter.end = file_size;
iter.filepath = filepath;
iter.file_size = file_size;
iter.n_io_buffers = n_io_buffers;
/* Add an extra page for compressed page scratch area. */
void* io_buffer = ut_malloc_nokey(
(2 + iter.n_io_buffers) * UNIV_PAGE_SIZE);
iter.io_buffer = static_cast<byte*>(
ut_align(io_buffer, UNIV_PAGE_SIZE));
void* crypt_io_buffer = NULL;
if (iter.crypt_data) {
crypt_io_buffer = ut_malloc_nokey(
(2 + iter.n_io_buffers) * UNIV_PAGE_SIZE);
iter.crypt_io_buffer = static_cast<byte*>(
ut_align(crypt_io_buffer, UNIV_PAGE_SIZE));
}
if (block->page.zip.ssize) {
ut_ad(iter.n_io_buffers == 1);
block->frame = iter.io_buffer;
block->page.zip.data = block->frame + UNIV_PAGE_SIZE;
}
err = fil_iterate(iter, block, callback);
if (iter.crypt_data) {
fil_space_destroy_crypt_data(&iter.crypt_data);
}
ut_free(crypt_io_buffer);
ut_free(io_buffer);
}
if (err == DB_SUCCESS) {
ib::info() << "Sync to disk";
if (!os_file_flush(file)) {
ib::info() << "os_file_flush() failed!";
err = DB_IO_ERROR;
} else {
ib::info() << "Sync to disk - done!";
}
}
os_file_close(file);
ut_free(page_ptr);
ut_free(filepath);
ut_free(block);
return(err);
}
/*****************************************************************//**
Imports a tablespace. The space id in the .ibd file must match the space id
of the table in the data dictionary.
......@@ -3448,9 +3819,7 @@ row_import_for_mysql(
FetchIndexRootPages fetchIndexRootPages(table, trx);
err = fil_tablespace_iterate(
table, IO_BUFFER_SIZE(
cfg.m_page_size.physical(),
cfg.m_page_size.physical()),
table, IO_BUFFER_SIZE(cfg.m_page_size.physical()),
fetchIndexRootPages);
if (err == DB_SUCCESS) {
......@@ -3488,9 +3857,7 @@ row_import_for_mysql(
/* Set the IO buffer size in pages. */
err = fil_tablespace_iterate(
table, IO_BUFFER_SIZE(
cfg.m_page_size.physical(),
cfg.m_page_size.physical()), converter);
table, IO_BUFFER_SIZE(cfg.m_page_size.physical()), converter);
DBUG_EXECUTE_IF("ib_import_reset_space_and_lsn_failure",
err = DB_TOO_MANY_CONCURRENT_TRXS;);
......
......@@ -25,8 +25,6 @@ Created 10/25/1995 Heikki Tuuri
*******************************************************/
#include "fil0fil.h"
#include "fil0pagecompress.h"
#include "fsp0pagecompress.h"
#include "fil0crypt.h"
#include <debug_sync.h>
......@@ -49,12 +47,10 @@ Created 10/25/1995 Heikki Tuuri
#include "page0zip.h"
#include "trx0sys.h"
#include "row0mysql.h"
#include "os0file.h"
#ifndef UNIV_HOTBACKUP
# include "buf0lru.h"
# include "ibuf0ibuf.h"
# include "sync0sync.h"
# include "os0sync.h"
#else /* !UNIV_HOTBACKUP */
# include "srv0srv.h"
static ulint srv_data_read, srv_data_written;
......@@ -704,7 +700,7 @@ fil_node_open_file(
space->size += node->size;
}
ulint atomic_writes = fsp_flags_get_atomic_writes(space->flags);
ulint atomic_writes = FSP_FLAGS_GET_ATOMIC_WRITES(space->flags);
/* printf("Opening file %s\n", node->name); */
......@@ -4110,7 +4106,6 @@ fil_open_single_table_tablespace(
fsp_open_info remote;
ulint tablespaces_found = 0;
ulint valid_tablespaces_found = 0;
ulint atomic_writes = 0;
fil_space_crypt_t* crypt_data = NULL;
#ifdef UNIV_SYNC_DEBUG
......@@ -4124,8 +4119,8 @@ fil_open_single_table_tablespace(
return(DB_CORRUPTION);
}
ut_ad(fsp_flags_is_valid(flags & ~FSP_FLAGS_MEM_MASK));
atomic_writes = fsp_flags_get_atomic_writes(flags);
ut_ad(fsp_flags_is_valid(flags & ~FSP_FLAGS_MEM_MASK, id));
const ulint atomic_writes = FSP_FLAGS_GET_ATOMIC_WRITES(flags);
memset(&def, 0, sizeof(def));
memset(&dict, 0, sizeof(dict));
......@@ -6151,7 +6146,8 @@ fil_io(
} else if (type == OS_FILE_WRITE) {
ut_ad(!srv_read_only_mode);
srv_stats.data_written.add(len);
if (fil_page_is_index_page((byte *)buf)) {
if (mach_read_from_2(static_cast<const byte*>(buf)
+ FIL_PAGE_TYPE) == FIL_PAGE_INDEX) {
srv_stats.index_pages_written.inc();
} else {
srv_stats.non_index_pages_written.inc();
......@@ -6683,479 +6679,6 @@ fil_close(void)
fil_system = NULL;
}
/********************************************************************//**
Initializes a buffer control block when the buf_pool is created. */
static
void
fil_buf_block_init(
/*===============*/
buf_block_t* block, /*!< in: pointer to control block */
byte* frame) /*!< in: pointer to buffer frame */
{
UNIV_MEM_DESC(frame, UNIV_PAGE_SIZE);
block->frame = frame;
block->page.io_fix = BUF_IO_NONE;
/* There are assertions that check for this. */
block->page.buf_fix_count = 1;
block->page.state = BUF_BLOCK_READY_FOR_USE;
page_zip_des_init(&block->page.zip);
}
struct fil_iterator_t {
pfs_os_file_t file; /*!< File handle */
const char* filepath; /*!< File path name */
os_offset_t start; /*!< From where to start */
os_offset_t end; /*!< Where to stop */
os_offset_t file_size; /*!< File size in bytes */
ulint page_size; /*!< Page size */
ulint n_io_buffers; /*!< Number of pages to use
for IO */
byte* io_buffer; /*!< Buffer to use for IO */
fil_space_crypt_t *crypt_data; /*!< Crypt data (if encrypted) */
byte* crypt_io_buffer; /*!< IO buffer when encrypted */
};
/********************************************************************//**
TODO: This can be made parallel trivially by chunking up the file and creating
a callback per thread. . Main benefit will be to use multiple CPUs for
checksums and compressed tables. We have to do compressed tables block by
block right now. Secondly we need to decompress/compress and copy too much
of data. These are CPU intensive.
Iterate over all the pages in the tablespace.
@param iter - Tablespace iterator
@param block - block to use for IO
@param callback - Callback to inspect and update page contents
@retval DB_SUCCESS or error code */
static
dberr_t
fil_iterate(
/*========*/
	const fil_iterator_t&	iter,	/*!< in: tablespace iterator */
	buf_block_t*		block,	/*!< in/out: scratch block reused
					for every physical page */
	PageCallback&		callback)/*!< in/out: functor that inspects
					and possibly updates each page */
{
	os_offset_t		offset;
	ulint			page_no = 0;
	ulint			space_id = callback.get_space_id();
	/* Size of one batch: read/write n_io_buffers pages at a time. */
	ulint			n_bytes	= iter.n_io_buffers * iter.page_size;

	ut_ad(!srv_read_only_mode);

	/* TODO: For compressed tables we do a lot of useless
	copying for non-index pages. Unfortunately, it is
	required by buf_zip_decompress() */
	const bool	row_compressed = callback.get_zip_size() > 0;

	for (offset = iter.start; offset < iter.end; offset += n_bytes) {

		byte*		io_buffer = iter.io_buffer;

		block->frame = io_buffer;

		if (row_compressed) {
			/* Re-initialize the zip descriptor for every batch;
			the compressed frame lives one UNIV_PAGE_SIZE past
			block->frame in the scratch buffer. */
			page_zip_des_init(&block->page.zip);
			page_zip_set_size(&block->page.zip, iter.page_size);
			block->page.zip.data = block->frame + UNIV_PAGE_SIZE;
			ut_d(block->page.zip.m_external = true);
			ut_ad(iter.page_size == callback.get_zip_size());

			/* Zip IO is done in the compressed page buffer. */
			io_buffer = block->page.zip.data;
		}

		/* We have to read the exact number of bytes. Otherwise the
		InnoDB IO functions croak on failed reads. Clamp the last
		batch to the remaining file size. */

		n_bytes = static_cast<ulint>(
			ut_min(static_cast<os_offset_t>(n_bytes),
			       iter.end - offset));

		ut_ad(n_bytes > 0);
		ut_ad(!(n_bytes % iter.page_size));

		const bool encrypted = iter.crypt_data != NULL
			&& iter.crypt_data->should_encrypt();
		/* Use additional crypt io buffer if tablespace is encrypted */
		byte* const readptr = encrypted
			? iter.crypt_io_buffer : io_buffer;
		/* Writes go back from the same buffer the raw data was
		read into (pages are re-encrypted in place below). */
		byte* const writeptr = readptr;

		if (!os_file_read(iter.file, readptr, offset, (ulint) n_bytes)) {
			ib_logf(IB_LOG_LEVEL_ERROR, "os_file_read() failed");
			return(DB_IO_ERROR);
		}

		bool		updated = false;
		os_offset_t	page_off = offset;
		ulint		n_pages_read = (ulint) n_bytes / iter.page_size;
		bool		decrypted = false;

		for (ulint i = 0; i < n_pages_read; ++i) {
			ulint	size = iter.page_size;
			dberr_t	err = DB_SUCCESS;
			byte*	src = readptr + (i * size);
			byte*	dst = io_buffer + (i * size);
			/* true if block->frame was pointed at src (page
			needed neither decryption nor decompression) */
			bool	frame_changed = false;

			ulint page_type = mach_read_from_2(src+FIL_PAGE_TYPE);

			const bool page_compressed
				= page_type == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED
				|| page_type == FIL_PAGE_PAGE_COMPRESSED;

			/* If tablespace is encrypted, we need to decrypt
			the page. Note that tablespaces are not in
			fil_system during import. */
			if (encrypted) {
				decrypted = fil_space_decrypt(
						iter.crypt_data,
						dst, /* out: decrypted page */
						iter.page_size,
						src, /* in: raw page */
						&err); /* out: error code */

				if (err != DB_SUCCESS) {
					return(err);
				}

				if (decrypted) {
					updated = true;
				} else {
					/* Page was not encrypted on disk
					(e.g. not yet rotated). */
					if (!page_compressed && !row_compressed) {
						block->frame = src;
						frame_changed = true;
					} else {
						memcpy(dst, src, size);
					}
				}
			}

			/* If the original page is page_compressed, we need
			to decompress page before we can update it. */
			if (page_compressed) {
				fil_decompress_page(NULL, dst, ulong(size),
						    NULL);
				updated = true;
			}

			buf_block_set_file_page(block, space_id, page_no++);

			if ((err = callback(page_off, block)) != DB_SUCCESS) {

				return(err);

			} else if (!updated) {
				/* The callback marks a modified page by
				leaving the block in BUF_BLOCK_FILE_PAGE
				state. */
				updated = buf_block_get_state(block)
					== BUF_BLOCK_FILE_PAGE;
			}

			buf_block_set_state(block, BUF_BLOCK_NOT_USED);
			buf_block_set_state(block, BUF_BLOCK_READY_FOR_USE);

			/* If tablespace is encrypted we use additional
			temporary scratch area where pages are read
			for decrypting readptr == crypt_io_buffer != io_buffer.

			Destination for decryption is a buffer pool block
			block->frame == dst == io_buffer that is updated.
			Pages that did not require decryption even when
			tablespace is marked as encrypted are not copied
			instead block->frame is set to src == readptr.

			For encryption we again use temporary scratch area
			writeptr != io_buffer == dst
			that is then written to the tablespace

			(1) For normal tables io_buffer == dst == writeptr
			(2) For only page compressed tables
			io_buffer == dst == writeptr
			(3) For encrypted (and page compressed)
			readptr != io_buffer == dst != writeptr
			*/

			/* The ternaries below encode implications:
			"condition ? consequence : 1" asserts the
			consequence only when the condition holds. */
			ut_ad(!encrypted && !page_compressed ?
			      src == dst && dst == writeptr + (i * size):1);
			ut_ad(page_compressed && !encrypted ?
			      src == dst && dst == writeptr + (i * size):1);
			ut_ad(encrypted ?
			      src != dst && dst != writeptr + (i * size):1);

			if (encrypted) {
				/* Copy the (possibly updated) plaintext page
				back to the scratch area before re-encrypting */
				memcpy(writeptr + (i * size),
					row_compressed ? block->page.zip.data :
					block->frame, size);
			}

			if (frame_changed) {
				block->frame = dst;
			}

			src = io_buffer + (i * size);

			if (page_compressed) {
				ulint len = 0;

				fil_compress_page(
					NULL,
					src,
					NULL,
					size,
					0,/* FIXME: compression level */
					512,/* FIXME: use proper block size */
					encrypted,
					&len);

				updated = true;
			}

			/* If tablespace is encrypted, encrypt page before we
			write it back. Note that we should not encrypt the
			buffer that is in buffer pool. */
			/* NOTE: At this stage of IMPORT the
			buffer pool is not being used at all! */
			if (decrypted && encrypted) {
				byte *dest = writeptr + (i * size);
				ulint space = mach_read_from_4(
					src + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID);
				ulint offset = mach_read_from_4(src + FIL_PAGE_OFFSET);
				ib_uint64_t lsn = mach_read_from_8(src + FIL_PAGE_LSN);

				/* zip_size argument is 0 for uncompressed
				pages, else the physical page size. */
				byte* tmp = fil_encrypt_buf(
					iter.crypt_data,
					space,
					offset,
					lsn,
					src,
					iter.page_size == UNIV_PAGE_SIZE ? 0 : iter.page_size,
					dest);

				if (tmp == src) {
					/* TODO: remove unnecessary memcpy's */
					memcpy(dest, src, size);
				}

				updated = true;
			}

			page_off += iter.page_size;
			block->frame += iter.page_size;
		}

		/* A page was updated in the set, write back to disk. */
		if (updated
		    && !os_file_write(
				iter.filepath, iter.file, writeptr,
				offset, (ulint) n_bytes)) {

			ib_logf(IB_LOG_LEVEL_ERROR, "os_file_write() failed");

			return(DB_IO_ERROR);
		}
	}

	return(DB_SUCCESS);
}
/********************************************************************//**
Iterate over all the pages in the tablespace.
@param table - the table definition in the server
@param n_io_buffers - number of blocks to read and write together
@param callback - functor that will do the page updates
@return DB_SUCCESS or error code */
UNIV_INTERN
dberr_t
fil_tablespace_iterate(
/*===================*/
	dict_table_t*	table,
	ulint		n_io_buffers,
	PageCallback&	callback)
{
	dberr_t		err;
	pfs_os_file_t	file;
	char*		filepath;

	ut_a(n_io_buffers > 0);
	ut_ad(!srv_read_only_mode);

	DBUG_EXECUTE_IF("ib_import_trigger_corruption_1",
			return(DB_CORRUPTION););

	/* Build the .ibd file path, honouring a DATA DIRECTORY
	clause if the table has one. The string is owned by this
	function and freed before returning. */
	if (DICT_TF_HAS_DATA_DIR(table->flags)) {
		dict_get_and_save_data_dir_path(table, false);
		ut_a(table->data_dir_path);

		filepath = os_file_make_remote_pathname(
			table->data_dir_path, table->name, "ibd");
	} else {
		filepath = fil_make_ibd_name(table->name, false);
	}

	{
		ibool	success;

		file = os_file_create_simple_no_error_handling(
			innodb_file_data_key, filepath,
			OS_FILE_OPEN, OS_FILE_READ_WRITE, &success, FALSE);

		DBUG_EXECUTE_IF("fil_tablespace_iterate_failure",
		{
			static bool once;

			if (!once || ut_rnd_interval(0, 10) == 5) {
				once = true;
				success = FALSE;
				os_file_close(file);
			}
		});

		if (!success) {
			/* The following call prints an error message */
			os_file_get_last_error(true);

			ib_logf(IB_LOG_LEVEL_ERROR,
				"Trying to import a tablespace, but could not "
				"open the tablespace file %s", filepath);

			mem_free(filepath);

			return(DB_TABLESPACE_NOT_FOUND);

		} else {
			err = DB_SUCCESS;
		}
	}

	callback.set_file(filepath, file);

	os_offset_t	file_size = os_file_get_size(file);
	ut_a(file_size != (os_offset_t) -1);

	/* The block we will use for every physical page */
	buf_block_t	block;

	memset(&block, 0x0, sizeof(block));

	/* Allocate a page to read in the tablespace header, so that we
	can determine the page size and zip_size (if it is compressed).
	We allocate an extra page in case it is a compressed table. One
	page is to ensure alignment. */

	void*	page_ptr = mem_alloc(3 * UNIV_PAGE_SIZE);
	byte*	page = static_cast<byte*>(ut_align(page_ptr, UNIV_PAGE_SIZE));

	fil_buf_block_init(&block, page);

	/* Read the first page and determine the page and zip size. */

	if (!os_file_read(file, page, 0, UNIV_PAGE_SIZE)) {

		err = DB_IO_ERROR;

	} else if ((err = callback.init(file_size, &block)) == DB_SUCCESS) {
		fil_iterator_t	iter;

		iter.file = file;
		iter.start = 0;
		iter.end = file_size;
		iter.filepath = filepath;
		iter.file_size = file_size;
		iter.n_io_buffers = n_io_buffers;
		iter.page_size = callback.get_page_size();

		/* In MariaDB/MySQL 5.6 tablespace does not exist
		during import, therefore we can't use space directly
		here. */
		ulint crypt_data_offset = fsp_header_get_crypt_offset(
			callback.get_zip_size());

		/* read (optional) crypt data; NULL if the file carries
		no encryption metadata */
		iter.crypt_data = fil_space_read_crypt_data(
			0, page, crypt_data_offset);

		/* Compressed pages can't be optimised for block IO for now.
		We do the IMPORT page by page. */

		if (callback.get_zip_size() > 0) {
			iter.n_io_buffers = 1;
			ut_a(iter.page_size == callback.get_zip_size());
		}

		/** If tablespace is encrypted, it needs extra buffers */
		if (iter.crypt_data != NULL) {
			/* decrease io buffers so that memory
			* consumption doesn't double
			* note: the +1 is to avoid n_io_buffers getting down to 0 */
			iter.n_io_buffers = (iter.n_io_buffers + 1) / 2;
		}

		/** Add an extra page for compressed page scratch area. */

		void*	io_buffer = mem_alloc(
			(2 + iter.n_io_buffers) * UNIV_PAGE_SIZE);

		iter.io_buffer = static_cast<byte*>(
			ut_align(io_buffer, UNIV_PAGE_SIZE));

		void* crypt_io_buffer = NULL;
		if (iter.crypt_data != NULL) {
			crypt_io_buffer = mem_alloc(
				(2 + iter.n_io_buffers) * UNIV_PAGE_SIZE);
			iter.crypt_io_buffer = static_cast<byte*>(
				ut_align(crypt_io_buffer, UNIV_PAGE_SIZE));
		}

		err = fil_iterate(iter, &block, callback);

		mem_free(io_buffer);

		/* crypt_io_buffer is non-NULL exactly when crypt_data
		was read above, so this also releases crypt_data. */
		if (crypt_io_buffer != NULL) {
			mem_free(crypt_io_buffer);
			iter.crypt_io_buffer = NULL;
			fil_space_destroy_crypt_data(&iter.crypt_data);
		}
	}

	if (err == DB_SUCCESS) {

		ib_logf(IB_LOG_LEVEL_INFO, "Sync to disk");

		if (!os_file_flush(file)) {
			ib_logf(IB_LOG_LEVEL_INFO, "os_file_flush() failed!");
			err = DB_IO_ERROR;
		} else {
			ib_logf(IB_LOG_LEVEL_INFO, "Sync to disk - done!");
		}
	}

	os_file_close(file);

	mem_free(page_ptr);
	mem_free(filepath);

	return(err);
}
/**
Read and validate the compressed page size from the tablespace header.
@param page	contents of the first page of the tablespace file
@return DB_SUCCESS if the stored zip size is valid, DB_CORRUPTION if not */
dberr_t
PageCallback::set_zip_size(const buf_frame_t* page) UNIV_NOTHROW
{
	m_zip_size = fsp_header_get_zip_size(page);

	/* A valid value is zero (uncompressed) or a power of two
	no larger than UNIV_ZIP_SIZE_MAX. */
	const bool	valid = ut_is_2pow(m_zip_size)
		&& m_zip_size <= UNIV_ZIP_SIZE_MAX;

	return(valid ? DB_SUCCESS : DB_CORRUPTION);
}
/********************************************************************//**
Delete the tablespace file and any related files like .cfg.
This should not be called for temporary tables. */
......
......@@ -1309,107 +1309,6 @@ fil_delete_file(
/*============*/
const char* path); /*!< in: filepath of the ibd tablespace */
/** Callback functor invoked for every page during a tablespace scan
(see fil_tablespace_iterate()). Concrete subclasses implement init(),
operator()() and get_space_id(). */
struct PageCallback {

	/**
	Default constructor: zero-initializes the page sizes and path. */
	PageCallback()
		:
		m_zip_size(),
		m_page_size(),
		m_filepath() UNIV_NOTHROW {}

	virtual ~PageCallback() UNIV_NOTHROW {}

	/**
	Called for page 0 in the tablespace file at the start.
	@param file_size - size of the file in bytes
	@param block - contents of the first page in the tablespace file
	@retval DB_SUCCESS or error code.*/
	virtual dberr_t init(
		os_offset_t		file_size,
		const buf_block_t*	block) UNIV_NOTHROW = 0;

	/**
	Called for every page in the tablespace. If the page was not
	updated then its state must be set to BUF_PAGE_NOT_USED. For
	compressed tables the page descriptor memory will be at offset:
		block->frame + UNIV_PAGE_SIZE;
	@param offset - physical offset within the file
	@param block - block read from file, note it is not from the buffer pool
	@retval DB_SUCCESS or error code. */
	virtual dberr_t operator()(
		os_offset_t	offset,
		buf_block_t*	block) UNIV_NOTHROW = 0;

	/**
	Set the name of the physical file and the file handle that is used
	to open it for the file that is being iterated over.
	@param filename - the physical name of the tablespace file
	@param file - OS file handle */
	void set_file(const char* filename, pfs_os_file_t file) UNIV_NOTHROW
	{
		m_file = file;
		m_filepath = filename;
	}

	/**
	@return the space id of the tablespace */
	virtual ulint get_space_id() const UNIV_NOTHROW = 0;

	/** The compressed page size
	@return the compressed page size, 0 if not compressed */
	ulint get_zip_size() const
	{
		return(m_zip_size);
	}

	/**
	Set the tablespace compressed table size.
	@return DB_SUCCESS if it is valid or DB_CORRUPTION if not */
	dberr_t set_zip_size(const buf_frame_t* page) UNIV_NOTHROW;

	/** The physical page size used for file IO
	@return the page size */
	ulint get_page_size() const
	{
		return(m_page_size);
	}

	/** Compressed table page size, 0 for uncompressed tables */
	ulint		m_zip_size;

	/** The tablespace page size. */
	ulint		m_page_size;

	/** File handle to the tablespace */
	pfs_os_file_t	m_file;

	/** Physical file path; not owned by this object */
	const char*	m_filepath;

protected:
	// Disable copying
	PageCallback(const PageCallback&);
	PageCallback& operator=(const PageCallback&);
};
/********************************************************************//**
Iterate over all the pages in the tablespace.
@param table - the table definition in the server
@param n_io_buffers - number of blocks to read and write together
@param callback - functor that will do the page updates
@return DB_SUCCESS or error code */
UNIV_INTERN
dberr_t
fil_tablespace_iterate(
/*===================*/
	dict_table_t*	table,
	ulint		n_io_buffers,
	PageCallback&	callback)
	MY_ATTRIBUTE((nonnull, warn_unused_result));
/*******************************************************************//**
Checks if a single-table tablespace for a given table name exists in the
tablespace memory cache.
......
......@@ -406,6 +406,11 @@ mem_heap_create_block_func(
heap->total_size += len;
}
/* Poison all available memory. Individual chunks will be unpoisoned on
every mem_heap_alloc() call. */
compile_time_assert(MEM_BLOCK_HEADER_SIZE >= sizeof *block);
UNIV_MEM_FREE(block + 1, len - sizeof *block);
ut_ad((ulint)MEM_BLOCK_HEADER_SIZE < len);
return(block);
......
......@@ -2,7 +2,7 @@
Copyright (c) 1995, 2017, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2009, Percona Inc.
Copyright (c) 2013, 2017, MariaDB Corporation.
Copyright (c) 2013, 2018, MariaDB Corporation.
Portions of this file contain modifications contributed and copyrighted
by Percona Inc.. Those modifications are
......@@ -3169,15 +3169,21 @@ os_file_read_func(
overlapped.hEvent = win_get_syncio_event();
ret = ReadFile(file, buf, n, NULL, &overlapped);
if (ret) {
ret = GetOverlappedResult(file, &overlapped, (DWORD *)&len, FALSE);
}
else if(GetLastError() == ERROR_IO_PENDING) {
ret = GetOverlappedResult(file, &overlapped, (DWORD *)&len, TRUE);
ret = GetOverlappedResult(file, &overlapped, &len, FALSE);
} else if (GetLastError() == ERROR_IO_PENDING) {
ret = GetOverlappedResult(file, &overlapped, &len, TRUE);
}
MONITOR_ATOMIC_DEC_LOW(MONITOR_OS_PENDING_READS, monitor);
if (ret && len == n) {
if (!ret) {
} else if (len == n) {
return(TRUE);
} else {
ib_logf(IB_LOG_LEVEL_ERROR,
"Tried to read " ULINTPF " bytes at offset "
UINT64PF ". Was only able to read %lu.",
n, offset, ret);
return FALSE;
}
#else /* __WIN__ */
ibool retry;
......@@ -3204,6 +3210,7 @@ os_file_read_func(
"Tried to read " ULINTPF " bytes at offset "
UINT64PF ". Was only able to read %ld.",
n, offset, (lint) ret);
return FALSE;
}
#endif /* __WIN__ */
retry = os_file_handle_error(NULL, "read", __FILE__, __LINE__);
......@@ -3272,15 +3279,21 @@ os_file_read_no_error_handling_func(
overlapped.hEvent = win_get_syncio_event();
ret = ReadFile(file, buf, n, NULL, &overlapped);
if (ret) {
ret = GetOverlappedResult(file, &overlapped, (DWORD *)&len, FALSE);
}
else if(GetLastError() == ERROR_IO_PENDING) {
ret = GetOverlappedResult(file, &overlapped, (DWORD *)&len, TRUE);
ret = GetOverlappedResult(file, &overlapped, &len, FALSE);
} else if (GetLastError() == ERROR_IO_PENDING) {
ret = GetOverlappedResult(file, &overlapped, &len, TRUE);
}
MONITOR_ATOMIC_DEC_LOW(MONITOR_OS_PENDING_READS, monitor);
if (ret && len == n) {
if (!ret) {
} else if (len == n) {
return(TRUE);
} else {
ib_logf(IB_LOG_LEVEL_ERROR,
"Tried to read " ULINTPF " bytes at offset "
UINT64PF ". Was only able to read %lu.",
n, offset, len);
return FALSE;
}
#else /* __WIN__ */
ibool retry;
......@@ -3303,6 +3316,7 @@ os_file_read_no_error_handling_func(
"Tried to read " ULINTPF " bytes at offset "
UINT64PF ". Was only able to read %ld.",
n, offset, (lint) ret);
return FALSE;
}
#endif /* __WIN__ */
retry = os_file_handle_error_no_exit(NULL, "read", FALSE, __FILE__, __LINE__);
......@@ -3383,10 +3397,9 @@ os_file_write_func(
overlapped.hEvent = win_get_syncio_event();
ret = WriteFile(file, buf, n, NULL, &overlapped);
if (ret) {
ret = GetOverlappedResult(file, &overlapped, (DWORD *)&len, FALSE);
}
else if ( GetLastError() == ERROR_IO_PENDING) {
ret = GetOverlappedResult(file, &overlapped, (DWORD *)&len, TRUE);
ret = GetOverlappedResult(file, &overlapped, &len, FALSE);
} else if (GetLastError() == ERROR_IO_PENDING) {
ret = GetOverlappedResult(file, &overlapped, &len, TRUE);
}
MONITOR_ATOMIC_DEC_LOW(MONITOR_OS_PENDING_WRITES, monitor);
......@@ -6588,8 +6601,7 @@ os_file_trim(
DWORD tmp;
if (ret) {
ret = GetOverlappedResult(slot->file, &overlapped, &tmp, FALSE);
}
else if (GetLastError() == ERROR_IO_PENDING) {
} else if (GetLastError() == ERROR_IO_PENDING) {
ret = GetOverlappedResult(slot->file, &overlapped, &tmp, TRUE);
}
if (!ret) {
......
......@@ -40,13 +40,11 @@ Created 2012-02-08 by Sunny Bains.
#include "row0mysql.h"
#include "srv0start.h"
#include "row0quiesce.h"
#include "buf0buf.h"
#include "fil0pagecompress.h"
#include <vector>
/** The size of the buffer to use for IO. Note: os_file_read() doesn't expect
reads to fail. If you set the buffer size to be greater than a multiple of the
file size then it will assert. TODO: Fix this limitation of the IO functions.
/** The size of the buffer to use for IO.
@param n - page size of the tablespace.
@retval number of pages */
#define IO_BUFFER_SIZE(n) ((1024 * 1024) / n)
......@@ -362,7 +360,8 @@ class IndexPurge {
/** Functor that is called for each physical page that is read from the
tablespace file. */
class AbstractCallback : public PageCallback {
class AbstractCallback
{
public:
/** Constructor
@param trx - covering transaction */
......@@ -395,32 +394,62 @@ class AbstractCallback : public PageCallback {
return(get_zip_size() > 0);
}
protected:
/**
Get the data page depending on the table type, compressed or not.
@param block - block read from disk
@retval the buffer frame */
buf_frame_t* get_frame(buf_block_t* block) const UNIV_NOTHROW
Set the name of the physical file and the file handle that is used
to open it for the file that is being iterated over.
@param filename - then physical name of the tablespace file.
@param file - OS file handle */
void set_file(const char* filename, pfs_os_file_t file) UNIV_NOTHROW
{
if (is_compressed_table()) {
return(block->page.zip.data);
m_file = file;
m_filepath = filename;
}
return(buf_block_get_frame(block));
/** The compressed page size
@return the compressed page size */
ulint get_zip_size() const
{
return(m_zip_size);
}
/** Check for session interrupt. If required we could
even flush to disk here every N pages.
@retval DB_SUCCESS or error code */
dberr_t periodic_check() UNIV_NOTHROW
/** The compressed page size
@return the compressed page size */
ulint get_page_size() const
{
if (trx_is_interrupted(m_trx)) {
return(DB_INTERRUPTED);
return(m_page_size);
}
return(DB_SUCCESS);
const char* filename() const { return m_filepath; }
/**
Called for every page in the tablespace. If the page was not
updated then its state must be set to BUF_PAGE_NOT_USED. For
compressed tables the page descriptor memory will be at offset:
block->frame + UNIV_PAGE_SIZE;
@param offset - physical offset within the file
@param block - block read from file, note it is not from the buffer pool
@retval DB_SUCCESS or error code. */
virtual dberr_t operator()(
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW = 0;
/**
@return the space id of the tablespace */
virtual ulint get_space_id() const UNIV_NOTHROW = 0;
bool is_interrupted() const { return trx_is_interrupted(m_trx); }
/**
Get the data page depending on the table type, compressed or not.
@param block - block read from disk
@retval the buffer frame */
static byte* get_frame(const buf_block_t* block)
{
return block->page.zip.data
? block->page.zip.data : block->frame;
}
protected:
/**
Get the physical offset of the extent descriptor within the page.
@param page_no - page number of the extent descriptor
......@@ -510,6 +539,18 @@ class AbstractCallback : public PageCallback {
}
protected:
/** Compressed table page size */
ulint m_zip_size;
/** The tablespace page size. */
ulint m_page_size;
/** File handle to the tablespace */
pfs_os_file_t m_file;
/** Physical file path. */
const char* m_filepath;
/** Covering transaction. */
trx_t* m_trx;
......@@ -566,9 +607,9 @@ AbstractCallback::init(
/* Since we don't know whether it is a compressed table
or not, the data is always read into the block->frame. */
dberr_t err = set_zip_size(block->frame);
m_zip_size = fsp_header_get_zip_size(page);
if (err != DB_SUCCESS) {
if (!ut_is_2pow(m_zip_size) || m_zip_size > UNIV_ZIP_SIZE_MAX) {
return(DB_CORRUPTION);
}
......@@ -605,11 +646,7 @@ AbstractCallback::init(
m_free_limit = mach_read_from_4(page + FSP_FREE_LIMIT);
m_space = mach_read_from_4(page + FSP_HEADER_OFFSET + FSP_SPACE_ID);
if ((err = set_current_xdes(0, page)) != DB_SUCCESS) {
return(err);
}
return(DB_SUCCESS);
return set_current_xdes(0, page);
}
/**
......@@ -682,11 +719,7 @@ FetchIndexRootPages::operator() (
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW
{
dberr_t err;
if ((err = periodic_check()) != DB_SUCCESS) {
return(err);
}
if (is_interrupted()) return DB_INTERRUPTED;
const page_t* page = get_frame(block);
......@@ -699,9 +732,9 @@ FetchIndexRootPages::operator() (
block->page.offset,
(ulint) (offset / m_page_size));
err = DB_CORRUPTION;
return DB_CORRUPTION;
} else if (page_type == FIL_PAGE_TYPE_XDES) {
err = set_current_xdes(block->page.offset, page);
return set_current_xdes(block->page.offset, page);
} else if (page_type == FIL_PAGE_INDEX
&& !is_free(block->page.offset)
&& is_root_page(page)) {
......@@ -726,7 +759,7 @@ FetchIndexRootPages::operator() (
}
}
return(err);
return DB_SUCCESS;
}
/**
......@@ -850,14 +883,6 @@ class PageConverter : public AbstractCallback {
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW;
private:
/** Status returned by PageConverter::validate() */
enum import_page_status_t {
IMPORT_PAGE_STATUS_OK, /*!< Page is OK */
IMPORT_PAGE_STATUS_ALL_ZERO, /*!< Page is all zeros */
IMPORT_PAGE_STATUS_CORRUPTED /*!< Page is corrupted */
};
/**
Update the page, set the space id, max trx id and index id.
@param block - block read from file
......@@ -867,17 +892,6 @@ class PageConverter : public AbstractCallback {
buf_block_t* block,
ulint& page_type) UNIV_NOTHROW;
#if defined UNIV_DEBUG
/**
@return true error condition is enabled. */
bool trigger_corruption() UNIV_NOTHROW
{
return(false);
}
#else
#define trigger_corruption() (false)
#endif /* UNIV_DEBUG */
/**
Update the space, index id, trx id.
@param block - block to convert
......@@ -890,15 +904,6 @@ class PageConverter : public AbstractCallback {
@retval DB_SUCCESS or error code */
dberr_t update_records(buf_block_t* block) UNIV_NOTHROW;
/**
Validate the page, check for corruption.
@param offset - physical offset within file.
@param page - page read from file.
@return 0 on success, 1 if all zero, 2 if corrupted */
import_page_status_t validate(
os_offset_t offset,
buf_block_t* page) UNIV_NOTHROW;
/**
Validate the space flags and update tablespace header page.
@param block - block read from file, not from the buffer pool.
......@@ -1316,8 +1321,8 @@ row_import::match_schema(
return(DB_ERROR);
} else if (m_table->n_cols != m_n_cols) {
ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
"Number of columns don't match, table has %u "
"columns but the tablespace meta-data file has "
"Number of columns don't match, table has %u"
" columns but the tablespace meta-data file has "
ULINTPF " columns",
m_table->n_cols, m_n_cols);
......@@ -1597,6 +1602,7 @@ IndexPurge::purge() UNIV_NOTHROW
Constructor
* @param cfg - config of table being imported.
* @param trx - transaction covering the import */
inline
PageConverter::PageConverter(
row_import* cfg,
trx_t* trx)
......@@ -1621,6 +1627,7 @@ Adjust the BLOB reference for a single column that is externally stored
@param offsets - column offsets for the record
@param i - column ordinal value
@return DB_SUCCESS or error code */
inline
dberr_t
PageConverter::adjust_cluster_index_blob_column(
rec_t* rec,
......@@ -1673,6 +1680,7 @@ stored columns.
@param rec - record to update
@param offsets - column offsets for the record
@return DB_SUCCESS or error code */
inline
dberr_t
PageConverter::adjust_cluster_index_blob_columns(
rec_t* rec,
......@@ -1706,6 +1714,7 @@ BLOB reference, write the new space id.
@param rec - record to update
@param offsets - column offsets for the record
@return DB_SUCCESS or error code */
inline
dberr_t
PageConverter::adjust_cluster_index_blob_ref(
rec_t* rec,
......@@ -1729,6 +1738,7 @@ Purge delete-marked records, only if it is possible to do so without
re-organising the B+tree.
@param offsets - current row offsets.
@return true if purge succeeded */
inline
bool
PageConverter::purge(const ulint* offsets) UNIV_NOTHROW
{
......@@ -1753,6 +1763,7 @@ Adjust the BLOB references and sys fields for the current record.
@param offsets - column offsets for the record
@param deleted - true if row is delete marked
@return DB_SUCCESS or error code. */
inline
dberr_t
PageConverter::adjust_cluster_record(
const dict_index_t* index,
......@@ -1781,6 +1792,7 @@ Update the BLOB refrences and write UNDO log entries for
rows that can't be purged optimistically.
@param block - block to update
@retval DB_SUCCESS or error code */
inline
dberr_t
PageConverter::update_records(
buf_block_t* block) UNIV_NOTHROW
......@@ -1846,6 +1858,7 @@ PageConverter::update_records(
/**
Update the space, index id, trx id.
@return DB_SUCCESS or error code */
inline
dberr_t
PageConverter::update_index_page(
buf_block_t* block) UNIV_NOTHROW
......@@ -1915,6 +1928,7 @@ PageConverter::update_index_page(
Validate the space flags and update tablespace header page.
@param block - block read from file, not from the buffer pool.
@retval DB_SUCCESS or error code */
inline
dberr_t
PageConverter::update_header(
buf_block_t* block) UNIV_NOTHROW
......@@ -1954,6 +1968,7 @@ PageConverter::update_header(
Update the page, set the space id, max trx id and index id.
@param block - block read from file
@retval DB_SUCCESS or error code */
inline
dberr_t
PageConverter::update_page(
buf_block_t* block,
......@@ -1961,6 +1976,14 @@ PageConverter::update_page(
{
dberr_t err = DB_SUCCESS;
ut_ad(!block->page.zip.data == !is_compressed_table());
if (block->page.zip.data) {
m_page_zip_ptr = &block->page.zip;
} else {
ut_ad(!m_page_zip_ptr);
}
switch (page_type = fil_page_get_type(get_frame(block))) {
case FIL_PAGE_TYPE_FSP_HDR:
/* Work directly on the uncompressed page headers. */
......@@ -2015,96 +2038,25 @@ PageConverter::update_page(
return(DB_CORRUPTION);
}
/**
Validate the page
@param offset - physical offset within file.
@param page - page read from file.
@return status */
PageConverter::import_page_status_t
PageConverter::validate(
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW
{
buf_frame_t* page = get_frame(block);
/* Check that the page number corresponds to the offset in
the file. Flag as corrupt if it doesn't. Disable the check
for LSN in buf_page_is_corrupted() */
if (buf_page_is_corrupted(false, page, get_zip_size(), NULL)
|| (page_get_page_no(page) != offset / m_page_size
&& page_get_page_no(page) != 0)) {
return(IMPORT_PAGE_STATUS_CORRUPTED);
} else if (offset > 0 && page_get_page_no(page) == 0) {
ulint checksum;
checksum = mach_read_from_4(page + FIL_PAGE_SPACE_OR_CHKSUM);
if (checksum != 0) {
/* Checksum check passed in buf_page_is_corrupted(). */
ib_logf(IB_LOG_LEVEL_WARN,
"%s: Page %lu checksum " ULINTPF
" should be zero.",
m_filepath, (ulong) (offset / m_page_size),
checksum);
}
const byte* b = page + FIL_PAGE_OFFSET;
const byte* e = page + m_page_size
- FIL_PAGE_END_LSN_OLD_CHKSUM;
/* If the page number is zero and offset > 0 then
the entire page MUST consist of zeroes. If not then
we flag it as corrupt. */
while (b != e) {
if (*b++ && !trigger_corruption()) {
return(IMPORT_PAGE_STATUS_CORRUPTED);
}
}
/* The page is all zero: do nothing. */
return(IMPORT_PAGE_STATUS_ALL_ZERO);
}
return(IMPORT_PAGE_STATUS_OK);
}
/**
Called for every page in the tablespace. If the page was not
updated then its state must be set to BUF_PAGE_NOT_USED.
@param offset - physical offset within the file
@param block - block read from file, note it is not from the buffer pool
@retval DB_SUCCESS or error code. */
dberr_t
PageConverter::operator() (
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW
PageConverter::operator() (os_offset_t, buf_block_t* block) UNIV_NOTHROW
{
ulint page_type;
dberr_t err = DB_SUCCESS;
if ((err = periodic_check()) != DB_SUCCESS) {
return(err);
}
if (is_compressed_table()) {
m_page_zip_ptr = &block->page.zip;
} else {
ut_ad(m_page_zip_ptr == 0);
}
switch(validate(offset, block)) {
case IMPORT_PAGE_STATUS_OK:
/* If we already had an old page with matching number
in the buffer pool, evict it now, because
we no longer evict the pages on DISCARD TABLESPACE. */
buf_page_get_gen(get_space_id(), get_zip_size(), block->page.offset,
RW_NO_LATCH, NULL, BUF_EVICT_IF_IN_POOL,
__FILE__, __LINE__, NULL);
/* We have to decompress the compressed pages before
we can work on them */
ulint page_type;
if ((err = update_page(block, page_type)) != DB_SUCCESS) {
break;
}
dberr_t err = update_page(block, page_type);
if (err != DB_SUCCESS) return err;
/* Note: For compressed pages this function will write to the
zip descriptor and for uncompressed pages it will write to
......@@ -2113,43 +2065,19 @@ PageConverter::operator() (
pages. */
if (!is_compressed_table() || page_type == FIL_PAGE_INDEX) {
buf_flush_init_for_writing(
!is_compressed_table()
? block->frame : block->page.zip.data,
!is_compressed_table() ? 0 : m_page_zip_ptr,
get_frame(block),
block->page.zip.data ? &block->page.zip : NULL,
m_current_lsn);
} else {
/* Calculate and update the checksum of non-btree
pages for compressed tables explicitly here. */
buf_flush_update_zip_checksum(
get_frame(block), get_zip_size(),
m_current_lsn);
}
break;
case IMPORT_PAGE_STATUS_ALL_ZERO:
/* The page is all zero: leave it as is. */
break;
case IMPORT_PAGE_STATUS_CORRUPTED:
ib_logf(IB_LOG_LEVEL_WARN,
"%s: Page %lu at offset " UINT64PF " looks corrupted.",
m_filepath, (ulong) (offset / m_page_size), offset);
err = DB_CORRUPTION;
}
/* If we already had and old page with matching number
in the buffer pool, evict it now, because
we no longer evict the pages on DISCARD TABLESPACE. */
buf_page_get_gen(get_space_id(), get_zip_size(), block->page.offset,
RW_NO_LATCH, NULL, BUF_EVICT_IF_IN_POOL,
__FILE__, __LINE__, NULL);
return(err);
return DB_SUCCESS;
}
/*****************************************************************//**
......@@ -3424,6 +3352,460 @@ row_import_update_discarded_flag(
return(err);
}
struct fil_iterator_t {
pfs_os_file_t file; /*!< File handle */
const char* filepath; /*!< File path name */
os_offset_t start; /*!< From where to start */
os_offset_t end; /*!< Where to stop */
os_offset_t file_size; /*!< File size in bytes */
ulint page_size; /*!< Page size */
ulint n_io_buffers; /*!< Number of pages to use
for IO */
byte* io_buffer; /*!< Buffer to use for IO */
fil_space_crypt_t *crypt_data; /*!< Crypt data (if encrypted) */
byte* crypt_io_buffer; /*!< IO buffer when encrypted */
};
/********************************************************************//**
TODO: This can be made parallel trivially by chunking up the file and creating
a callback per thread. . Main benefit will be to use multiple CPUs for
checksums and compressed tables. We have to do compressed tables block by
block right now. Secondly we need to decompress/compress and copy too much
of data. These are CPU intensive.
Iterate over all the pages in the tablespace.
@param iter - Tablespace iterator
@param block - block to use for IO
@param callback - Callback to inspect and update page contents
@retval DB_SUCCESS or error code */
static
dberr_t
fil_iterate(
/*========*/
	const fil_iterator_t&	iter,	/*!< in: iterator state: file handle,
					byte range [start, end), I/O buffers,
					page size and optional crypt data */
	buf_block_t*		block,	/*!< in/out: scratch block; its frame
					pointer is advanced over each page */
	AbstractCallback&	callback)	/*!< in/out: functor invoked
						on every page read */
{
	os_offset_t	offset;
	/* Number of bytes to read/write per batch. */
	ulint		n_bytes = iter.n_io_buffers * iter.page_size;

	ut_ad(!srv_read_only_mode);

	/* TODO: For ROW_FORMAT=COMPRESSED tables we do a lot of useless
	copying for non-index pages. Unfortunately, it is
	required by buf_zip_decompress() */

	/* Walk the file range in batches of n_bytes. */
	for (offset = iter.start; offset < iter.end; offset += n_bytes) {
		if (callback.is_interrupted()) {
			return DB_INTERRUPTED;
		}

		byte* io_buffer = iter.io_buffer;
		block->frame = io_buffer;

		if (block->page.zip.data) {
			/* Zip IO is done in the compressed page buffer. */
			io_buffer = block->page.zip.data;
			ut_ad(PAGE_ZIP_MATCH(block->frame, &block->page.zip));
		}

		/* We have to read the exact number of bytes. Otherwise the
		InnoDB IO functions croak on failed reads. The last batch may
		be shorter than a full set of I/O buffers. */

		n_bytes = ulint(ut_min(os_offset_t(n_bytes),
				       iter.end - offset));

		ut_ad(n_bytes > 0);
		ut_ad(!(n_bytes % iter.page_size));

		const bool encrypted = iter.crypt_data != NULL
			&& iter.crypt_data->should_encrypt();
		/* Use additional crypt io buffer if tablespace is encrypted */
		byte* const readptr = encrypted
			? iter.crypt_io_buffer : io_buffer;
		byte* const writeptr = readptr;

		if (!os_file_read_no_error_handling(iter.file, readptr,
						    offset, n_bytes)) {
			ib_logf(IB_LOG_LEVEL_ERROR, "os_file_read() failed");
			return DB_IO_ERROR;
		}

		/* Whether any page in this batch changed and must be
		written back to disk. */
		bool		updated = false;
		os_offset_t	page_off = offset;
		ulint		n_pages_read = (ulint) n_bytes / iter.page_size;
		bool		decrypted = false;
		const ulint	size = iter.page_size;
		block->page.offset = page_off / size;

		/* Process each page of the batch in place. */
		for (ulint i = 0; i < n_pages_read;
		     ++i, page_off += size, block->frame += size,
		     block->page.offset++) {
			dberr_t	err		= DB_SUCCESS;
			byte*	src		= readptr + (i * size);
			byte*	dst		= io_buffer + (i * size);
			/* true if block->frame was repointed at src below */
			bool	frame_changed	= false;

			ulint page_type = mach_read_from_2(src+FIL_PAGE_TYPE);

			const bool page_compressed
				= page_type
				== FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED
				|| page_type == FIL_PAGE_PAGE_COMPRESSED;

			const ulint page_no = page_get_page_no(src);

			if (!page_no && page_off) {
				/* Page number 0 past the start of the file
				is only acceptable if the whole page is
				zero-filled (a freshly extended page). */
				const ulint* b = reinterpret_cast<const ulint*>
					(src);
				const ulint* const e = b + size / sizeof *b;
				do {
					if (*b++) {
						goto page_corrupted;
					}
				} while (b != e);

				/* Proceed to the next page,
				because this one is all zero. */
				continue;
			}

			/* The stored page number must match the physical
			position of the page in the file. */
			if (page_no != page_off / size) {
				goto page_corrupted;
			}

			if (encrypted) {
				decrypted = fil_space_decrypt(
					iter.crypt_data, dst,
					iter.page_size, src, &err);

				if (err != DB_SUCCESS) {
					return err;
				}

				if (decrypted) {
					updated = true;
				} else {
					/* Page was stored unencrypted even
					though the tablespace is encrypted;
					avoid a copy where possible by
					repointing the frame at src. */
					if (!page_compressed
					    && !block->page.zip.data) {
						block->frame = src;
						frame_changed = true;
					} else {
						memcpy(dst, src, size);
					}
				}
			}

			/* If the original page is page_compressed, we need
			to decompress it before adjusting further. */
			if (page_compressed) {
				fil_decompress_page(NULL, dst, ulong(size),
						    NULL);
				updated = true;
			} else if (buf_page_is_corrupted(
					   false,
					   encrypted && !frame_changed
					   ? dst : src,
					   callback.get_zip_size(), NULL)) {

page_corrupted:
				ib_logf(IB_LOG_LEVEL_WARN,
					"%s: Page %lu at offset "
					UINT64PF " looks corrupted.",
					callback.filename(),
					ulong(offset / size), offset);
				return DB_CORRUPTION;
			}

			/* Let the callback adjust the page; when it did not
			explicitly report an update, infer one from the block
			state it set. */
			if ((err = callback(page_off, block)) != DB_SUCCESS) {
				return err;
			} else if (!updated) {
				updated = buf_block_get_state(block)
					== BUF_BLOCK_FILE_PAGE;
			}

			/* If tablespace is encrypted we use additional
			temporary scratch area where pages are read
			for decrypting readptr == crypt_io_buffer != io_buffer.

			Destination for decryption is a buffer pool block
			block->frame == dst == io_buffer that is updated.
			Pages that did not require decryption even when
			tablespace is marked as encrypted are not copied
			instead block->frame is set to src == readptr.

			For encryption we again use temporary scratch area
			writeptr != io_buffer == dst
			that is then written to the tablespace

			(1) For normal tables io_buffer == dst == writeptr
			(2) For only page compressed tables
			io_buffer == dst == writeptr
			(3) For encrypted (and page compressed)
			readptr != io_buffer == dst != writeptr
			*/

			ut_ad(!encrypted && !page_compressed ?
			      src == dst && dst == writeptr + (i * size):1);
			ut_ad(page_compressed && !encrypted ?
			      src == dst && dst == writeptr + (i * size):1);
			ut_ad(encrypted ?
			      src != dst && dst != writeptr + (i * size):1);

			if (encrypted) {
				memcpy(writeptr + (i * size),
				       callback.get_frame(block), size);
			}

			if (frame_changed) {
				/* Restore the frame pointer so that the
				outer loop increment keeps advancing over
				io_buffer, not over readptr. */
				block->frame = dst;
			}

			src = io_buffer + (i * size);

			if (page_compressed) {
				ulint len = 0;

				/* Re-compress the (possibly modified) page
				before it is written back. */
				fil_compress_page(
					NULL,
					src,
					NULL,
					size,
					0,/* FIXME: compression level */
					512,/* FIXME: use proper block size */
					encrypted,
					&len);

				updated = true;
			}

			/* If tablespace is encrypted, encrypt page before we
			write it back. Note that we should not encrypt the
			buffer that is in buffer pool. */
			/* NOTE: At this stage of IMPORT the
			buffer pool is not being used at all! */
			if (decrypted && encrypted) {
				byte *dest = writeptr + (i * size);
				ulint space = mach_read_from_4(
					src + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID);
				ulint offset = mach_read_from_4(src + FIL_PAGE_OFFSET);
				ib_uint64_t lsn = mach_read_from_8(src + FIL_PAGE_LSN);

				byte* tmp = fil_encrypt_buf(
					iter.crypt_data,
					space,
					offset,
					lsn,
					src,
					iter.page_size == UNIV_PAGE_SIZE ? 0 : iter.page_size,
					dest);

				if (tmp == src) {
					/* fil_encrypt_buf() did not write to
					dest (page was not encrypted); copy
					the plain page there instead. */
					/* TODO: remove unnecessary memcpy's */
					memcpy(dest, src, size);
				}

				updated = true;
			}
		}

		/* A page was updated in the set, write back to disk. */
		if (updated
		    && !os_file_write(
			    iter.filepath, iter.file, writeptr,
			    offset, (ulint) n_bytes)) {

			ib_logf(IB_LOG_LEVEL_ERROR, "os_file_write() failed");

			return DB_IO_ERROR;
		}
	}

	return DB_SUCCESS;
}
/********************************************************************//**
Iterate over all the pages in the tablespace. Opens the .ibd file of the
table, reads the header page to determine the page and zip size (and any
encryption metadata), then applies the callback to every page via
fil_iterate(), finally syncing the file to disk on success.
@param table - the table definition in the server
@param n_io_buffers - number of blocks to read and write together
@param callback - functor that will do the page updates
@return DB_SUCCESS or error code */
static
dberr_t
fil_tablespace_iterate(
/*===================*/
	dict_table_t*		table,
	ulint			n_io_buffers,
	AbstractCallback&	callback)
{
	dberr_t		err;
	pfs_os_file_t	file;
	char*		filepath;

	ut_a(n_io_buffers > 0);
	ut_ad(!srv_read_only_mode);

	DBUG_EXECUTE_IF("ib_import_trigger_corruption_1",
			return(DB_CORRUPTION););

	/* Build the path of the .ibd file; it may live outside the
	data directory when DATA DIRECTORY was specified. */
	if (DICT_TF_HAS_DATA_DIR(table->flags)) {
		dict_get_and_save_data_dir_path(table, false);
		ut_a(table->data_dir_path);

		filepath = os_file_make_remote_pathname(
			table->data_dir_path, table->name, "ibd");
	} else {
		filepath = fil_make_ibd_name(table->name, false);
	}

	{
		ibool	success;

		file = os_file_create_simple_no_error_handling(
			innodb_file_data_key, filepath,
			OS_FILE_OPEN, OS_FILE_READ_WRITE, &success, FALSE);

		DBUG_EXECUTE_IF("fil_tablespace_iterate_failure",
		{
			static bool once;

			if (!once || ut_rnd_interval(0, 10) == 5) {
				once = true;
				success = FALSE;
				os_file_close(file);
			}
		});

		if (!success) {
			/* The following call prints an error message */
			os_file_get_last_error(true);

			ib_logf(IB_LOG_LEVEL_ERROR,
				"Trying to import a tablespace, but could not "
				"open the tablespace file %s", filepath);

			mem_free(filepath);

			return(DB_TABLESPACE_NOT_FOUND);

		} else {
			err = DB_SUCCESS;
		}
	}

	callback.set_file(filepath, file);

	os_offset_t	file_size = os_file_get_size(file);
	ut_a(file_size != (os_offset_t) -1);

	/* Allocate a page to read in the tablespace header, so that we
	can determine the page size and zip_size (if it is compressed).
	We allocate an extra page in case it is a compressed table. One
	page is to ensure alignement. */

	void*	page_ptr = mem_alloc(3 * UNIV_PAGE_SIZE);
	byte*	page = static_cast<byte*>(ut_align(page_ptr, UNIV_PAGE_SIZE));

	/* The block we will use for every physical page */
	buf_block_t	block;

	memset(&block, 0, sizeof block);
	block.frame = page;
	block.page.space = callback.get_space_id();
	block.page.io_fix = BUF_IO_NONE;
	block.page.buf_fix_count = 1;
	block.page.state = BUF_BLOCK_FILE_PAGE;

	/* Read the first page and determine the page and zip size. */

	if (!os_file_read_no_error_handling(file, page, 0, UNIV_PAGE_SIZE)) {

		err = DB_IO_ERROR;

	} else if ((err = callback.init(file_size, &block)) == DB_SUCCESS) {

		if (const ulint zip_size = callback.get_zip_size()) {
			page_zip_set_size(&block.page.zip, zip_size);
			/* ROW_FORMAT=COMPRESSED is not optimised for block IO
			for now. We do the IMPORT page by page. */
			n_io_buffers = 1;
		}

		fil_iterator_t	iter;

		iter.file = file;
		iter.start = 0;
		iter.end = file_size;
		iter.filepath = filepath;
		iter.file_size = file_size;
		iter.n_io_buffers = n_io_buffers;
		iter.page_size = callback.get_page_size();

		/* In MariaDB/MySQL 5.6 tablespace does not exist
		during import, therefore we can't use space directly
		here. */
		ulint crypt_data_offset = fsp_header_get_crypt_offset(
			callback.get_zip_size());

		/* read (optional) crypt data */
		iter.crypt_data = fil_space_read_crypt_data(
			0, page, crypt_data_offset);

		/** If tablespace is encrypted, it needs extra buffers */
		if (iter.crypt_data != NULL) {
			/* decrease io buffers so that memory
			* consumption doesnt double
			* note: the +1 is to avoid n_io_buffers getting down to 0 */
			iter.n_io_buffers = (iter.n_io_buffers + 1) / 2;
		}

		/** Add an extra page for compressed page scratch area. */

		void*	io_buffer = mem_alloc(
			(2 + iter.n_io_buffers) * UNIV_PAGE_SIZE);

		iter.io_buffer = static_cast<byte*>(
			ut_align(io_buffer, UNIV_PAGE_SIZE));

		/* Separate, equally-sized scratch buffer for reading
		encrypted pages before they are decrypted. */
		void* crypt_io_buffer = NULL;
		if (iter.crypt_data != NULL) {
			crypt_io_buffer = mem_alloc(
				(2 + iter.n_io_buffers) * UNIV_PAGE_SIZE);
			iter.crypt_io_buffer = static_cast<byte*>(
				ut_align(crypt_io_buffer, UNIV_PAGE_SIZE));
		}

		if (block.page.zip.ssize) {
			/* ROW_FORMAT=COMPRESSED: one uncompressed frame
			followed by the compressed page in io_buffer. */
			ut_ad(iter.n_io_buffers == 1);
			block.frame = iter.io_buffer;
			block.page.zip.data = block.frame + UNIV_PAGE_SIZE;
			ut_d(block.page.zip.m_external = true);
		}

		err = fil_iterate(iter, &block, callback);

		mem_free(io_buffer);

		if (crypt_io_buffer != NULL) {
			mem_free(crypt_io_buffer);
			iter.crypt_io_buffer = NULL;
			fil_space_destroy_crypt_data(&iter.crypt_data);
		}
	}

	if (err == DB_SUCCESS) {

		ib_logf(IB_LOG_LEVEL_INFO, "Sync to disk");

		if (!os_file_flush(file)) {
			ib_logf(IB_LOG_LEVEL_INFO, "os_file_flush() failed!");
			err = DB_IO_ERROR;
		} else {
			ib_logf(IB_LOG_LEVEL_INFO, "Sync to disk - done!");
		}
	}

	os_file_close(file);

	mem_free(page_ptr);
	mem_free(filepath);

	return(err);
}
/*****************************************************************//**
Imports a tablespace. The space id in the .ibd file must match the space id
of the table in the data dictionary.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment