Commit 1fe8529d authored by Trond Myklebust's avatar Trond Myklebust

RPC: add fair queueing to the RPC scheduler.

If a wait queue is defined as a "priority queue" then requests are dequeued
in blocks of 16 in order to work well with write gathering + readahead on the
server.
There are 3 levels of priority. The high priority tasks get scheduled 16 times
for each time the default level gets scheduled. The lowest level gets
scheduled once every 4 times the normal level gets scheduled.
Original patch contributed by Shantanu Goel.
parent 3208896d
...@@ -105,7 +105,7 @@ nfs4_alloc_client(struct in_addr *addr) ...@@ -105,7 +105,7 @@ nfs4_alloc_client(struct in_addr *addr)
INIT_WORK(&clp->cl_renewd, nfs4_renew_state, clp); INIT_WORK(&clp->cl_renewd, nfs4_renew_state, clp);
INIT_LIST_HEAD(&clp->cl_superblocks); INIT_LIST_HEAD(&clp->cl_superblocks);
init_waitqueue_head(&clp->cl_waitq); init_waitqueue_head(&clp->cl_waitq);
INIT_RPC_WAITQ(&clp->cl_rpcwaitq, "NFS4 client"); rpc_init_wait_queue(&clp->cl_rpcwaitq, "NFS4 client");
clp->cl_state = 1 << NFS4CLNT_NEW; clp->cl_state = 1 << NFS4CLNT_NEW;
} }
return clp; return clp;
......
...@@ -234,6 +234,7 @@ static void nfs_read_rpcsetup(struct nfs_page *req, struct nfs_read_data *data, ...@@ -234,6 +234,7 @@ static void nfs_read_rpcsetup(struct nfs_page *req, struct nfs_read_data *data,
NFS_PROTO(inode)->read_setup(data); NFS_PROTO(inode)->read_setup(data);
data->task.tk_cookie = (unsigned long)inode;
data->task.tk_calldata = data; data->task.tk_calldata = data;
/* Release requests */ /* Release requests */
data->task.tk_release = nfs_readdata_release; data->task.tk_release = nfs_readdata_release;
......
...@@ -173,15 +173,14 @@ static void nfs_mark_uptodate(struct page *page, unsigned int base, unsigned int ...@@ -173,15 +173,14 @@ static void nfs_mark_uptodate(struct page *page, unsigned int base, unsigned int
* Write a page synchronously. * Write a page synchronously.
* Offset is the data offset within the page. * Offset is the data offset within the page.
*/ */
static int static int nfs_writepage_sync(struct file *file, struct inode *inode,
nfs_writepage_sync(struct file *file, struct inode *inode, struct page *page, struct page *page, unsigned int offset, unsigned int count,
unsigned int offset, unsigned int count) int how)
{ {
unsigned int wsize = NFS_SERVER(inode)->wsize; unsigned int wsize = NFS_SERVER(inode)->wsize;
int result, written = 0; int result, written = 0;
int swapfile = IS_SWAPFILE(inode);
struct nfs_write_data wdata = { struct nfs_write_data wdata = {
.flags = swapfile ? NFS_RPC_SWAPFLAGS : 0, .flags = how,
.cred = NULL, .cred = NULL,
.inode = inode, .inode = inode,
.args = { .args = {
...@@ -205,7 +204,7 @@ nfs_writepage_sync(struct file *file, struct inode *inode, struct page *page, ...@@ -205,7 +204,7 @@ nfs_writepage_sync(struct file *file, struct inode *inode, struct page *page,
nfs_begin_data_update(inode); nfs_begin_data_update(inode);
do { do {
if (count < wsize && !swapfile) if (count < wsize)
wdata.args.count = count; wdata.args.count = count;
wdata.args.offset = page_offset(page) + wdata.args.pgbase; wdata.args.offset = page_offset(page) + wdata.args.pgbase;
...@@ -260,6 +259,15 @@ static int nfs_writepage_async(struct file *file, struct inode *inode, ...@@ -260,6 +259,15 @@ static int nfs_writepage_async(struct file *file, struct inode *inode,
return status; return status;
} }
static int wb_priority(struct writeback_control *wbc)
{
if (wbc->for_reclaim)
return FLUSH_HIGHPRI;
if (wbc->for_kupdate)
return FLUSH_LOWPRI;
return 0;
}
/* /*
* Write an mmapped page to the server. * Write an mmapped page to the server.
*/ */
...@@ -270,6 +278,7 @@ int nfs_writepage(struct page *page, struct writeback_control *wbc) ...@@ -270,6 +278,7 @@ int nfs_writepage(struct page *page, struct writeback_control *wbc)
unsigned offset = PAGE_CACHE_SIZE; unsigned offset = PAGE_CACHE_SIZE;
loff_t i_size = i_size_read(inode); loff_t i_size = i_size_read(inode);
int inode_referenced = 0; int inode_referenced = 0;
int priority = wb_priority(wbc);
int err; int err;
/* /*
...@@ -285,7 +294,7 @@ int nfs_writepage(struct page *page, struct writeback_control *wbc) ...@@ -285,7 +294,7 @@ int nfs_writepage(struct page *page, struct writeback_control *wbc)
end_index = i_size >> PAGE_CACHE_SHIFT; end_index = i_size >> PAGE_CACHE_SHIFT;
/* Ensure we've flushed out any previous writes */ /* Ensure we've flushed out any previous writes */
nfs_wb_page(inode,page); nfs_wb_page_priority(inode, page, priority);
/* easy case */ /* easy case */
if (page->index < end_index) if (page->index < end_index)
...@@ -307,7 +316,7 @@ int nfs_writepage(struct page *page, struct writeback_control *wbc) ...@@ -307,7 +316,7 @@ int nfs_writepage(struct page *page, struct writeback_control *wbc)
err = WRITEPAGE_ACTIVATE; err = WRITEPAGE_ACTIVATE;
} }
} else { } else {
err = nfs_writepage_sync(NULL, inode, page, 0, offset); err = nfs_writepage_sync(NULL, inode, page, 0, offset, priority);
if (err == offset) if (err == offset)
err = 0; err = 0;
} }
...@@ -338,7 +347,7 @@ int nfs_writepages(struct address_space *mapping, struct writeback_control *wbc) ...@@ -338,7 +347,7 @@ int nfs_writepages(struct address_space *mapping, struct writeback_control *wbc)
return 0; return 0;
nfs_wait_on_write_congestion(mapping, 0); nfs_wait_on_write_congestion(mapping, 0);
} }
err = nfs_flush_inode(inode, 0, 0, 0); err = nfs_flush_inode(inode, 0, 0, wb_priority(wbc));
if (err < 0) if (err < 0)
goto out; goto out;
wbc->nr_to_write -= err; wbc->nr_to_write -= err;
...@@ -347,7 +356,7 @@ int nfs_writepages(struct address_space *mapping, struct writeback_control *wbc) ...@@ -347,7 +356,7 @@ int nfs_writepages(struct address_space *mapping, struct writeback_control *wbc)
if (err < 0) if (err < 0)
goto out; goto out;
} }
err = nfs_commit_inode(inode, 0, 0, 0); err = nfs_commit_inode(inode, 0, 0, wb_priority(wbc));
if (err > 0) if (err > 0)
wbc->nr_to_write -= err; wbc->nr_to_write -= err;
out: out:
...@@ -719,8 +728,8 @@ nfs_flush_incompatible(struct file *file, struct page *page) ...@@ -719,8 +728,8 @@ nfs_flush_incompatible(struct file *file, struct page *page)
* XXX: Keep an eye on generic_file_read to make sure it doesn't do bad * XXX: Keep an eye on generic_file_read to make sure it doesn't do bad
* things with a page scheduled for an RPC call (e.g. invalidate it). * things with a page scheduled for an RPC call (e.g. invalidate it).
*/ */
int int nfs_updatepage(struct file *file, struct page *page,
nfs_updatepage(struct file *file, struct page *page, unsigned int offset, unsigned int count) unsigned int offset, unsigned int count)
{ {
struct dentry *dentry = file->f_dentry; struct dentry *dentry = file->f_dentry;
struct inode *inode = page->mapping->host; struct inode *inode = page->mapping->host;
...@@ -732,7 +741,7 @@ nfs_updatepage(struct file *file, struct page *page, unsigned int offset, unsign ...@@ -732,7 +741,7 @@ nfs_updatepage(struct file *file, struct page *page, unsigned int offset, unsign
count, (long long)(page_offset(page) +offset)); count, (long long)(page_offset(page) +offset));
if (IS_SYNC(inode)) { if (IS_SYNC(inode)) {
status = nfs_writepage_sync(file, inode, page, offset, count); status = nfs_writepage_sync(file, inode, page, offset, count, 0);
if (status > 0) { if (status > 0) {
if (offset == 0 && status == PAGE_CACHE_SIZE) if (offset == 0 && status == PAGE_CACHE_SIZE)
SetPageUptodate(page); SetPageUptodate(page);
...@@ -819,6 +828,17 @@ static void nfs_writepage_release(struct nfs_page *req) ...@@ -819,6 +828,17 @@ static void nfs_writepage_release(struct nfs_page *req)
nfs_unlock_request(req); nfs_unlock_request(req);
} }
static inline int flush_task_priority(int how)
{
switch (how & (FLUSH_HIGHPRI|FLUSH_LOWPRI)) {
case FLUSH_HIGHPRI:
return RPC_PRIORITY_HIGH;
case FLUSH_LOWPRI:
return RPC_PRIORITY_LOW;
}
return RPC_PRIORITY_NORMAL;
}
/* /*
* Set up the argument/result storage required for the RPC call. * Set up the argument/result storage required for the RPC call.
*/ */
...@@ -851,6 +871,8 @@ static void nfs_write_rpcsetup(struct nfs_page *req, ...@@ -851,6 +871,8 @@ static void nfs_write_rpcsetup(struct nfs_page *req,
NFS_PROTO(inode)->write_setup(data, how); NFS_PROTO(inode)->write_setup(data, how);
data->task.tk_priority = flush_task_priority(how);
data->task.tk_cookie = (unsigned long)inode;
data->task.tk_calldata = data; data->task.tk_calldata = data;
/* Release requests */ /* Release requests */
data->task.tk_release = nfs_writedata_release; data->task.tk_release = nfs_writedata_release;
...@@ -1212,6 +1234,8 @@ static void nfs_commit_rpcsetup(struct list_head *head, ...@@ -1212,6 +1234,8 @@ static void nfs_commit_rpcsetup(struct list_head *head,
NFS_PROTO(inode)->commit_setup(data, how); NFS_PROTO(inode)->commit_setup(data, how);
data->task.tk_priority = flush_task_priority(how);
data->task.tk_cookie = (unsigned long)inode;
data->task.tk_calldata = data; data->task.tk_calldata = data;
/* Release requests */ /* Release requests */
data->task.tk_release = nfs_commit_release; data->task.tk_release = nfs_commit_release;
......
...@@ -69,6 +69,8 @@ ...@@ -69,6 +69,8 @@
#define FLUSH_SYNC 1 /* file being synced, or contention */ #define FLUSH_SYNC 1 /* file being synced, or contention */
#define FLUSH_WAIT 2 /* wait for completion */ #define FLUSH_WAIT 2 /* wait for completion */
#define FLUSH_STABLE 4 /* commit to stable storage */ #define FLUSH_STABLE 4 /* commit to stable storage */
#define FLUSH_LOWPRI 8 /* low priority background flush */
#define FLUSH_HIGHPRI 16 /* high priority memory reclaim flush */
#ifdef __KERNEL__ #ifdef __KERNEL__
...@@ -374,14 +376,18 @@ nfs_wb_all(struct inode *inode) ...@@ -374,14 +376,18 @@ nfs_wb_all(struct inode *inode)
/* /*
* Write back all requests on one page - we do this before reading it. * Write back all requests on one page - we do this before reading it.
*/ */
static inline int static inline int nfs_wb_page_priority(struct inode *inode, struct page* page, int how)
nfs_wb_page(struct inode *inode, struct page* page)
{ {
int error = nfs_sync_inode(inode, page->index, 1, int error = nfs_sync_inode(inode, page->index, 1,
FLUSH_WAIT | FLUSH_STABLE); how | FLUSH_WAIT | FLUSH_STABLE);
return (error < 0) ? error : 0; return (error < 0) ? error : 0;
} }
static inline int nfs_wb_page(struct inode *inode, struct page* page)
{
return nfs_wb_page_priority(inode, page, 0);
}
/* Hack for future NFS swap support */ /* Hack for future NFS swap support */
#ifndef IS_SWAPFILE #ifndef IS_SWAPFILE
# define IS_SWAPFILE(inode) (0) # define IS_SWAPFILE(inode) (0)
......
...@@ -49,6 +49,8 @@ struct rpc_task { ...@@ -49,6 +49,8 @@ struct rpc_task {
tk_cred_retry, tk_cred_retry,
tk_suid_retry; tk_suid_retry;
unsigned long tk_cookie; /* Cookie for batching tasks */
/* /*
* timeout_fn to be executed by timer bottom half * timeout_fn to be executed by timer bottom half
* callback to be executed after waking up * callback to be executed after waking up
...@@ -72,7 +74,9 @@ struct rpc_task { ...@@ -72,7 +74,9 @@ struct rpc_task {
unsigned long tk_timeout; /* timeout for rpc_sleep() */ unsigned long tk_timeout; /* timeout for rpc_sleep() */
unsigned short tk_flags; /* misc flags */ unsigned short tk_flags; /* misc flags */
unsigned char tk_active : 1;/* Task has been activated */ unsigned char tk_active : 1;/* Task has been activated */
unsigned char tk_priority : 2;/* Task priority */
unsigned long tk_runstate; /* Task run status */ unsigned long tk_runstate; /* Task run status */
struct list_head tk_links; /* links to related tasks */
#ifdef RPC_DEBUG #ifdef RPC_DEBUG
unsigned short tk_pid; /* debugging aid */ unsigned short tk_pid; /* debugging aid */
#endif #endif
...@@ -137,29 +141,59 @@ typedef void (*rpc_action)(struct rpc_task *); ...@@ -137,29 +141,59 @@ typedef void (*rpc_action)(struct rpc_task *);
smp_mb__after_clear_bit(); \ smp_mb__after_clear_bit(); \
} while(0) } while(0)
/*
* Task priorities.
* Note: if you change these, you must also change
* the task initialization definitions below.
*/
#define RPC_PRIORITY_LOW 0
#define RPC_PRIORITY_NORMAL 1
#define RPC_PRIORITY_HIGH 2
#define RPC_NR_PRIORITY (RPC_PRIORITY_HIGH+1)
/* /*
* RPC synchronization objects * RPC synchronization objects
*/ */
struct rpc_wait_queue { struct rpc_wait_queue {
struct list_head tasks; struct list_head tasks[RPC_NR_PRIORITY]; /* task queue for each priority level */
unsigned long cookie; /* cookie of last task serviced */
unsigned char maxpriority; /* maximum priority (0 if queue is not a priority queue) */
unsigned char priority; /* current priority */
unsigned char count; /* # task groups remaining serviced so far */
unsigned char nr; /* # tasks remaining for cookie */
#ifdef RPC_DEBUG #ifdef RPC_DEBUG
char * name; const char * name;
#endif #endif
}; };
/*
* This is the # requests to send consecutively
* from a single cookie. The aim is to improve
* performance of NFS operations such as read/write.
*/
#define RPC_BATCH_COUNT 16
#ifndef RPC_DEBUG #ifndef RPC_DEBUG
# define RPC_WAITQ_INIT(var,qname) ((struct rpc_wait_queue) {LIST_HEAD_INIT(var)}) # define RPC_WAITQ_INIT(var,qname) { \
# define RPC_WAITQ(var,qname) struct rpc_wait_queue var = RPC_WAITQ_INIT(var.tasks,qname) .tasks = { \
# define INIT_RPC_WAITQ(ptr,qname) do { \ [0] = LIST_HEAD_INIT(var.tasks[0]), \
INIT_LIST_HEAD(&(ptr)->tasks); \ [1] = LIST_HEAD_INIT(var.tasks[1]), \
} while(0) [2] = LIST_HEAD_INIT(var.tasks[2]), \
}, \
}
#else #else
# define RPC_WAITQ_INIT(var,qname) ((struct rpc_wait_queue) {LIST_HEAD_INIT(var.tasks), qname}) # define RPC_WAITQ_INIT(var,qname) { \
# define RPC_WAITQ(var,qname) struct rpc_wait_queue var = RPC_WAITQ_INIT(var,qname) .tasks = { \
# define INIT_RPC_WAITQ(ptr,qname) do { \ [0] = LIST_HEAD_INIT(var.tasks[0]), \
INIT_LIST_HEAD(&(ptr)->tasks); (ptr)->name = qname; \ [1] = LIST_HEAD_INIT(var.tasks[1]), \
} while(0) [2] = LIST_HEAD_INIT(var.tasks[2]), \
}, \
.name = qname, \
}
#endif #endif
# define RPC_WAITQ(var,qname) struct rpc_wait_queue var = RPC_WAITQ_INIT(var,qname)
#define RPC_IS_PRIORITY(q) ((q)->maxpriority > 0)
/* /*
* Function prototypes * Function prototypes
...@@ -175,6 +209,8 @@ void rpc_run_child(struct rpc_task *parent, struct rpc_task *child, ...@@ -175,6 +209,8 @@ void rpc_run_child(struct rpc_task *parent, struct rpc_task *child,
rpc_action action); rpc_action action);
int rpc_add_wait_queue(struct rpc_wait_queue *, struct rpc_task *); int rpc_add_wait_queue(struct rpc_wait_queue *, struct rpc_task *);
void rpc_remove_wait_queue(struct rpc_task *); void rpc_remove_wait_queue(struct rpc_task *);
void rpc_init_priority_wait_queue(struct rpc_wait_queue *, const char *);
void rpc_init_wait_queue(struct rpc_wait_queue *, const char *);
void rpc_sleep_on(struct rpc_wait_queue *, struct rpc_task *, void rpc_sleep_on(struct rpc_wait_queue *, struct rpc_task *,
rpc_action action, rpc_action timer); rpc_action action, rpc_action timer);
void rpc_add_timer(struct rpc_task *, rpc_action); void rpc_add_timer(struct rpc_task *, rpc_action);
...@@ -194,16 +230,14 @@ void rpc_show_tasks(void); ...@@ -194,16 +230,14 @@ void rpc_show_tasks(void);
int rpc_init_mempool(void); int rpc_init_mempool(void);
void rpc_destroy_mempool(void); void rpc_destroy_mempool(void);
static __inline__ void static inline void rpc_exit(struct rpc_task *task, int status)
rpc_exit(struct rpc_task *task, int status)
{ {
task->tk_status = status; task->tk_status = status;
task->tk_action = NULL; task->tk_action = NULL;
} }
#ifdef RPC_DEBUG #ifdef RPC_DEBUG
static __inline__ char * static inline const char * rpc_qname(struct rpc_wait_queue *q)
rpc_qname(struct rpc_wait_queue *q)
{ {
return ((q && q->name) ? q->name : "unknown"); return ((q && q->name) ? q->name : "unknown");
} }
......
...@@ -365,7 +365,7 @@ gss_upcall(struct rpc_clnt *clnt, struct rpc_task *task, struct rpc_cred *cred) ...@@ -365,7 +365,7 @@ gss_upcall(struct rpc_clnt *clnt, struct rpc_task *task, struct rpc_cred *cred)
gss_msg = gss_new; gss_msg = gss_new;
memset(gss_new, 0, sizeof(*gss_new)); memset(gss_new, 0, sizeof(*gss_new));
INIT_LIST_HEAD(&gss_new->list); INIT_LIST_HEAD(&gss_new->list);
INIT_RPC_WAITQ(&gss_new->waitq, "RPCSEC_GSS upcall waitq"); rpc_init_wait_queue(&gss_new->waitq, "RPCSEC_GSS upcall waitq");
atomic_set(&gss_new->count, 2); atomic_set(&gss_new->count, 2);
msg = &gss_new->msg; msg = &gss_new->msg;
msg->data = &gss_new->uid; msg->data = &gss_new->uid;
......
...@@ -144,7 +144,7 @@ rpc_create_client(struct rpc_xprt *xprt, char *servname, ...@@ -144,7 +144,7 @@ rpc_create_client(struct rpc_xprt *xprt, char *servname,
clnt->cl_vers = version->number; clnt->cl_vers = version->number;
clnt->cl_prot = xprt->prot; clnt->cl_prot = xprt->prot;
clnt->cl_stats = program->stats; clnt->cl_stats = program->stats;
INIT_RPC_WAITQ(&clnt->cl_pmap_default.pm_bindwait, "bindwait"); rpc_init_wait_queue(&clnt->cl_pmap_default.pm_bindwait, "bindwait");
if (!clnt->cl_port) if (!clnt->cl_port)
clnt->cl_autobind = 1; clnt->cl_autobind = 1;
......
...@@ -161,6 +161,26 @@ rpc_delete_timer(struct rpc_task *task) ...@@ -161,6 +161,26 @@ rpc_delete_timer(struct rpc_task *task)
dprintk("RPC: %4d deleting timer\n", task->tk_pid); dprintk("RPC: %4d deleting timer\n", task->tk_pid);
} }
/*
* Add new request to a priority queue.
*/
static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, struct rpc_task *task)
{
struct list_head *q;
struct rpc_task *t;
q = &queue->tasks[task->tk_priority];
if (unlikely(task->tk_priority > queue->maxpriority))
q = &queue->tasks[queue->maxpriority];
list_for_each_entry(t, q, tk_list) {
if (t->tk_cookie == task->tk_cookie) {
list_add_tail(&task->tk_list, &t->tk_links);
return;
}
}
list_add_tail(&task->tk_list, q);
}
/* /*
* Add new request to wait queue. * Add new request to wait queue.
* *
...@@ -169,8 +189,7 @@ rpc_delete_timer(struct rpc_task *task) ...@@ -169,8 +189,7 @@ rpc_delete_timer(struct rpc_task *task)
* improve overall performance. * improve overall performance.
* Everyone else gets appended to the queue to ensure proper FIFO behavior. * Everyone else gets appended to the queue to ensure proper FIFO behavior.
*/ */
static inline int static int __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
__rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
{ {
if (task->tk_rpcwait == queue) if (task->tk_rpcwait == queue)
return 0; return 0;
...@@ -179,10 +198,12 @@ __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task) ...@@ -179,10 +198,12 @@ __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
printk(KERN_WARNING "RPC: doubly enqueued task!\n"); printk(KERN_WARNING "RPC: doubly enqueued task!\n");
return -EWOULDBLOCK; return -EWOULDBLOCK;
} }
if (RPC_IS_SWAPPER(task)) if (RPC_IS_PRIORITY(queue))
list_add(&task->tk_list, &queue->tasks); __rpc_add_wait_queue_priority(queue, task);
else if (RPC_IS_SWAPPER(task))
list_add(&task->tk_list, &queue->tasks[0]);
else else
list_add_tail(&task->tk_list, &queue->tasks); list_add_tail(&task->tk_list, &queue->tasks[0]);
task->tk_rpcwait = queue; task->tk_rpcwait = queue;
dprintk("RPC: %4d added to queue %p \"%s\"\n", dprintk("RPC: %4d added to queue %p \"%s\"\n",
...@@ -191,8 +212,7 @@ __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task) ...@@ -191,8 +212,7 @@ __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
return 0; return 0;
} }
int int rpc_add_wait_queue(struct rpc_wait_queue *q, struct rpc_task *task)
rpc_add_wait_queue(struct rpc_wait_queue *q, struct rpc_task *task)
{ {
int result; int result;
...@@ -202,19 +222,36 @@ rpc_add_wait_queue(struct rpc_wait_queue *q, struct rpc_task *task) ...@@ -202,19 +222,36 @@ rpc_add_wait_queue(struct rpc_wait_queue *q, struct rpc_task *task)
return result; return result;
} }
/*
* Remove request from a priority queue.
*/
static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
{
struct rpc_task *t;
if (!list_empty(&task->tk_links)) {
t = list_entry(task->tk_links.next, struct rpc_task, tk_list);
list_move(&t->tk_list, &task->tk_list);
list_splice_init(&task->tk_links, &t->tk_links);
}
list_del(&task->tk_list);
}
/* /*
* Remove request from queue. * Remove request from queue.
* Note: must be called with spin lock held. * Note: must be called with spin lock held.
*/ */
static inline void static void __rpc_remove_wait_queue(struct rpc_task *task)
__rpc_remove_wait_queue(struct rpc_task *task)
{ {
struct rpc_wait_queue *queue = task->tk_rpcwait; struct rpc_wait_queue *queue = task->tk_rpcwait;
if (!queue) if (!queue)
return; return;
list_del(&task->tk_list); if (RPC_IS_PRIORITY(queue))
__rpc_remove_wait_queue_priority(task);
else
list_del(&task->tk_list);
task->tk_rpcwait = NULL; task->tk_rpcwait = NULL;
dprintk("RPC: %4d removed from queue %p \"%s\"\n", dprintk("RPC: %4d removed from queue %p \"%s\"\n",
...@@ -231,6 +268,48 @@ rpc_remove_wait_queue(struct rpc_task *task) ...@@ -231,6 +268,48 @@ rpc_remove_wait_queue(struct rpc_task *task)
spin_unlock_bh(&rpc_queue_lock); spin_unlock_bh(&rpc_queue_lock);
} }
static inline void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority)
{
queue->priority = priority;
queue->count = 1 << (priority * 2);
}
static inline void rpc_set_waitqueue_cookie(struct rpc_wait_queue *queue, unsigned long cookie)
{
queue->cookie = cookie;
queue->nr = RPC_BATCH_COUNT;
}
static inline void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue)
{
rpc_set_waitqueue_priority(queue, queue->maxpriority);
rpc_set_waitqueue_cookie(queue, 0);
}
static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, int maxprio)
{
int i;
for (i = 0; i < ARRAY_SIZE(queue->tasks); i++)
INIT_LIST_HEAD(&queue->tasks[i]);
queue->maxpriority = maxprio;
rpc_reset_waitqueue_priority(queue);
#ifdef RPC_DEBUG
queue->name = qname;
#endif
}
void rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
__rpc_init_priority_wait_queue(queue, qname, RPC_PRIORITY_HIGH);
}
void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
__rpc_init_priority_wait_queue(queue, qname, 0);
}
EXPORT_SYMBOL(rpc_init_wait_queue);
/* /*
* Make an RPC task runnable. * Make an RPC task runnable.
* *
...@@ -402,18 +481,73 @@ rpc_wake_up_task(struct rpc_task *task) ...@@ -402,18 +481,73 @@ rpc_wake_up_task(struct rpc_task *task)
spin_unlock_bh(&rpc_queue_lock); spin_unlock_bh(&rpc_queue_lock);
} }
/*
* Wake up the next task on a priority queue.
*/
static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queue)
{
struct list_head *q;
struct rpc_task *task;
/*
* Service a batch of tasks from a single cookie.
*/
q = &queue->tasks[queue->priority];
if (!list_empty(q)) {
task = list_entry(q->next, struct rpc_task, tk_list);
if (queue->cookie == task->tk_cookie) {
if (--queue->nr)
goto out;
list_move_tail(&task->tk_list, q);
}
/*
* Check if we need to switch queues.
*/
if (--queue->count)
goto new_cookie;
}
/*
* Service the next queue.
*/
do {
if (q == &queue->tasks[0])
q = &queue->tasks[queue->maxpriority];
else
q = q - 1;
if (!list_empty(q)) {
task = list_entry(q->next, struct rpc_task, tk_list);
goto new_queue;
}
} while (q != &queue->tasks[queue->priority]);
rpc_reset_waitqueue_priority(queue);
return NULL;
new_queue:
rpc_set_waitqueue_priority(queue, (unsigned int)(q - &queue->tasks[0]));
new_cookie:
rpc_set_waitqueue_cookie(queue, task->tk_cookie);
out:
__rpc_wake_up_task(task);
return task;
}
/* /*
* Wake up the next task on the wait queue. * Wake up the next task on the wait queue.
*/ */
struct rpc_task * struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
rpc_wake_up_next(struct rpc_wait_queue *queue)
{ {
struct rpc_task *task = NULL; struct rpc_task *task = NULL;
dprintk("RPC: wake_up_next(%p \"%s\")\n", queue, rpc_qname(queue)); dprintk("RPC: wake_up_next(%p \"%s\")\n", queue, rpc_qname(queue));
spin_lock_bh(&rpc_queue_lock); spin_lock_bh(&rpc_queue_lock);
task_for_first(task, &queue->tasks) if (RPC_IS_PRIORITY(queue))
__rpc_wake_up_task(task); task = __rpc_wake_up_next_priority(queue);
else {
task_for_first(task, &queue->tasks[0])
__rpc_wake_up_task(task);
}
spin_unlock_bh(&rpc_queue_lock); spin_unlock_bh(&rpc_queue_lock);
return task; return task;
...@@ -425,15 +559,22 @@ rpc_wake_up_next(struct rpc_wait_queue *queue) ...@@ -425,15 +559,22 @@ rpc_wake_up_next(struct rpc_wait_queue *queue)
* *
* Grabs rpc_queue_lock * Grabs rpc_queue_lock
*/ */
void void rpc_wake_up(struct rpc_wait_queue *queue)
rpc_wake_up(struct rpc_wait_queue *queue)
{ {
struct rpc_task *task; struct rpc_task *task;
struct list_head *head;
spin_lock_bh(&rpc_queue_lock); spin_lock_bh(&rpc_queue_lock);
while (!list_empty(&queue->tasks)) head = &queue->tasks[queue->maxpriority];
task_for_first(task, &queue->tasks) for (;;) {
while (!list_empty(head)) {
task = list_entry(head->next, struct rpc_task, tk_list);
__rpc_wake_up_task(task); __rpc_wake_up_task(task);
}
if (head == &queue->tasks[0])
break;
head--;
}
spin_unlock_bh(&rpc_queue_lock); spin_unlock_bh(&rpc_queue_lock);
} }
...@@ -444,17 +585,22 @@ rpc_wake_up(struct rpc_wait_queue *queue) ...@@ -444,17 +585,22 @@ rpc_wake_up(struct rpc_wait_queue *queue)
* *
* Grabs rpc_queue_lock * Grabs rpc_queue_lock
*/ */
void void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
{ {
struct list_head *head;
struct rpc_task *task; struct rpc_task *task;
spin_lock_bh(&rpc_queue_lock); spin_lock_bh(&rpc_queue_lock);
while (!list_empty(&queue->tasks)) { head = &queue->tasks[queue->maxpriority];
task_for_first(task, &queue->tasks) { for (;;) {
while (!list_empty(head)) {
task = list_entry(head->next, struct rpc_task, tk_list);
task->tk_status = status; task->tk_status = status;
__rpc_wake_up_task(task); __rpc_wake_up_task(task);
} }
if (head == &queue->tasks[0])
break;
head--;
} }
spin_unlock_bh(&rpc_queue_lock); spin_unlock_bh(&rpc_queue_lock);
} }
...@@ -642,7 +788,7 @@ __rpc_schedule(void) ...@@ -642,7 +788,7 @@ __rpc_schedule(void)
while (1) { while (1) {
spin_lock_bh(&rpc_queue_lock); spin_lock_bh(&rpc_queue_lock);
task_for_first(task, &schedq.tasks) { task_for_first(task, &schedq.tasks[0]) {
__rpc_remove_wait_queue(task); __rpc_remove_wait_queue(task);
spin_unlock_bh(&rpc_queue_lock); spin_unlock_bh(&rpc_queue_lock);
...@@ -706,9 +852,7 @@ rpc_free(struct rpc_task *task) ...@@ -706,9 +852,7 @@ rpc_free(struct rpc_task *task)
/* /*
* Creation and deletion of RPC task structures * Creation and deletion of RPC task structures
*/ */
inline void void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, rpc_action callback, int flags)
rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt,
rpc_action callback, int flags)
{ {
memset(task, 0, sizeof(*task)); memset(task, 0, sizeof(*task));
init_timer(&task->tk_timer); init_timer(&task->tk_timer);
...@@ -726,6 +870,10 @@ rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, ...@@ -726,6 +870,10 @@ rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt,
task->tk_cred_retry = 2; task->tk_cred_retry = 2;
task->tk_suid_retry = 1; task->tk_suid_retry = 1;
task->tk_priority = RPC_PRIORITY_NORMAL;
task->tk_cookie = (unsigned long)current;
INIT_LIST_HEAD(&task->tk_links);
/* Add to global list of all tasks */ /* Add to global list of all tasks */
spin_lock(&rpc_sched_lock); spin_lock(&rpc_sched_lock);
list_add(&task->tk_task, &all_tasks); list_add(&task->tk_task, &all_tasks);
...@@ -863,7 +1011,7 @@ rpc_find_parent(struct rpc_task *child) ...@@ -863,7 +1011,7 @@ rpc_find_parent(struct rpc_task *child)
struct list_head *le; struct list_head *le;
parent = (struct rpc_task *) child->tk_calldata; parent = (struct rpc_task *) child->tk_calldata;
task_for_each(task, le, &childq.tasks) task_for_each(task, le, &childq.tasks[0])
if (task == parent) if (task == parent)
return parent; return parent;
...@@ -943,7 +1091,7 @@ static DECLARE_MUTEX_LOCKED(rpciod_running); ...@@ -943,7 +1091,7 @@ static DECLARE_MUTEX_LOCKED(rpciod_running);
static inline int static inline int
rpciod_task_pending(void) rpciod_task_pending(void)
{ {
return !list_empty(&schedq.tasks); return !list_empty(&schedq.tasks[0]);
} }
......
...@@ -1458,10 +1458,10 @@ xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to) ...@@ -1458,10 +1458,10 @@ xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to)
} else } else
xprt_default_timeout(&xprt->timeout, xprt->prot); xprt_default_timeout(&xprt->timeout, xprt->prot);
INIT_RPC_WAITQ(&xprt->pending, "xprt_pending"); rpc_init_wait_queue(&xprt->pending, "xprt_pending");
INIT_RPC_WAITQ(&xprt->sending, "xprt_sending"); rpc_init_wait_queue(&xprt->sending, "xprt_sending");
INIT_RPC_WAITQ(&xprt->resend, "xprt_resend"); rpc_init_wait_queue(&xprt->resend, "xprt_resend");
INIT_RPC_WAITQ(&xprt->backlog, "xprt_backlog"); rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");
/* initialize free list */ /* initialize free list */
for (req = &xprt->slot[entries-1]; req >= &xprt->slot[0]; req--) for (req = &xprt->slot[entries-1]; req >= &xprt->slot[0]; req--)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment