Commit fe0f07d0 authored by Jens Axboe, committed by Al Viro

direct-io: only inc/dec inode->i_dio_count for file systems

do_blockdev_direct_IO() increments and decrements the inode
->i_dio_count for each IO operation. It does this to protect against
truncate of a file. Block devices don't need this sort of protection.

For a capable multiqueue setup, this atomic int is the only shared
state between applications accessing the device for O_DIRECT, and it
presents a scaling wall for that. In my testing, as much as 30% of
system time is spent incrementing and decrementing this value. A mixed
read/write workload improved from ~2.5M IOPS to ~9.6M IOPS, with
better latencies too. Before:

clat percentiles (usec):
 |  1.00th=[   33],  5.00th=[   34], 10.00th=[   34], 20.00th=[   34],
 | 30.00th=[   34], 40.00th=[   34], 50.00th=[   35], 60.00th=[   35],
 | 70.00th=[   35], 80.00th=[   35], 90.00th=[   37], 95.00th=[   80],
 | 99.00th=[   98], 99.50th=[  151], 99.90th=[  155], 99.95th=[  155],
 | 99.99th=[  165]

After:

clat percentiles (usec):
 |  1.00th=[   95],  5.00th=[  108], 10.00th=[  129], 20.00th=[  149],
 | 30.00th=[  155], 40.00th=[  161], 50.00th=[  167], 60.00th=[  171],
 | 70.00th=[  177], 80.00th=[  185], 90.00th=[  201], 95.00th=[  270],
 | 99.00th=[  390], 99.50th=[  398], 99.90th=[  418], 99.95th=[  422],
 | 99.99th=[  438]

In other setups, Robert Elliott reported seeing good performance
improvements:

https://lkml.org/lkml/2015/4/3/557

The more applications accessing the device, the worse it gets.

Add a new direct-io flag, DIO_SKIP_DIO_COUNT, which tells
do_blockdev_direct_IO() that it need not worry about incrementing
or decrementing the inode i_dio_count for this caller.

