Commit d7817382 authored by Daniel Black's avatar Daniel Black

MDEV-27900: aio handle partial reads/writes

As btrfs showed, a partial read of data in AIO /O_DIRECT circumstances can
really confuse MariaDB.

Filipe Manana (SuSE)[1] showed how database programmers can assume
O_DIRECT is all or nothing.

While a fix was done in the kernel side, we can do better in our code by
requesting that the rest of the block be read/written synchronously if
we do only get a partial read/write.

Per the APIs, a partial read/write can occur before an error, so
reattempting the request will leave the caller with a concrete error to
handle.

[1] https://lore.kernel.org/linux-btrfs/CABVffENfbsC6HjGbskRZGR2NvxbnQi17gAuW65eOM+QRzsr8Bg@mail.gmail.com/T/#mb2738e675e48e0e0778a2e8d1537dec5ec0d3d3a

Also spell synchronously correctly in other files.
parent 4cfb6edd
...@@ -297,7 +297,7 @@ buf_read_page_low( ...@@ -297,7 +297,7 @@ buf_read_page_low(
/* Trx sys header is so low in the latching order that we play /* Trx sys header is so low in the latching order that we play
safe and do not leave the i/o-completion to an asynchronous safe and do not leave the i/o-completion to an asynchronous
i/o-thread. Change buffer pages must always be read with i/o-thread. Change buffer pages must always be read with
syncronous i/o, to make sure they do not get involved in synchronous i/o, to make sure they do not get involved in
thread deadlocks. */ thread deadlocks. */
sync = true; sync = true;
} }
......
...@@ -2807,7 +2807,7 @@ os_file_set_eof( ...@@ -2807,7 +2807,7 @@ os_file_set_eof(
#endif /* !_WIN32*/ #endif /* !_WIN32*/
/** Does a syncronous read or write depending upon the type specified /** Does a synchronous read or write depending upon the type specified
In case of partial reads/writes the function tries In case of partial reads/writes the function tries
NUM_RETRIES_ON_PARTIAL_IO times to read/write the complete data. NUM_RETRIES_ON_PARTIAL_IO times to read/write the complete data.
@param[in] type, IO flags @param[in] type, IO flags
......
...@@ -131,6 +131,8 @@ class aio_linux final : public aio ...@@ -131,6 +131,8 @@ class aio_linux final : public aio
{ {
iocb->m_ret_len= event.res; iocb->m_ret_len= event.res;
iocb->m_err= 0; iocb->m_err= 0;
if (iocb->m_ret_len != iocb->m_len)
finish_synchronous(iocb);
} }
iocb->m_internal_task.m_func= iocb->m_callback; iocb->m_internal_task.m_func= iocb->m_callback;
iocb->m_internal_task.m_arg= iocb; iocb->m_internal_task.m_arg= iocb;
......
...@@ -136,32 +136,7 @@ class simulated_aio : public aio ...@@ -136,32 +136,7 @@ class simulated_aio : public aio
static void simulated_aio_callback(void *param) static void simulated_aio_callback(void *param)
{ {
aiocb *cb= (aiocb *) param; aiocb *cb= (aiocb *) param;
#ifdef _WIN32 synchronous(cb);
size_t ret_len;
#else
ssize_t ret_len;
#endif
int err= 0;
switch (cb->m_opcode)
{
case aio_opcode::AIO_PREAD:
ret_len= pread(cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset);
break;
case aio_opcode::AIO_PWRITE:
ret_len= pwrite(cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset);
break;
default:
abort();
}
#ifdef _WIN32
if (static_cast<int>(ret_len) < 0)
err= GetLastError();
#else
if (ret_len < 0)
err= errno;
#endif
cb->m_ret_len = ret_len;
cb->m_err = err;
cb->m_internal_task.m_func= cb->m_callback; cb->m_internal_task.m_func= cb->m_callback;
thread_pool *pool= (thread_pool *)cb->m_internal; thread_pool *pool= (thread_pool *)cb->m_internal;
pool->submit_task(&cb->m_internal_task); pool->submit_task(&cb->m_internal_task);
......
...@@ -156,7 +156,7 @@ class aio ...@@ -156,7 +156,7 @@ class aio
{ {
public: public:
/** /**
Submit asyncronous IO. Submit asynchronous IO.
On completion, cb->m_callback is executed. On completion, cb->m_callback is executed.
*/ */
virtual int submit_io(aiocb *cb)= 0; virtual int submit_io(aiocb *cb)= 0;
...@@ -165,6 +165,10 @@ class aio ...@@ -165,6 +165,10 @@ class aio
/** "Unind" file to AIO handler (used on Windows only) */ /** "Unind" file to AIO handler (used on Windows only) */
virtual int unbind(const native_file_handle &fd)= 0; virtual int unbind(const native_file_handle &fd)= 0;
virtual ~aio(){}; virtual ~aio(){};
protected:
static void synchronous(aiocb *cb);
/** finish a partial read/write callback synchronously */
static void finish_synchronous(aiocb *cb);
}; };
class timer class timer
......
...@@ -47,6 +47,58 @@ namespace tpool ...@@ -47,6 +47,58 @@ namespace tpool
static const std::chrono::milliseconds LONG_TASK_DURATION = std::chrono::milliseconds(500); static const std::chrono::milliseconds LONG_TASK_DURATION = std::chrono::milliseconds(500);
static const int OVERSUBSCRIBE_FACTOR = 2; static const int OVERSUBSCRIBE_FACTOR = 2;
/**
Process the cb synchronously
*/
void aio::synchronous(aiocb *cb)
{
#ifdef _WIN32
size_t ret_len;
#else
ssize_t ret_len;
#endif
int err= 0;
switch (cb->m_opcode)
{
case aio_opcode::AIO_PREAD:
ret_len= pread(cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset);
break;
case aio_opcode::AIO_PWRITE:
ret_len= pwrite(cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset);
break;
default:
abort();
}
#ifdef _WIN32
if (static_cast<int>(ret_len) < 0)
err= GetLastError();
#else
if (ret_len < 0)
{
err= errno;
ret_len= 0;
}
#endif
cb->m_ret_len = ret_len;
cb->m_err = err;
if (!err && cb->m_ret_len != cb->m_len)
finish_synchronous(cb);
}
/**
A partial read/write has occured, continue synchronously.
*/
void aio::finish_synchronous(aiocb *cb)
{
assert(cb->m_ret_len != (unsigned int) cb->m_len && !cb->m_err);
/* partial read/write */
cb->m_buffer= (char *) cb->m_buffer + cb->m_ret_len;
cb->m_len-= (unsigned int) cb->m_ret_len;
cb->m_offset+= cb->m_ret_len;
synchronous(cb);
}
/** /**
Implementation of generic threadpool. Implementation of generic threadpool.
This threadpool consists of the following components This threadpool consists of the following components
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment