Commit da77005f authored by Trond Myklebust

SUNRPC: Remove the global temporary write buffer in net/sunrpc/cache.c

While we do want to protect against multiple concurrent readers and writers
on each upcall/downcall pipe, we don't want to limit concurrent reading and
writing to separate caches.

This patch therefore replaces the static buffer 'write_buf', which can only
be used by one writer at a time, with use of the page cache as the
temporary buffer for downcalls. We still fall back to using the old
global buffer if the downcall is larger than PAGE_CACHE_SIZE, since this is
apparently needed by the SPKM security context initialisation.

It then replaces the use of the global 'queue_io_mutex' with the
inode->i_mutex in cache_read() and cache_write().
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
parent 5b7a1b9f
net/sunrpc/cache.c

@@ -27,6 +27,7 @@
 #include <linux/net.h>
 #include <linux/workqueue.h>
 #include <linux/mutex.h>
+#include <linux/pagemap.h>
 #include <asm/ioctls.h>
 #include <linux/sunrpc/types.h>
 #include <linux/sunrpc/cache.h>
@@ -702,13 +703,14 @@ cache_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos)
 {
 	struct cache_reader *rp = filp->private_data;
 	struct cache_request *rq;
-	struct cache_detail *cd = PDE(filp->f_path.dentry->d_inode)->data;
+	struct inode *inode = filp->f_path.dentry->d_inode;
+	struct cache_detail *cd = PDE(inode)->data;
 	int err;
 
 	if (count == 0)
 		return 0;
 
-	mutex_lock(&queue_io_mutex); /* protect against multiple concurrent
+	mutex_lock(&inode->i_mutex); /* protect against multiple concurrent
 			      * readers on this file */
  again:
 	spin_lock(&queue_lock);
@@ -721,7 +723,7 @@ cache_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos)
 	}
 	if (rp->q.list.next == &cd->queue) {
 		spin_unlock(&queue_lock);
-		mutex_unlock(&queue_io_mutex);
+		mutex_unlock(&inode->i_mutex);
 		BUG_ON(rp->offset);
 		return 0;
 	}
@@ -768,38 +770,81 @@ cache_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos)
 	}
 	if (err == -EAGAIN)
 		goto again;
-	mutex_unlock(&queue_io_mutex);
+	mutex_unlock(&inode->i_mutex);
 	return err ? err : count;
 }
 
-static char write_buf[8192]; /* protected by queue_io_mutex */
-
-static ssize_t
-cache_write(struct file *filp, const char __user *buf, size_t count,
-	    loff_t *ppos)
-{
-	int err;
-	struct cache_detail *cd = PDE(filp->f_path.dentry->d_inode)->data;
-
-	if (count == 0)
-		return 0;
-	if (count >= sizeof(write_buf))
-		return -EINVAL;
-
-	mutex_lock(&queue_io_mutex);
-
-	if (copy_from_user(write_buf, buf, count)) {
-		mutex_unlock(&queue_io_mutex);
-		return -EFAULT;
-	}
-	write_buf[count] = '\0';
-	if (cd->cache_parse)
-		err = cd->cache_parse(cd, write_buf, count);
-	else
-		err = -EINVAL;
-
-	mutex_unlock(&queue_io_mutex);
-	return err ? err : count;
+static ssize_t cache_do_downcall(char *kaddr, const char __user *buf,
+				 size_t count, struct cache_detail *cd)
+{
+	ssize_t ret;
+
+	if (copy_from_user(kaddr, buf, count))
+		return -EFAULT;
+	kaddr[count] = '\0';
+	ret = cd->cache_parse(cd, kaddr, count);
+	if (!ret)
+		ret = count;
+	return ret;
+}
+
+static ssize_t cache_slow_downcall(const char __user *buf,
+				   size_t count, struct cache_detail *cd)
+{
+	static char write_buf[8192]; /* protected by queue_io_mutex */
+	ssize_t ret = -EINVAL;
+
+	if (count >= sizeof(write_buf))
+		goto out;
+	mutex_lock(&queue_io_mutex);
+	ret = cache_do_downcall(write_buf, buf, count, cd);
+	mutex_unlock(&queue_io_mutex);
+out:
+	return ret;
+}
+
+static ssize_t cache_downcall(struct address_space *mapping,
+			      const char __user *buf,
+			      size_t count, struct cache_detail *cd)
+{
+	struct page *page;
+	char *kaddr;
+	ssize_t ret = -ENOMEM;
+
+	if (count >= PAGE_CACHE_SIZE)
+		goto out_slow;
+
+	page = find_or_create_page(mapping, 0, GFP_KERNEL);
+	if (!page)
+		goto out_slow;
+
+	kaddr = kmap(page);
+	ret = cache_do_downcall(kaddr, buf, count, cd);
+	kunmap(page);
+	unlock_page(page);
+	page_cache_release(page);
+	return ret;
+out_slow:
+	return cache_slow_downcall(buf, count, cd);
+}
+
+static ssize_t
+cache_write(struct file *filp, const char __user *buf, size_t count,
+	    loff_t *ppos)
+{
+	struct address_space *mapping = filp->f_mapping;
+	struct inode *inode = filp->f_path.dentry->d_inode;
+	struct cache_detail *cd = PDE(inode)->data;
+	ssize_t ret = -EINVAL;
+
+	if (!cd->cache_parse)
+		goto out;
+
+	mutex_lock(&inode->i_mutex);
+	ret = cache_downcall(mapping, buf, count, cd);
+	mutex_unlock(&inode->i_mutex);
+out:
+	return ret;
 }
 
 static DECLARE_WAIT_QUEUE_HEAD(queue_wait);
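For orientation, the downcall path this patch reworks is driven from user space by a single write() to a cache channel file (for example /proc/net/rpc/auth.unix.ip/channel); that written buffer is what cache_write() now stages in a page of the channel file's page cache before handing it to ->cache_parse(). A minimal user-space sketch follows; the channel path and the payload line are illustrative only, since the exact text format is cache-specific and is normally produced by daemons such as mountd or svcgssd.

/* Illustrative only: one write() per downcall, as cache_write() expects.
 * The payload below is a guessed auth.unix.ip-style reply, not a
 * documented format. */
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int main(void)
{
	const char *channel = "/proc/net/rpc/auth.unix.ip/channel";
	const char reply[] = "nfsd 192.0.2.1 1735689600 example.com\n";
	int fd = open(channel, O_WRONLY);

	if (fd < 0) {
		perror("open");
		return 1;
	}
	/* The kernel copies this buffer into a page obtained with
	 * find_or_create_page() (or into the 8k fallback buffer for
	 * oversized downcalls) and passes it to ->cache_parse(). */
	if (write(fd, reply, sizeof(reply) - 1) < 0)
		perror("write");
	close(fd);
	return 0;
}

The design point of the patch is visible here: serializing on the channel inode's i_mutex keeps concurrent readers and writers on the same channel file ordered, while writers to different caches no longer contend for one global buffer and mutex.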