Cc: Andrew Morton <akpm@linux-foundation.org>
Cc: Christoph Hellwig <hch@lst.de>
Cc: Theodore Ts'o <tytso@mit.edu>
Cc: Elliott, Robert (Server Storage) <elliott@hp.com>
Cc: Al Viro <viro@zeniv.linux.org.uk>
Signed-off-by: Jens Axboe <axboe@fb.com>
Signed-off-by: Al Viro <viro@zeniv.linux.org.uk>
parent 8e3c5005
...@@ -152,7 +152,8 @@ blkdev_direct_IO(struct kiocb *iocb, struct iov_iter *iter, loff_t offset) ...@@ -152,7 +152,8 @@ blkdev_direct_IO(struct kiocb *iocb, struct iov_iter *iter, loff_t offset)
struct inode *inode = file->f_mapping->host; struct inode *inode = file->f_mapping->host;
return __blockdev_direct_IO(iocb, inode, I_BDEV(inode), iter, offset, return __blockdev_direct_IO(iocb, inode, I_BDEV(inode), iter, offset,
blkdev_get_block, NULL, NULL, 0); blkdev_get_block, NULL, NULL,
DIO_SKIP_DIO_COUNT);
} }
int __sync_blockdev(struct block_device *bdev, int wait) int __sync_blockdev(struct block_device *bdev, int wait)
......
...@@ -8129,7 +8129,7 @@ static ssize_t btrfs_direct_IO(struct kiocb *iocb, struct iov_iter *iter, ...@@ -8129,7 +8129,7 @@ static ssize_t btrfs_direct_IO(struct kiocb *iocb, struct iov_iter *iter,
if (check_direct_IO(BTRFS_I(inode)->root, iocb, iter, offset)) if (check_direct_IO(BTRFS_I(inode)->root, iocb, iter, offset))
return 0; return 0;
atomic_inc(&inode->i_dio_count); inode_dio_begin(inode);
smp_mb__after_atomic(); smp_mb__after_atomic();
/* /*
...@@ -8169,7 +8169,7 @@ static ssize_t btrfs_direct_IO(struct kiocb *iocb, struct iov_iter *iter, ...@@ -8169,7 +8169,7 @@ static ssize_t btrfs_direct_IO(struct kiocb *iocb, struct iov_iter *iter,
current->journal_info = &outstanding_extents; current->journal_info = &outstanding_extents;
} else if (test_bit(BTRFS_INODE_READDIO_NEED_LOCK, } else if (test_bit(BTRFS_INODE_READDIO_NEED_LOCK,
&BTRFS_I(inode)->runtime_flags)) { &BTRFS_I(inode)->runtime_flags)) {
inode_dio_done(inode); inode_dio_end(inode);
flags = DIO_LOCKING | DIO_SKIP_HOLES; flags = DIO_LOCKING | DIO_SKIP_HOLES;
wakeup = false; wakeup = false;
} }
...@@ -8188,7 +8188,7 @@ static ssize_t btrfs_direct_IO(struct kiocb *iocb, struct iov_iter *iter, ...@@ -8188,7 +8188,7 @@ static ssize_t btrfs_direct_IO(struct kiocb *iocb, struct iov_iter *iter,
} }
out: out:
if (wakeup) if (wakeup)
inode_dio_done(inode); inode_dio_end(inode);
if (relock) if (relock)
mutex_lock(&inode->i_mutex); mutex_lock(&inode->i_mutex);
......
...@@ -209,7 +209,7 @@ ssize_t dax_do_io(struct kiocb *iocb, struct inode *inode, ...@@ -209,7 +209,7 @@ ssize_t dax_do_io(struct kiocb *iocb, struct inode *inode,
} }
/* Protects against truncate */ /* Protects against truncate */
atomic_inc(&inode->i_dio_count); inode_dio_begin(inode);
retval = dax_io(inode, iter, pos, end, get_block, &bh); retval = dax_io(inode, iter, pos, end, get_block, &bh);
...@@ -219,7 +219,7 @@ ssize_t dax_do_io(struct kiocb *iocb, struct inode *inode, ...@@ -219,7 +219,7 @@ ssize_t dax_do_io(struct kiocb *iocb, struct inode *inode,
if ((retval > 0) && end_io) if ((retval > 0) && end_io)
end_io(iocb, pos, retval, bh.b_private); end_io(iocb, pos, retval, bh.b_private);
inode_dio_done(inode); inode_dio_end(inode);
out: out:
return retval; return retval;
} }
......
...@@ -253,7 +253,9 @@ static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret, ...@@ -253,7 +253,9 @@ static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret,
if (dio->end_io && dio->result) if (dio->end_io && dio->result)
dio->end_io(dio->iocb, offset, transferred, dio->private); dio->end_io(dio->iocb, offset, transferred, dio->private);
inode_dio_done(dio->inode); if (!(dio->flags & DIO_SKIP_DIO_COUNT))
inode_dio_end(dio->inode);
if (is_async) { if (is_async) {
if (dio->rw & WRITE) { if (dio->rw & WRITE) {
int err; int err;
...@@ -1195,7 +1197,8 @@ do_blockdev_direct_IO(struct kiocb *iocb, struct inode *inode, ...@@ -1195,7 +1197,8 @@ do_blockdev_direct_IO(struct kiocb *iocb, struct inode *inode,
/* /*
* Will be decremented at I/O completion time. * Will be decremented at I/O completion time.
*/ */
atomic_inc(&inode->i_dio_count); if (!(dio->flags & DIO_SKIP_DIO_COUNT))
inode_dio_begin(inode);
retval = 0; retval = 0;
sdio.blkbits = blkbits; sdio.blkbits = blkbits;
......
...@@ -682,11 +682,11 @@ ssize_t ext4_ind_direct_IO(struct kiocb *iocb, struct iov_iter *iter, ...@@ -682,11 +682,11 @@ ssize_t ext4_ind_direct_IO(struct kiocb *iocb, struct iov_iter *iter,
* via ext4_inode_block_unlocked_dio(). Check inode's state * via ext4_inode_block_unlocked_dio(). Check inode's state
* while holding extra i_dio_count ref. * while holding extra i_dio_count ref.
*/ */
atomic_inc(&inode->i_dio_count); inode_dio_begin(inode);
smp_mb(); smp_mb();
if (unlikely(ext4_test_inode_state(inode, if (unlikely(ext4_test_inode_state(inode,
EXT4_STATE_DIOREAD_LOCK))) { EXT4_STATE_DIOREAD_LOCK))) {
inode_dio_done(inode); inode_dio_end(inode);
goto locked; goto locked;
} }
if (IS_DAX(inode)) if (IS_DAX(inode))
...@@ -697,7 +697,7 @@ ssize_t ext4_ind_direct_IO(struct kiocb *iocb, struct iov_iter *iter, ...@@ -697,7 +697,7 @@ ssize_t ext4_ind_direct_IO(struct kiocb *iocb, struct iov_iter *iter,
inode->i_sb->s_bdev, iter, inode->i_sb->s_bdev, iter,
offset, ext4_get_block, NULL, offset, ext4_get_block, NULL,
NULL, 0); NULL, 0);
inode_dio_done(inode); inode_dio_end(inode);
} else { } else {
locked: locked:
if (IS_DAX(inode)) if (IS_DAX(inode))
......
...@@ -2977,7 +2977,7 @@ static ssize_t ext4_ext_direct_IO(struct kiocb *iocb, struct iov_iter *iter, ...@@ -2977,7 +2977,7 @@ static ssize_t ext4_ext_direct_IO(struct kiocb *iocb, struct iov_iter *iter,
* overwrite DIO as i_dio_count needs to be incremented under i_mutex. * overwrite DIO as i_dio_count needs to be incremented under i_mutex.
*/ */
if (iov_iter_rw(iter) == WRITE) if (iov_iter_rw(iter) == WRITE)
atomic_inc(&inode->i_dio_count); inode_dio_begin(inode);
/* If we do a overwrite dio, i_mutex locking can be released */ /* If we do a overwrite dio, i_mutex locking can be released */
overwrite = *((int *)iocb->private); overwrite = *((int *)iocb->private);
...@@ -3079,7 +3079,7 @@ static ssize_t ext4_ext_direct_IO(struct kiocb *iocb, struct iov_iter *iter, ...@@ -3079,7 +3079,7 @@ static ssize_t ext4_ext_direct_IO(struct kiocb *iocb, struct iov_iter *iter,
retake_lock: retake_lock:
if (iov_iter_rw(iter) == WRITE) if (iov_iter_rw(iter) == WRITE)
inode_dio_done(inode); inode_dio_end(inode);
/* take i_mutex locking again if we do a ovewrite dio */ /* take i_mutex locking again if we do a ovewrite dio */
if (overwrite) { if (overwrite) {
up_read(&EXT4_I(inode)->i_data_sem); up_read(&EXT4_I(inode)->i_data_sem);
......
...@@ -1945,20 +1945,6 @@ void inode_dio_wait(struct inode *inode) ...@@ -1945,20 +1945,6 @@ void inode_dio_wait(struct inode *inode)
} }
EXPORT_SYMBOL(inode_dio_wait); EXPORT_SYMBOL(inode_dio_wait);
/*
 * inode_dio_done - note completion of a direct I/O request
 * @inode: inode the direct I/O happens on
 *
 * Decrements i_dio_count; when the count reaches zero, waiters on the
 * __I_DIO_WAKEUP bit of i_state are woken.
 */
void inode_dio_done(struct inode *inode)
{
	if (!atomic_dec_and_test(&inode->i_dio_count))
		return;

	/* That was the last request in flight. */
	wake_up_bit(&inode->i_state, __I_DIO_WAKEUP);
}
EXPORT_SYMBOL(inode_dio_done);
/* /*
* inode_set_flags - atomically set some inode flags * inode_set_flags - atomically set some inode flags
* *
......
...@@ -386,7 +386,7 @@ static void nfs_direct_complete(struct nfs_direct_req *dreq, bool write) ...@@ -386,7 +386,7 @@ static void nfs_direct_complete(struct nfs_direct_req *dreq, bool write)
if (write) if (write)
nfs_zap_mapping(inode, inode->i_mapping); nfs_zap_mapping(inode, inode->i_mapping);
inode_dio_done(inode); inode_dio_end(inode);
if (dreq->iocb) { if (dreq->iocb) {
long res = (long) dreq->error; long res = (long) dreq->error;
...@@ -486,7 +486,7 @@ static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq, ...@@ -486,7 +486,7 @@ static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
&nfs_direct_read_completion_ops); &nfs_direct_read_completion_ops);
get_dreq(dreq); get_dreq(dreq);
desc.pg_dreq = dreq; desc.pg_dreq = dreq;
atomic_inc(&inode->i_dio_count); inode_dio_begin(inode);
while (iov_iter_count(iter)) { while (iov_iter_count(iter)) {
struct page **pagevec; struct page **pagevec;
...@@ -538,7 +538,7 @@ static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq, ...@@ -538,7 +538,7 @@ static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
* generic layer handle the completion. * generic layer handle the completion.
*/ */
if (requested_bytes == 0) { if (requested_bytes == 0) {
inode_dio_done(inode); inode_dio_end(inode);
nfs_direct_req_release(dreq); nfs_direct_req_release(dreq);
return result < 0 ? result : -EIO; return result < 0 ? result : -EIO;
} }
...@@ -872,7 +872,7 @@ static ssize_t nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq, ...@@ -872,7 +872,7 @@ static ssize_t nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq,
&nfs_direct_write_completion_ops); &nfs_direct_write_completion_ops);
desc.pg_dreq = dreq; desc.pg_dreq = dreq;
get_dreq(dreq); get_dreq(dreq);
atomic_inc(&inode->i_dio_count); inode_dio_begin(inode);
NFS_I(inode)->write_io += iov_iter_count(iter); NFS_I(inode)->write_io += iov_iter_count(iter);
while (iov_iter_count(iter)) { while (iov_iter_count(iter)) {
...@@ -928,7 +928,7 @@ static ssize_t nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq, ...@@ -928,7 +928,7 @@ static ssize_t nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq,
* generic layer handle the completion. * generic layer handle the completion.
*/ */
if (requested_bytes == 0) { if (requested_bytes == 0) {
inode_dio_done(inode); inode_dio_end(inode);
nfs_direct_req_release(dreq); nfs_direct_req_release(dreq);
return result < 0 ? result : -EIO; return result < 0 ? result : -EIO;
} }
......
...@@ -2635,6 +2635,9 @@ enum { ...@@ -2635,6 +2635,9 @@ enum {
/* filesystem can handle aio writes beyond i_size */ /* filesystem can handle aio writes beyond i_size */
DIO_ASYNC_EXTEND = 0x04, DIO_ASYNC_EXTEND = 0x04,
/* inode/fs/bdev does not need truncate protection */
DIO_SKIP_DIO_COUNT = 0x08,
}; };
void dio_end_io(struct bio *bio, int error); void dio_end_io(struct bio *bio, int error);
...@@ -2657,7 +2660,31 @@ static inline ssize_t blockdev_direct_IO(struct kiocb *iocb, ...@@ -2657,7 +2660,31 @@ static inline ssize_t blockdev_direct_IO(struct kiocb *iocb,
#endif #endif
void inode_dio_wait(struct inode *inode); void inode_dio_wait(struct inode *inode);
void inode_dio_done(struct inode *inode);
/*
 * inode_dio_begin - signal start of a direct I/O request
 * @inode: inode the direct I/O happens on
 *
 * This is called before a direct I/O request is issued. It bumps
 * i_dio_count so the request is accounted as in flight; every call
 * must be balanced by inode_dio_end() once the request completes.
 */
static inline void inode_dio_begin(struct inode *inode)
{
atomic_inc(&inode->i_dio_count);
}
/*
 * inode_dio_end - mark one direct I/O request on @inode as finished
 * @inode: inode the direct I/O happens on
 *
 * Drops one count from i_dio_count.  If that brings the count to zero,
 * wake anyone sleeping on the __I_DIO_WAKEUP bit of i_state.
 */
static inline void inode_dio_end(struct inode *inode)
{
	/* Other requests still in flight: nothing to wake yet. */
	if (!atomic_dec_and_test(&inode->i_dio_count))
		return;

	wake_up_bit(&inode->i_state, __I_DIO_WAKEUP);
}
extern void inode_set_flags(struct inode *inode, unsigned int flags, extern void inode_set_flags(struct inode *inode, unsigned int flags,
unsigned int mask); unsigned int mask);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment