Commit 8d51075e authored by Trond Myklebust's avatar Trond Myklebust

RPC: Add support for sharing the same RPC transport and

credential caches between different mountpoints by allowing
cloning of the rpc_client struct.
parent 18e4528e
......@@ -73,6 +73,7 @@ struct rpc_auth {
* differ from the flavor in
* au_ops->au_flavor in gss
* case) */
atomic_t au_count; /* Reference counter */
/* per-flavor data */
};
......
......@@ -26,6 +26,8 @@ struct rpc_portmap {
__u32 pm_vers;
__u32 pm_prot;
__u16 pm_port;
unsigned char pm_binding : 1; /* doing a getport() */
struct rpc_wait_queue pm_bindwait; /* waiting on getport() */
};
struct rpc_inode;
......@@ -34,6 +36,7 @@ struct rpc_inode;
* The high-level client handle
*/
struct rpc_clnt {
atomic_t cl_count; /* Number of clones */
atomic_t cl_users; /* number of references */
struct rpc_xprt * cl_xprt; /* transport */
struct rpc_procinfo * cl_procinfo; /* procedure info */
......@@ -48,26 +51,27 @@ struct rpc_clnt {
cl_intr : 1,/* interruptible */
cl_chatty : 1,/* be verbose */
cl_autobind : 1,/* use getport() */
cl_binding : 1,/* doing a getport() */
cl_droppriv : 1,/* enable NFS suid hack */
cl_oneshot : 1,/* dispose after use */
cl_dead : 1;/* abandoned */
struct rpc_rtt cl_rtt; /* RTO estimator data */
struct rpc_portmap cl_pmap; /* port mapping */
struct rpc_wait_queue cl_bindwait; /* waiting on getport() */
struct rpc_rtt * cl_rtt; /* RTO estimator data */
struct rpc_portmap * cl_pmap; /* port mapping */
int cl_nodelen; /* nodename length */
char cl_nodename[UNX_MAXNODENAME];
char cl_pathname[30];/* Path in rpc_pipe_fs */
struct dentry * cl_dentry; /* inode */
struct rpc_clnt * cl_parent; /* Points to parent of clones */
struct rpc_rtt cl_rtt_default;
struct rpc_portmap cl_pmap_default;
char cl_inline_name[32];
};
#define cl_timeout cl_xprt->timeout
#define cl_prog cl_pmap.pm_prog
#define cl_vers cl_pmap.pm_vers
#define cl_port cl_pmap.pm_port
#define cl_prot cl_pmap.pm_prot
#define cl_prog cl_pmap->pm_prog
#define cl_vers cl_pmap->pm_vers
#define cl_port cl_pmap->pm_port
#define cl_prot cl_pmap->pm_prot
/*
* General RPC program info
......@@ -108,6 +112,7 @@ struct rpc_procinfo {
struct rpc_clnt *rpc_create_client(struct rpc_xprt *xprt, char *servname,
struct rpc_program *info,
u32 version, rpc_authflavor_t authflavor);
struct rpc_clnt *rpc_clone_client(struct rpc_clnt *);
int rpc_shutdown_client(struct rpc_clnt *);
int rpc_destroy_client(struct rpc_clnt *);
void rpc_release_client(struct rpc_clnt *);
......
......@@ -61,6 +61,7 @@ rpcauth_unregister(struct rpc_authops *ops)
struct rpc_auth *
rpcauth_create(rpc_authflavor_t pseudoflavor, struct rpc_clnt *clnt)
{
struct rpc_auth *auth;
struct rpc_authops *ops;
u32 flavor = pseudoflavor_to_flavor(pseudoflavor);
......@@ -68,13 +69,21 @@ rpcauth_create(rpc_authflavor_t pseudoflavor, struct rpc_clnt *clnt)
return NULL;
if (!try_module_get(ops->owner))
return NULL;
clnt->cl_auth = ops->create(clnt, pseudoflavor);
return clnt->cl_auth;
auth = ops->create(clnt, pseudoflavor);
if (!auth)
return NULL;
atomic_set(&auth->au_count, 1);
if (clnt->cl_auth)
rpcauth_destroy(clnt->cl_auth);
clnt->cl_auth = auth;
return auth;
}
void
rpcauth_destroy(struct rpc_auth *auth)
{
if (!atomic_dec_and_test(&auth->au_count))
return;
auth->au_ops->destroy(auth);
module_put(auth->au_ops->owner);
kfree(auth);
......
......@@ -102,6 +102,7 @@ rpc_create_client(struct rpc_xprt *xprt, char *servname,
{
struct rpc_version *version;
struct rpc_clnt *clnt = NULL;
int len;
dprintk("RPC: creating %s client for %s (xprt %p)\n",
program->name, servname, xprt);
......@@ -116,23 +117,37 @@ rpc_create_client(struct rpc_xprt *xprt, char *servname,
goto out_no_clnt;
memset(clnt, 0, sizeof(*clnt));
atomic_set(&clnt->cl_users, 0);
atomic_set(&clnt->cl_count, 1);
clnt->cl_parent = clnt;
clnt->cl_server = clnt->cl_inline_name;
len = strlen(servname) + 1;
if (len > sizeof(clnt->cl_inline_name)) {
char *buf = kmalloc(len, GFP_KERNEL);
if (buf != 0)
clnt->cl_server = buf;
else
len = sizeof(clnt->cl_inline_name);
}
strlcpy(clnt->cl_server, servname, len);
clnt->cl_xprt = xprt;
clnt->cl_procinfo = version->procs;
clnt->cl_maxproc = version->nrprocs;
clnt->cl_server = servname;
clnt->cl_protname = program->name;
clnt->cl_pmap = &clnt->cl_pmap_default;
clnt->cl_port = xprt->addr.sin_port;
clnt->cl_prog = program->number;
clnt->cl_vers = version->number;
clnt->cl_prot = xprt->prot;
clnt->cl_stats = program->stats;
INIT_RPC_WAITQ(&clnt->cl_bindwait, "bindwait");
INIT_RPC_WAITQ(&clnt->cl_pmap_default.pm_bindwait, "bindwait");
if (!clnt->cl_port)
clnt->cl_autobind = 1;
rpc_init_rtt(&clnt->cl_rtt, xprt->timeout.to_initval);
clnt->cl_rtt = &clnt->cl_rtt_default;
rpc_init_rtt(&clnt->cl_rtt_default, xprt->timeout.to_initval);
if (rpc_setup_pipedir(clnt, program->pipe_dir_name) < 0)
goto out_no_path;
......@@ -157,11 +172,39 @@ rpc_create_client(struct rpc_xprt *xprt, char *servname,
out_no_auth:
rpc_rmdir(clnt->cl_pathname);
out_no_path:
if (clnt->cl_server != clnt->cl_inline_name)
kfree(clnt->cl_server);
kfree(clnt);
clnt = NULL;
goto out;
}
/*
* This function clones the RPC client structure. It allows us to share the
* same transport while varying parameters such as the authentication
* flavour.
*/
struct rpc_clnt *
rpc_clone_client(struct rpc_clnt *clnt)
{
struct rpc_clnt *new;
new = (struct rpc_clnt *)kmalloc(sizeof(*new), GFP_KERNEL);
if (!new)
goto out_no_clnt;
memcpy(new, clnt, sizeof(*new));
atomic_set(&new->cl_count, 1);
atomic_set(&new->cl_users, 0);
atomic_inc(&new->cl_parent->cl_count);
if (new->cl_auth)
atomic_inc(&new->cl_auth->au_count);
out:
return new;
out_no_clnt:
printk(KERN_INFO "RPC: out of memory in %s\n", __FUNCTION__);
goto out;
}
/*
* Properly shut down an RPC client, terminating all outstanding
* requests. Note that we must be certain that cl_oneshot and
......@@ -201,19 +244,29 @@ rpc_shutdown_client(struct rpc_clnt *clnt)
int
rpc_destroy_client(struct rpc_clnt *clnt)
{
if (!atomic_dec_and_test(&clnt->cl_count))
return 1;
BUG_ON(atomic_read(&clnt->cl_users) != 0);
dprintk("RPC: destroying %s client for %s\n",
clnt->cl_protname, clnt->cl_server);
if (clnt->cl_auth) {
rpcauth_destroy(clnt->cl_auth);
clnt->cl_auth = NULL;
}
if (clnt->cl_parent != clnt) {
rpc_destroy_client(clnt->cl_parent);
goto out_free;
}
if (clnt->cl_pathname[0])
rpc_rmdir(clnt->cl_pathname);
if (clnt->cl_xprt) {
xprt_destroy(clnt->cl_xprt);
clnt->cl_xprt = NULL;
}
if (clnt->cl_server != clnt->cl_inline_name)
kfree(clnt->cl_server);
out_free:
kfree(clnt);
return 0;
}
......
......@@ -41,7 +41,7 @@ static spinlock_t pmap_lock = SPIN_LOCK_UNLOCKED;
void
rpc_getport(struct rpc_task *task, struct rpc_clnt *clnt)
{
struct rpc_portmap *map = &clnt->cl_pmap;
struct rpc_portmap *map = clnt->cl_pmap;
struct sockaddr_in *sap = &clnt->cl_xprt->addr;
struct rpc_message msg = {
.rpc_proc = &pmap_procedures[PMAP_GETPORT],
......@@ -57,12 +57,12 @@ rpc_getport(struct rpc_task *task, struct rpc_clnt *clnt)
map->pm_prog, map->pm_vers, map->pm_prot);
spin_lock(&pmap_lock);
if (clnt->cl_binding) {
rpc_sleep_on(&clnt->cl_bindwait, task, NULL, 0);
if (map->pm_binding) {
rpc_sleep_on(&map->pm_bindwait, task, NULL, 0);
spin_unlock(&pmap_lock);
return;
}
clnt->cl_binding = 1;
map->pm_binding = 1;
spin_unlock(&pmap_lock);
task->tk_status = -EACCES; /* why set this? returns -EIO below */
......@@ -85,8 +85,8 @@ rpc_getport(struct rpc_task *task, struct rpc_clnt *clnt)
bailout:
spin_lock(&pmap_lock);
clnt->cl_binding = 0;
rpc_wake_up(&clnt->cl_bindwait);
map->pm_binding = 0;
rpc_wake_up(&map->pm_bindwait);
spin_unlock(&pmap_lock);
task->tk_status = -EIO;
task->tk_action = NULL;
......@@ -129,6 +129,7 @@ static void
pmap_getport_done(struct rpc_task *task)
{
struct rpc_clnt *clnt = task->tk_client;
struct rpc_portmap *map = clnt->cl_pmap;
dprintk("RPC: %4d pmap_getport_done(status %d, port %d)\n",
task->tk_pid, task->tk_status, clnt->cl_port);
......@@ -145,8 +146,8 @@ pmap_getport_done(struct rpc_task *task)
clnt->cl_xprt->addr.sin_port = clnt->cl_port;
}
spin_lock(&pmap_lock);
clnt->cl_binding = 0;
rpc_wake_up(&clnt->cl_bindwait);
map->pm_binding = 0;
rpc_wake_up(&map->pm_bindwait);
spin_unlock(&pmap_lock);
}
......
......@@ -41,6 +41,7 @@ EXPORT_SYMBOL(rpc_release_task);
/* RPC client functions */
EXPORT_SYMBOL(rpc_create_client);
EXPORT_SYMBOL(rpc_clone_client);
EXPORT_SYMBOL(rpc_destroy_client);
EXPORT_SYMBOL(rpc_shutdown_client);
EXPORT_SYMBOL(rpc_release_client);
......@@ -66,6 +67,7 @@ EXPORT_SYMBOL(xprt_set_timeout);
/* Client credential cache */
EXPORT_SYMBOL(rpcauth_register);
EXPORT_SYMBOL(rpcauth_unregister);
EXPORT_SYMBOL(rpcauth_create);
EXPORT_SYMBOL(rpcauth_lookupcred);
EXPORT_SYMBOL(rpcauth_lookup_credcache);
EXPORT_SYMBOL(rpcauth_free_credcache);
......
......@@ -584,9 +584,9 @@ xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
__xprt_put_cong(xprt, req);
if (timer) {
if (req->rq_ntrans == 1)
rpc_update_rtt(&clnt->cl_rtt, timer,
rpc_update_rtt(clnt->cl_rtt, timer,
(long)jiffies - req->rq_xtime);
rpc_set_timeo(&clnt->cl_rtt, timer, req->rq_ntrans - 1);
rpc_set_timeo(clnt->cl_rtt, timer, req->rq_ntrans - 1);
}
}
......@@ -1224,8 +1224,8 @@ xprt_transmit(struct rpc_task *task)
spin_lock_bh(&xprt->sock_lock);
if (!xprt->nocong) {
int timer = task->tk_msg.rpc_proc->p_timer;
task->tk_timeout = rpc_calc_rto(&clnt->cl_rtt, timer);
task->tk_timeout <<= rpc_ntimeo(&clnt->cl_rtt, timer);
task->tk_timeout = rpc_calc_rto(clnt->cl_rtt, timer);
task->tk_timeout <<= rpc_ntimeo(clnt->cl_rtt, timer);
task->tk_timeout <<= clnt->cl_timeout.to_retries
- req->rq_timeout.to_retries;
if (task->tk_timeout > req->rq_timeout.to_maxval)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment