Commit 6a965666 authored by Linus Torvalds

Merge tag 'notifications-pipe-prep-20191115' of...

Merge tag 'notifications-pipe-prep-20191115' of git://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs

Pull pipe rework from David Howells:
 "This is my set of preparatory patches for building a general
  notification queue on top of pipes. It makes a number of significant
  changes:

   - It removes the nr_exclusive argument from __wake_up_sync_key() as
     this is always 1. This prepares for the next step:

   - Adds wake_up_interruptible_sync_poll_locked() so that poll can be
     woken up from a function that's holding the poll waitqueue
     spinlock.

   - Change the pipe buffer ring to be managed in terms of unbounded
     head and tail indices rather than a bounded index and length. This
     means that reading the pipe only needs to modify one index, not
     two (see the standalone sketch just after this quoted message).

   - A selection of helper functions is provided to query the state of
     the pipe buffer, plus a couple to apply updates to the pipe
     indices.

   - The pipe ring is allowed to have kernel-reserved slots. This allows
     many notification messages to be spliced in by the kernel without
     allowing userspace to pin too many pages if it writes to the same
     pipe.

   - Advance the head and tail indices inside the pipe waitqueue lock
     and use wake_up_interruptible_sync_poll_locked() to poke poll
     without having to take the lock twice.

   - Rearrange pipe_write() to preallocate the buffer it is going to
     write into and then drop the spinlock. This allows kernel
     notifications to then be added to the ring whilst it is filling the
     buffer it allocated. The read side is stalled because the pipe
     mutex is still held.

   - Don't wake up readers on a pipe if there was already data in it
     when we added more.

   - Don't wake up writers on a pipe if the ring wasn't full before we
     removed a buffer"
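
The head/tail scheme described above is easiest to see in isolation. What follows is a minimal, self-contained C sketch of the indexing (illustrative only; the struct and function names are invented here and not taken from the patches): both indices increment without bound, occupancy is the plain unsigned difference, and the power-of-two ring size is applied as a mask only when a slot is actually touched, so a read updates nothing but the tail.

/* Standalone sketch of an unbounded head/tail ring.  Not kernel code;
 * "struct ring" and these function names are made up for illustration.
 */
#include <stdbool.h>
#include <stddef.h>

#define RING_SIZE 16                    /* must be a power of two */

struct ring {
        unsigned int head;              /* bumped only by the producer */
        unsigned int tail;              /* bumped only by the consumer */
        void *slots[RING_SIZE];
};

static unsigned int ring_occupancy(const struct ring *r)
{
        /* Unsigned subtraction stays correct even after head wraps past
         * UINT_MAX, because both indices wrap the same way.
         */
        return r->head - r->tail;
}

static bool ring_push(struct ring *r, void *item)
{
        if (ring_occupancy(r) >= RING_SIZE)
                return false;                   /* full */
        r->slots[r->head & (RING_SIZE - 1)] = item;
        r->head++;                              /* one index modified */
        return true;
}

static void *ring_pop(struct ring *r)
{
        void *item;

        if (r->head == r->tail)
                return NULL;                    /* empty */
        item = r->slots[r->tail & (RING_SIZE - 1)];
        r->tail++;                              /* one index modified */
        return item;
}

Contrast this with the old curbuf/nrbufs representation, where consuming a buffer had to both advance curbuf modulo the ring size and decrement nrbufs, i.e. write two fields.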

* tag 'notifications-pipe-prep-20191115' of git://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs:
  pipe: Remove sync on wake_ups
  pipe: Increase the writer-wakeup threshold to reduce context-switch count
  pipe: Check for ring full inside of the spinlock in pipe_write()
  pipe: Remove redundant wakeup from pipe_write()
  pipe: Rearrange sequence in pipe_write() to preallocate slot
  pipe: Conditionalise wakeup in pipe_read()
  pipe: Advance tail pointer inside of wait spinlock in pipe_read()
  pipe: Allow pipes to have kernel-reserved slots
  pipe: Use head and tail pointers for the ring, not cursor and length
  Add wake_up_interruptible_sync_poll_locked()
  Remove the nr_exclusive argument from __wake_up_sync_key()
  pipe: Reduce #inclusion of pipe_fs_i.h
parents 32ef9553 3c0edea9
@@ -919,6 +919,7 @@ static ssize_t port_fops_splice_write(struct pipe_inode_info *pipe,
                .pos = *ppos,
                .u.data = &sgl,
        };
+       unsigned int occupancy;
 
        /*
         * Rproc_serial does not yet support splice. To support splice
@@ -929,21 +930,18 @@ static ssize_t port_fops_splice_write(struct pipe_inode_info *pipe,
        if (is_rproc_serial(port->out_vq->vdev))
                return -EINVAL;
 
-       /*
-        * pipe->nrbufs == 0 means there are no data to transfer,
-        * so this returns just 0 for no data.
-        */
        pipe_lock(pipe);
-       if (!pipe->nrbufs) {
-               ret = 0;
+       ret = 0;
+       if (pipe_empty(pipe->head, pipe->tail))
                goto error_out;
-       }
 
        ret = wait_port_writable(port, filp->f_flags & O_NONBLOCK);
        if (ret < 0)
                goto error_out;
 
-       buf = alloc_buf(port->portdev->vdev, 0, pipe->nrbufs);
+       occupancy = pipe_occupancy(pipe->head, pipe->tail);
+       buf = alloc_buf(port->portdev->vdev, 0, occupancy);
        if (!buf) {
                ret = -ENOMEM;
                goto error_out;
@@ -951,7 +949,7 @@ static ssize_t port_fops_splice_write(struct pipe_inode_info *pipe,
 
        sgl.n = 0;
        sgl.len = 0;
-       sgl.size = pipe->nrbufs;
+       sgl.size = occupancy;
        sgl.sg = buf->sg;
        sg_init_table(sgl.sg, sgl.size);
        ret = __splice_from_pipe(pipe, &sd, pipe_to_sg);
...
@@ -59,7 +59,6 @@
 #include <linux/kmod.h>
 #include <linux/fsnotify.h>
 #include <linux/fs_struct.h>
-#include <linux/pipe_fs_i.h>
 #include <linux/oom.h>
 #include <linux/compat.h>
 #include <linux/vmalloc.h>
...
@@ -705,7 +705,7 @@ static int fuse_copy_fill(struct fuse_copy_state *cs)
                        cs->pipebufs++;
                        cs->nr_segs--;
                } else {
-                       if (cs->nr_segs == cs->pipe->buffers)
+                       if (cs->nr_segs >= cs->pipe->max_usage)
                                return -EIO;
 
                        page = alloc_page(GFP_HIGHUSER);
@@ -881,7 +881,7 @@ static int fuse_ref_page(struct fuse_copy_state *cs, struct page *page,
        struct pipe_buffer *buf;
        int err;
 
-       if (cs->nr_segs == cs->pipe->buffers)
+       if (cs->nr_segs >= cs->pipe->max_usage)
                return -EIO;
 
        err = unlock_request(cs->req);
@@ -1343,7 +1343,7 @@ static ssize_t fuse_dev_splice_read(struct file *in, loff_t *ppos,
        if (!fud)
                return -EPERM;
 
-       bufs = kvmalloc_array(pipe->buffers, sizeof(struct pipe_buffer),
+       bufs = kvmalloc_array(pipe->max_usage, sizeof(struct pipe_buffer),
                              GFP_KERNEL);
        if (!bufs)
                return -ENOMEM;
@@ -1355,7 +1355,7 @@ static ssize_t fuse_dev_splice_read(struct file *in, loff_t *ppos,
        if (ret < 0)
                goto out;
 
-       if (pipe->nrbufs + cs.nr_segs > pipe->buffers) {
+       if (pipe_occupancy(pipe->head, pipe->tail) + cs.nr_segs > pipe->max_usage) {
                ret = -EIO;
                goto out;
        }
@@ -1937,6 +1937,7 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
                                     struct file *out, loff_t *ppos,
                                     size_t len, unsigned int flags)
 {
+       unsigned int head, tail, mask, count;
        unsigned nbuf;
        unsigned idx;
        struct pipe_buffer *bufs;
@@ -1951,8 +1952,12 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
        pipe_lock(pipe);
 
-       bufs = kvmalloc_array(pipe->nrbufs, sizeof(struct pipe_buffer),
-                             GFP_KERNEL);
+       head = pipe->head;
+       tail = pipe->tail;
+       mask = pipe->ring_size - 1;
+       count = head - tail;
+
+       bufs = kvmalloc_array(count, sizeof(struct pipe_buffer), GFP_KERNEL);
        if (!bufs) {
                pipe_unlock(pipe);
                return -ENOMEM;
@@ -1960,8 +1965,8 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
        nbuf = 0;
        rem = 0;
-       for (idx = 0; idx < pipe->nrbufs && rem < len; idx++)
-               rem += pipe->bufs[(pipe->curbuf + idx) & (pipe->buffers - 1)].len;
+       for (idx = tail; idx < head && rem < len; idx++)
+               rem += pipe->bufs[idx & mask].len;
 
        ret = -EINVAL;
        if (rem < len)
@@ -1972,16 +1977,16 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
                struct pipe_buffer *ibuf;
                struct pipe_buffer *obuf;
 
-               BUG_ON(nbuf >= pipe->buffers);
-               BUG_ON(!pipe->nrbufs);
-               ibuf = &pipe->bufs[pipe->curbuf];
+               BUG_ON(nbuf >= pipe->ring_size);
+               BUG_ON(tail == head);
+               ibuf = &pipe->bufs[tail & mask];
                obuf = &bufs[nbuf];
 
                if (rem >= ibuf->len) {
                        *obuf = *ibuf;
                        ibuf->ops = NULL;
-                       pipe->curbuf = (pipe->curbuf + 1) & (pipe->buffers - 1);
-                       pipe->nrbufs--;
+                       tail++;
+                       pipe->tail = tail;
                } else {
                        if (!pipe_buf_get(pipe, ibuf))
                                goto out_free;
...
@@ -11,7 +11,6 @@
 #include <linux/pagemap.h>
 #include <asm/byteorder.h>
 #include <linux/swap.h>
-#include <linux/pipe_fs_i.h>
 #include <linux/mpage.h>
 #include <linux/quotaops.h>
 #include <linux/blkdev.h>
...
This diff is collapsed.
This diff is collapsed.
@@ -30,9 +30,10 @@ struct pipe_buffer {
  * struct pipe_inode_info - a linux kernel pipe
  * @mutex: mutex protecting the whole thing
  * @wait: reader/writer wait point in case of empty/full pipe
- * @nrbufs: the number of non-empty pipe buffers in this pipe
- * @buffers: total number of buffers (should be a power of 2)
- * @curbuf: the current pipe buffer entry
+ * @head: The point of buffer production
+ * @tail: The point of buffer consumption
+ * @max_usage: The maximum number of slots that may be used in the ring
+ * @ring_size: total number of buffers (should be a power of 2)
  * @tmp_page: cached released page
  * @readers: number of current readers of this pipe
  * @writers: number of current writers of this pipe
@@ -48,7 +49,10 @@ struct pipe_buffer {
 struct pipe_inode_info {
        struct mutex mutex;
        wait_queue_head_t wait;
-       unsigned int nrbufs, curbuf, buffers;
+       unsigned int head;
+       unsigned int tail;
+       unsigned int max_usage;
+       unsigned int ring_size;
        unsigned int readers;
        unsigned int writers;
        unsigned int files;
@@ -104,6 +108,58 @@ struct pipe_buf_operations {
        bool (*get)(struct pipe_inode_info *, struct pipe_buffer *);
 };
 
+/**
+ * pipe_empty - Return true if the pipe is empty
+ * @head: The pipe ring head pointer
+ * @tail: The pipe ring tail pointer
+ */
+static inline bool pipe_empty(unsigned int head, unsigned int tail)
+{
+       return head == tail;
+}
+
+/**
+ * pipe_occupancy - Return number of slots used in the pipe
+ * @head: The pipe ring head pointer
+ * @tail: The pipe ring tail pointer
+ */
+static inline unsigned int pipe_occupancy(unsigned int head, unsigned int tail)
+{
+       return head - tail;
+}
+
+/**
+ * pipe_full - Return true if the pipe is full
+ * @head: The pipe ring head pointer
+ * @tail: The pipe ring tail pointer
+ * @limit: The maximum amount of slots available.
+ */
+static inline bool pipe_full(unsigned int head, unsigned int tail,
+                            unsigned int limit)
+{
+       return pipe_occupancy(head, tail) >= limit;
+}
+
+/**
+ * pipe_space_for_user - Return number of slots available to userspace
+ * @head: The pipe ring head pointer
+ * @tail: The pipe ring tail pointer
+ * @pipe: The pipe info structure
+ */
+static inline unsigned int pipe_space_for_user(unsigned int head, unsigned int tail,
+                                              struct pipe_inode_info *pipe)
+{
+       unsigned int p_occupancy, p_space;
+
+       p_occupancy = pipe_occupancy(head, tail);
+       if (p_occupancy >= pipe->max_usage)
+               return 0;
+       p_space = pipe->ring_size - p_occupancy;
+       if (p_space > pipe->max_usage)
+               p_space = pipe->max_usage;
+       return p_space;
+}
+
 /**
  * pipe_buf_get - get a reference to a pipe_buffer
  * @pipe: the pipe that the buffer belongs to
...
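The interplay of max_usage and ring_size above is the mechanism behind the kernel-reserved slots mentioned in the pull message. Here is a worked example with invented numbers (a 32-slot ring of which userspace may occupy at most 16), written as standalone C that mirrors the inline helpers rather than including kernel headers:

/* Worked example of the helper arithmetic, with illustrative numbers.
 * Standalone replica of the inline helpers above, not kernel code.
 */
#include <stdio.h>

static unsigned int occupancy(unsigned int head, unsigned int tail)
{
        return head - tail;
}

static unsigned int space_for_user(unsigned int head, unsigned int tail,
                                   unsigned int ring_size, unsigned int max_usage)
{
        unsigned int p_occupancy = occupancy(head, tail);

        if (p_occupancy >= max_usage)
                return 0;
        /* Free slots exist, but never offer userspace more than max_usage. */
        return (ring_size - p_occupancy) > max_usage
                ? max_usage : ring_size - p_occupancy;
}

int main(void)
{
        unsigned int tail = 100, head = 120;    /* 20 slots occupied */

        printf("occupancy      = %u\n", occupancy(head, tail));        /* 20 */
        printf("full for user  = %u\n", occupancy(head, tail) >= 16);  /* 1 */
        printf("full for kernel= %u\n", occupancy(head, tail) >= 32);  /* 0 */
        printf("space for user = %u\n",
               space_for_user(head, tail, 32, 16));                    /* 0 */
        return 0;
}

With 20 slots occupied, the pipe is full from userspace's point of view (pipe_full against max_usage) but not from the kernel's (pipe_full against ring_size), which is exactly what lets notification messages be spliced in past blocked writers.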
@@ -45,8 +45,8 @@ struct iov_iter {
        union {
                unsigned long nr_segs;
                struct {
-                       int idx;
-                       int start_idx;
+                       unsigned int head;
+                       unsigned int start_head;
                };
        };
 };
...
@@ -201,9 +201,10 @@ void __wake_up(struct wait_queue_head *wq_head, unsigned int mode, int nr, void
 void __wake_up_locked_key(struct wait_queue_head *wq_head, unsigned int mode, void *key);
 void __wake_up_locked_key_bookmark(struct wait_queue_head *wq_head,
                unsigned int mode, void *key, wait_queue_entry_t *bookmark);
-void __wake_up_sync_key(struct wait_queue_head *wq_head, unsigned int mode, int nr, void *key);
+void __wake_up_sync_key(struct wait_queue_head *wq_head, unsigned int mode, void *key);
+void __wake_up_locked_sync_key(struct wait_queue_head *wq_head, unsigned int mode, void *key);
 void __wake_up_locked(struct wait_queue_head *wq_head, unsigned int mode, int nr);
-void __wake_up_sync(struct wait_queue_head *wq_head, unsigned int mode, int nr);
+void __wake_up_sync(struct wait_queue_head *wq_head, unsigned int mode);
 
 #define wake_up(x)                      __wake_up(x, TASK_NORMAL, 1, NULL)
 #define wake_up_nr(x, nr)               __wake_up(x, TASK_NORMAL, nr, NULL)
@@ -214,7 +215,7 @@ void __wake_up_sync(struct wait_queue_head *wq_head, unsigned int mode);
 #define wake_up_interruptible(x)        __wake_up(x, TASK_INTERRUPTIBLE, 1, NULL)
 #define wake_up_interruptible_nr(x, nr) __wake_up(x, TASK_INTERRUPTIBLE, nr, NULL)
 #define wake_up_interruptible_all(x)    __wake_up(x, TASK_INTERRUPTIBLE, 0, NULL)
-#define wake_up_interruptible_sync(x)   __wake_up_sync((x), TASK_INTERRUPTIBLE, 1)
+#define wake_up_interruptible_sync(x)   __wake_up_sync((x), TASK_INTERRUPTIBLE)
 
 /*
  * Wakeup macros to be used to report events to the targets.
@@ -228,7 +229,9 @@ void __wake_up_sync(struct wait_queue_head *wq_head, unsigned int mode);
 #define wake_up_interruptible_poll(x, m)                                \
        __wake_up(x, TASK_INTERRUPTIBLE, 1, poll_to_key(m))
 #define wake_up_interruptible_sync_poll(x, m)                           \
-       __wake_up_sync_key((x), TASK_INTERRUPTIBLE, 1, poll_to_key(m))
+       __wake_up_sync_key((x), TASK_INTERRUPTIBLE, poll_to_key(m))
+#define wake_up_interruptible_sync_poll_locked(x, m)                    \
+       __wake_up_locked_sync_key((x), TASK_INTERRUPTIBLE, poll_to_key(m))
 
 #define ___wait_cond_timeout(condition)                                 \
 ({                                                                      \
...
@@ -1409,7 +1409,7 @@ static int child_wait_callback(wait_queue_entry_t *wait, unsigned mode,
 void __wake_up_parent(struct task_struct *p, struct task_struct *parent)
 {
        __wake_up_sync_key(&parent->signal->wait_chldexit,
-                          TASK_INTERRUPTIBLE, 1, p);
+                          TASK_INTERRUPTIBLE, p);
 }
 
 static long do_wait(struct wait_opts *wo)
...
@@ -169,7 +169,6 @@ EXPORT_SYMBOL_GPL(__wake_up_locked_key_bookmark);
  * __wake_up_sync_key - wake up threads blocked on a waitqueue.
  * @wq_head: the waitqueue
  * @mode: which threads
- * @nr_exclusive: how many wake-one or wake-many threads to wake up
  * @key: opaque value to be passed to wakeup targets
  *
  * The sync wakeup differs that the waker knows that it will schedule
@@ -183,26 +182,44 @@ EXPORT_SYMBOL_GPL(__wake_up_locked_key_bookmark);
  * accessing the task state.
  */
 void __wake_up_sync_key(struct wait_queue_head *wq_head, unsigned int mode,
-                       int nr_exclusive, void *key)
+                       void *key)
 {
-       int wake_flags = 1; /* XXX WF_SYNC */
-
        if (unlikely(!wq_head))
                return;
 
-       if (unlikely(nr_exclusive != 1))
-               wake_flags = 0;
-
-       __wake_up_common_lock(wq_head, mode, nr_exclusive, wake_flags, key);
+       __wake_up_common_lock(wq_head, mode, 1, WF_SYNC, key);
 }
 EXPORT_SYMBOL_GPL(__wake_up_sync_key);
 
+/**
+ * __wake_up_locked_sync_key - wake up a thread blocked on a locked waitqueue.
+ * @wq_head: the waitqueue
+ * @mode: which threads
+ * @key: opaque value to be passed to wakeup targets
+ *
+ * The sync wakeup differs in that the waker knows that it will schedule
+ * away soon, so while the target thread will be woken up, it will not
+ * be migrated to another CPU - ie. the two threads are 'synchronized'
+ * with each other. This can prevent needless bouncing between CPUs.
+ *
+ * On UP it can prevent extra preemption.
+ *
+ * If this function wakes up a task, it executes a full memory barrier before
+ * accessing the task state.
+ */
+void __wake_up_locked_sync_key(struct wait_queue_head *wq_head,
+                              unsigned int mode, void *key)
+{
+       __wake_up_common(wq_head, mode, 1, WF_SYNC, key, NULL);
+}
+EXPORT_SYMBOL_GPL(__wake_up_locked_sync_key);
+
 /*
  * __wake_up_sync - see __wake_up_sync_key()
  */
-void __wake_up_sync(struct wait_queue_head *wq_head, unsigned int mode, int nr_exclusive)
+void __wake_up_sync(struct wait_queue_head *wq_head, unsigned int mode)
 {
-       __wake_up_sync_key(wq_head, mode, nr_exclusive, NULL);
+       __wake_up_sync_key(wq_head, mode, NULL);
 }
 EXPORT_SYMBOL_GPL(__wake_up_sync);      /* For internal use only */
...
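The locked variant added above exists so that a pipe index update and its poll wakeup can happen under a single acquisition of the waitqueue spinlock, as the pull message describes. A hedged sketch of the intended call pattern follows; the real caller is pipe_read(), whose fs/pipe.c diff is collapsed on this page, so the function name and surrounding context here are illustrative:

/* Illustrative fragment, not taken from the collapsed fs/pipe.c diff:
 * consume one slot and poke poll without dropping the waitqueue lock
 * between the index update and the wakeup.
 */
static void pipe_consume_one_slot(struct pipe_inode_info *pipe)
{
        spin_lock_irq(&pipe->wait.lock);
        pipe->tail++;           /* reading modifies only the tail index */
        wake_up_interruptible_sync_poll_locked(&pipe->wait,
                                               EPOLLOUT | EPOLLWRNORM);
        spin_unlock_irq(&pipe->wait.lock);
}

Before this series, the caller would have had to drop the waitqueue lock after updating the ring and then let wake_up_interruptible_sync_poll() re-take it, i.e. two lock round-trips per wakeup.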
This diff is collapsed.
@@ -28,7 +28,6 @@
 #include <linux/icmpv6.h>
 #include <linux/slab.h>
 #include <linux/mutex.h>
-#include <linux/pipe_fs_i.h>
 #include <net/cipso_ipv4.h>
 #include <net/ip.h>
 #include <net/ipv6.h>
...