Commit 0adfc56c authored by Linus Torvalds's avatar Linus Torvalds

Merge git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

* git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
  rbd: use watch/notify for changes in rbd header
  libceph: add lingering request and watch/notify event framework
  rbd: update email address in Documentation
  ceph: rename dentry_release -> d_release, fix comment
  ceph: add request to the tail of unsafe write list
  ceph: remove request from unsafe list if it is canceled/timed out
  ceph: move readahead default to fs/ceph from libceph
  ceph: add ino32 mount option
  ceph: update common header files
  ceph: remove debugfs debug cruft
  libceph: fix osd request queuing on osdmap updates
  ceph: preserve I_COMPLETE across rename
  libceph: Fix base64-decoding when input ends in newline.
parents f23eb2b2 59c2be1e
What: /sys/bus/rbd/ What: /sys/bus/rbd/
Date: November 2010 Date: November 2010
Contact: Yehuda Sadeh <yehuda@hq.newdream.net>, Contact: Yehuda Sadeh <yehuda@newdream.net>,
Sage Weil <sage@newdream.net> Sage Weil <sage@newdream.net>
Description: Description:
......
...@@ -31,6 +31,7 @@ ...@@ -31,6 +31,7 @@
#include <linux/ceph/osd_client.h> #include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h> #include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h> #include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/kernel.h> #include <linux/kernel.h>
#include <linux/device.h> #include <linux/device.h>
...@@ -54,6 +55,8 @@ ...@@ -54,6 +55,8 @@
#define DEV_NAME_LEN 32 #define DEV_NAME_LEN 32
#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
/* /*
* block device image metadata (in-memory version) * block device image metadata (in-memory version)
*/ */
...@@ -71,6 +74,12 @@ struct rbd_image_header { ...@@ -71,6 +74,12 @@ struct rbd_image_header {
char *snap_names; char *snap_names;
u64 *snap_sizes; u64 *snap_sizes;
u64 obj_version;
};
struct rbd_options {
int notify_timeout;
}; };
/* /*
...@@ -78,6 +87,7 @@ struct rbd_image_header { ...@@ -78,6 +87,7 @@ struct rbd_image_header {
*/ */
struct rbd_client { struct rbd_client {
struct ceph_client *client; struct ceph_client *client;
struct rbd_options *rbd_opts;
struct kref kref; struct kref kref;
struct list_head node; struct list_head node;
}; };
...@@ -124,6 +134,9 @@ struct rbd_device { ...@@ -124,6 +134,9 @@ struct rbd_device {
char pool_name[RBD_MAX_POOL_NAME_LEN]; char pool_name[RBD_MAX_POOL_NAME_LEN];
int poolid; int poolid;
struct ceph_osd_event *watch_event;
struct ceph_osd_request *watch_request;
char snap_name[RBD_MAX_SNAP_NAME_LEN]; char snap_name[RBD_MAX_SNAP_NAME_LEN];
u32 cur_snap; /* index+1 of current snapshot within snap context u32 cur_snap; /* index+1 of current snapshot within snap context
0 - for the head */ 0 - for the head */
...@@ -177,6 +190,8 @@ static void rbd_put_dev(struct rbd_device *rbd_dev) ...@@ -177,6 +190,8 @@ static void rbd_put_dev(struct rbd_device *rbd_dev)
put_device(&rbd_dev->dev); put_device(&rbd_dev->dev);
} }
static int __rbd_update_snaps(struct rbd_device *rbd_dev);
static int rbd_open(struct block_device *bdev, fmode_t mode) static int rbd_open(struct block_device *bdev, fmode_t mode)
{ {
struct gendisk *disk = bdev->bd_disk; struct gendisk *disk = bdev->bd_disk;
...@@ -211,7 +226,8 @@ static const struct block_device_operations rbd_bd_ops = { ...@@ -211,7 +226,8 @@ static const struct block_device_operations rbd_bd_ops = {
* Initialize an rbd client instance. * Initialize an rbd client instance.
* We own *opt. * We own *opt.
*/ */
static struct rbd_client *rbd_client_create(struct ceph_options *opt) static struct rbd_client *rbd_client_create(struct ceph_options *opt,
struct rbd_options *rbd_opts)
{ {
struct rbd_client *rbdc; struct rbd_client *rbdc;
int ret = -ENOMEM; int ret = -ENOMEM;
...@@ -233,6 +249,8 @@ static struct rbd_client *rbd_client_create(struct ceph_options *opt) ...@@ -233,6 +249,8 @@ static struct rbd_client *rbd_client_create(struct ceph_options *opt)
if (ret < 0) if (ret < 0)
goto out_err; goto out_err;
rbdc->rbd_opts = rbd_opts;
spin_lock(&node_lock); spin_lock(&node_lock);
list_add_tail(&rbdc->node, &rbd_client_list); list_add_tail(&rbdc->node, &rbd_client_list);
spin_unlock(&node_lock); spin_unlock(&node_lock);
...@@ -266,6 +284,59 @@ static struct rbd_client *__rbd_client_find(struct ceph_options *opt) ...@@ -266,6 +284,59 @@ static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
return NULL; return NULL;
} }
/*
* mount options
*/
enum {
Opt_notify_timeout,
Opt_last_int,
/* int args above */
Opt_last_string,
/* string args above */
};
static match_table_t rbdopt_tokens = {
{Opt_notify_timeout, "notify_timeout=%d"},
/* int args above */
/* string args above */
{-1, NULL}
};
static int parse_rbd_opts_token(char *c, void *private)
{
struct rbd_options *rbdopt = private;
substring_t argstr[MAX_OPT_ARGS];
int token, intval, ret;
token = match_token((char *)c, rbdopt_tokens, argstr);
if (token < 0)
return -EINVAL;
if (token < Opt_last_int) {
ret = match_int(&argstr[0], &intval);
if (ret < 0) {
pr_err("bad mount option arg (not int) "
"at '%s'\n", c);
return ret;
}
dout("got int token %d val %d\n", token, intval);
} else if (token > Opt_last_int && token < Opt_last_string) {
dout("got string token %d val %s\n", token,
argstr[0].from);
} else {
dout("got token %d\n", token);
}
switch (token) {
case Opt_notify_timeout:
rbdopt->notify_timeout = intval;
break;
default:
BUG_ON(token);
}
return 0;
}
/* /*
* Get a ceph client with specific addr and configuration, if one does * Get a ceph client with specific addr and configuration, if one does
* not exist create it. * not exist create it.
...@@ -276,11 +347,18 @@ static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr, ...@@ -276,11 +347,18 @@ static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
struct rbd_client *rbdc; struct rbd_client *rbdc;
struct ceph_options *opt; struct ceph_options *opt;
int ret; int ret;
struct rbd_options *rbd_opts;
rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
if (!rbd_opts)
return -ENOMEM;
rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
ret = ceph_parse_options(&opt, options, mon_addr, ret = ceph_parse_options(&opt, options, mon_addr,
mon_addr + strlen(mon_addr), NULL, NULL); mon_addr + strlen(mon_addr), parse_rbd_opts_token, rbd_opts);
if (ret < 0) if (ret < 0)
return ret; goto done_err;
spin_lock(&node_lock); spin_lock(&node_lock);
rbdc = __rbd_client_find(opt); rbdc = __rbd_client_find(opt);
...@@ -296,13 +374,18 @@ static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr, ...@@ -296,13 +374,18 @@ static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
} }
spin_unlock(&node_lock); spin_unlock(&node_lock);
rbdc = rbd_client_create(opt); rbdc = rbd_client_create(opt, rbd_opts);
if (IS_ERR(rbdc)) if (IS_ERR(rbdc)) {
return PTR_ERR(rbdc); ret = PTR_ERR(rbdc);
goto done_err;
}
rbd_dev->rbd_client = rbdc; rbd_dev->rbd_client = rbdc;
rbd_dev->client = rbdc->client; rbd_dev->client = rbdc->client;
return 0; return 0;
done_err:
kfree(rbd_opts);
return ret;
} }
/* /*
...@@ -318,6 +401,7 @@ static void rbd_client_release(struct kref *kref) ...@@ -318,6 +401,7 @@ static void rbd_client_release(struct kref *kref)
spin_unlock(&node_lock); spin_unlock(&node_lock);
ceph_destroy_client(rbdc->client); ceph_destroy_client(rbdc->client);
kfree(rbdc->rbd_opts);
kfree(rbdc); kfree(rbdc);
} }
...@@ -666,7 +750,9 @@ static int rbd_do_request(struct request *rq, ...@@ -666,7 +750,9 @@ static int rbd_do_request(struct request *rq,
struct ceph_osd_req_op *ops, struct ceph_osd_req_op *ops,
int num_reply, int num_reply,
void (*rbd_cb)(struct ceph_osd_request *req, void (*rbd_cb)(struct ceph_osd_request *req,
struct ceph_msg *msg)) struct ceph_msg *msg),
struct ceph_osd_request **linger_req,
u64 *ver)
{ {
struct ceph_osd_request *req; struct ceph_osd_request *req;
struct ceph_file_layout *layout; struct ceph_file_layout *layout;
...@@ -729,12 +815,20 @@ static int rbd_do_request(struct request *rq, ...@@ -729,12 +815,20 @@ static int rbd_do_request(struct request *rq,
req->r_oid, req->r_oid_len); req->r_oid, req->r_oid_len);
up_read(&header->snap_rwsem); up_read(&header->snap_rwsem);
if (linger_req) {
ceph_osdc_set_request_linger(&dev->client->osdc, req);
*linger_req = req;
}
ret = ceph_osdc_start_request(&dev->client->osdc, req, false); ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
if (ret < 0) if (ret < 0)
goto done_err; goto done_err;
if (!rbd_cb) { if (!rbd_cb) {
ret = ceph_osdc_wait_request(&dev->client->osdc, req); ret = ceph_osdc_wait_request(&dev->client->osdc, req);
if (ver)
*ver = le64_to_cpu(req->r_reassert_version.version);
dout("reassert_ver=%lld\n", le64_to_cpu(req->r_reassert_version.version));
ceph_osdc_put_request(req); ceph_osdc_put_request(req);
} }
return ret; return ret;
...@@ -789,6 +883,11 @@ static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg) ...@@ -789,6 +883,11 @@ static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
kfree(req_data); kfree(req_data);
} }
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
ceph_osdc_put_request(req);
}
/* /*
* Do a synchronous ceph osd operation * Do a synchronous ceph osd operation
*/ */
...@@ -801,7 +900,9 @@ static int rbd_req_sync_op(struct rbd_device *dev, ...@@ -801,7 +900,9 @@ static int rbd_req_sync_op(struct rbd_device *dev,
int num_reply, int num_reply,
const char *obj, const char *obj,
u64 ofs, u64 len, u64 ofs, u64 len,
char *buf) char *buf,
struct ceph_osd_request **linger_req,
u64 *ver)
{ {
int ret; int ret;
struct page **pages; struct page **pages;
...@@ -833,7 +934,8 @@ static int rbd_req_sync_op(struct rbd_device *dev, ...@@ -833,7 +934,8 @@ static int rbd_req_sync_op(struct rbd_device *dev,
flags, flags,
ops, ops,
2, 2,
NULL); NULL,
linger_req, ver);
if (ret < 0) if (ret < 0)
goto done_ops; goto done_ops;
...@@ -893,7 +995,7 @@ static int rbd_do_op(struct request *rq, ...@@ -893,7 +995,7 @@ static int rbd_do_op(struct request *rq,
flags, flags,
ops, ops,
num_reply, num_reply,
rbd_req_cb); rbd_req_cb, 0, NULL);
done: done:
kfree(seg_name); kfree(seg_name);
return ret; return ret;
...@@ -940,18 +1042,174 @@ static int rbd_req_sync_read(struct rbd_device *dev, ...@@ -940,18 +1042,174 @@ static int rbd_req_sync_read(struct rbd_device *dev,
u64 snapid, u64 snapid,
const char *obj, const char *obj,
u64 ofs, u64 len, u64 ofs, u64 len,
char *buf) char *buf,
u64 *ver)
{ {
return rbd_req_sync_op(dev, NULL, return rbd_req_sync_op(dev, NULL,
(snapid ? snapid : CEPH_NOSNAP), (snapid ? snapid : CEPH_NOSNAP),
CEPH_OSD_OP_READ, CEPH_OSD_OP_READ,
CEPH_OSD_FLAG_READ, CEPH_OSD_FLAG_READ,
NULL, NULL,
1, obj, ofs, len, buf); 1, obj, ofs, len, buf, NULL, ver);
} }
/* /*
* Request sync osd read * Request sync osd watch
*/
static int rbd_req_sync_notify_ack(struct rbd_device *dev,
u64 ver,
u64 notify_id,
const char *obj)
{
struct ceph_osd_req_op *ops;
struct page **pages = NULL;
int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
if (ret < 0)
return ret;
ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
ops[0].watch.cookie = notify_id;
ops[0].watch.flag = 0;
ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
obj, 0, 0, NULL,
pages, 0,
CEPH_OSD_FLAG_READ,
ops,
1,
rbd_simple_req_cb, 0, NULL);
rbd_destroy_ops(ops);
return ret;
}
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
struct rbd_device *dev = (struct rbd_device *)data;
if (!dev)
return;
dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
notify_id, (int)opcode);
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
__rbd_update_snaps(dev);
mutex_unlock(&ctl_mutex);
rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
}
/*
* Request sync osd watch
*/
static int rbd_req_sync_watch(struct rbd_device *dev,
const char *obj,
u64 ver)
{
struct ceph_osd_req_op *ops;
struct ceph_osd_client *osdc = &dev->client->osdc;
int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
if (ret < 0)
return ret;
ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
(void *)dev, &dev->watch_event);
if (ret < 0)
goto fail;
ops[0].watch.ver = cpu_to_le64(ver);
ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
ops[0].watch.flag = 1;
ret = rbd_req_sync_op(dev, NULL,
CEPH_NOSNAP,
0,
CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
ops,
1, obj, 0, 0, NULL,
&dev->watch_request, NULL);
if (ret < 0)
goto fail_event;
rbd_destroy_ops(ops);
return 0;
fail_event:
ceph_osdc_cancel_event(dev->watch_event);
dev->watch_event = NULL;
fail:
rbd_destroy_ops(ops);
return ret;
}
struct rbd_notify_info {
struct rbd_device *dev;
};
static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
struct rbd_device *dev = (struct rbd_device *)data;
if (!dev)
return;
dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
notify_id, (int)opcode);
}
/*
* Request sync osd notify
*/
static int rbd_req_sync_notify(struct rbd_device *dev,
const char *obj)
{
struct ceph_osd_req_op *ops;
struct ceph_osd_client *osdc = &dev->client->osdc;
struct ceph_osd_event *event;
struct rbd_notify_info info;
int payload_len = sizeof(u32) + sizeof(u32);
int ret;
ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
if (ret < 0)
return ret;
info.dev = dev;
ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
(void *)&info, &event);
if (ret < 0)
goto fail;
ops[0].watch.ver = 1;
ops[0].watch.flag = 1;
ops[0].watch.cookie = event->cookie;
ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
ops[0].watch.timeout = 12;
ret = rbd_req_sync_op(dev, NULL,
CEPH_NOSNAP,
0,
CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
ops,
1, obj, 0, 0, NULL, NULL, NULL);
if (ret < 0)
goto fail_event;
ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
dout("ceph_osdc_wait_event returned %d\n", ret);
rbd_destroy_ops(ops);
return 0;
fail_event:
ceph_osdc_cancel_event(event);
fail:
rbd_destroy_ops(ops);
return ret;
}
/*
* Request sync osd rollback
*/ */
static int rbd_req_sync_rollback_obj(struct rbd_device *dev, static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
u64 snapid, u64 snapid,
...@@ -969,13 +1227,10 @@ static int rbd_req_sync_rollback_obj(struct rbd_device *dev, ...@@ -969,13 +1227,10 @@ static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
0, 0,
CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
ops, ops,
1, obj, 0, 0, NULL); 1, obj, 0, 0, NULL, NULL, NULL);
rbd_destroy_ops(ops); rbd_destroy_ops(ops);
if (ret < 0)
return ret;
return ret; return ret;
} }
...@@ -987,7 +1242,8 @@ static int rbd_req_sync_exec(struct rbd_device *dev, ...@@ -987,7 +1242,8 @@ static int rbd_req_sync_exec(struct rbd_device *dev,
const char *cls, const char *cls,
const char *method, const char *method,
const char *data, const char *data,
int len) int len,
u64 *ver)
{ {
struct ceph_osd_req_op *ops; struct ceph_osd_req_op *ops;
int cls_len = strlen(cls); int cls_len = strlen(cls);
...@@ -1010,7 +1266,7 @@ static int rbd_req_sync_exec(struct rbd_device *dev, ...@@ -1010,7 +1266,7 @@ static int rbd_req_sync_exec(struct rbd_device *dev,
0, 0,
CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
ops, ops,
1, obj, 0, 0, NULL); 1, obj, 0, 0, NULL, NULL, ver);
rbd_destroy_ops(ops); rbd_destroy_ops(ops);
...@@ -1156,6 +1412,7 @@ static int rbd_read_header(struct rbd_device *rbd_dev, ...@@ -1156,6 +1412,7 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
struct rbd_image_header_ondisk *dh; struct rbd_image_header_ondisk *dh;
int snap_count = 0; int snap_count = 0;
u64 snap_names_len = 0; u64 snap_names_len = 0;
u64 ver;
while (1) { while (1) {
int len = sizeof(*dh) + int len = sizeof(*dh) +
...@@ -1171,7 +1428,7 @@ static int rbd_read_header(struct rbd_device *rbd_dev, ...@@ -1171,7 +1428,7 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
NULL, CEPH_NOSNAP, NULL, CEPH_NOSNAP,
rbd_dev->obj_md_name, rbd_dev->obj_md_name,
0, len, 0, len,
(char *)dh); (char *)dh, &ver);
if (rc < 0) if (rc < 0)
goto out_dh; goto out_dh;
...@@ -1188,6 +1445,7 @@ static int rbd_read_header(struct rbd_device *rbd_dev, ...@@ -1188,6 +1445,7 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
} }
break; break;
} }
header->obj_version = ver;
out_dh: out_dh:
kfree(dh); kfree(dh);
...@@ -1205,6 +1463,7 @@ static int rbd_header_add_snap(struct rbd_device *dev, ...@@ -1205,6 +1463,7 @@ static int rbd_header_add_snap(struct rbd_device *dev,
u64 new_snapid; u64 new_snapid;
int ret; int ret;
void *data, *data_start, *data_end; void *data, *data_start, *data_end;
u64 ver;
/* we should create a snapshot only if we're pointing at the head */ /* we should create a snapshot only if we're pointing at the head */
if (dev->cur_snap) if (dev->cur_snap)
...@@ -1227,7 +1486,7 @@ static int rbd_header_add_snap(struct rbd_device *dev, ...@@ -1227,7 +1486,7 @@ static int rbd_header_add_snap(struct rbd_device *dev,
ceph_encode_64_safe(&data, data_end, new_snapid, bad); ceph_encode_64_safe(&data, data_end, new_snapid, bad);
ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add", ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
data_start, data - data_start); data_start, data - data_start, &ver);
kfree(data_start); kfree(data_start);
...@@ -1259,6 +1518,7 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev) ...@@ -1259,6 +1518,7 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev)
int ret; int ret;
struct rbd_image_header h; struct rbd_image_header h;
u64 snap_seq; u64 snap_seq;
int follow_seq = 0;
ret = rbd_read_header(rbd_dev, &h); ret = rbd_read_header(rbd_dev, &h);
if (ret < 0) if (ret < 0)
...@@ -1267,6 +1527,11 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev) ...@@ -1267,6 +1527,11 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev)
down_write(&rbd_dev->header.snap_rwsem); down_write(&rbd_dev->header.snap_rwsem);
snap_seq = rbd_dev->header.snapc->seq; snap_seq = rbd_dev->header.snapc->seq;
if (rbd_dev->header.total_snaps &&
rbd_dev->header.snapc->snaps[0] == snap_seq)
/* pointing at the head, will need to follow that
if head moves */
follow_seq = 1;
kfree(rbd_dev->header.snapc); kfree(rbd_dev->header.snapc);
kfree(rbd_dev->header.snap_names); kfree(rbd_dev->header.snap_names);
...@@ -1277,6 +1542,9 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev) ...@@ -1277,6 +1542,9 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev)
rbd_dev->header.snap_names = h.snap_names; rbd_dev->header.snap_names = h.snap_names;
rbd_dev->header.snap_names_len = h.snap_names_len; rbd_dev->header.snap_names_len = h.snap_names_len;
rbd_dev->header.snap_sizes = h.snap_sizes; rbd_dev->header.snap_sizes = h.snap_sizes;
if (follow_seq)
rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
else
rbd_dev->header.snapc->seq = snap_seq; rbd_dev->header.snapc->seq = snap_seq;
ret = __rbd_init_snaps_header(rbd_dev); ret = __rbd_init_snaps_header(rbd_dev);
...@@ -1699,7 +1967,28 @@ static void rbd_bus_del_dev(struct rbd_device *rbd_dev) ...@@ -1699,7 +1967,28 @@ static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
device_unregister(&rbd_dev->dev); device_unregister(&rbd_dev->dev);
} }
static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count) static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
{
int ret, rc;
do {
ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
rbd_dev->header.obj_version);
if (ret == -ERANGE) {
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
rc = __rbd_update_snaps(rbd_dev);
mutex_unlock(&ctl_mutex);
if (rc < 0)
return rc;
}
} while (ret == -ERANGE);
return ret;
}
static ssize_t rbd_add(struct bus_type *bus,
const char *buf,
size_t count)
{ {
struct ceph_osd_client *osdc; struct ceph_osd_client *osdc;
struct rbd_device *rbd_dev; struct rbd_device *rbd_dev;
...@@ -1797,6 +2086,10 @@ static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count) ...@@ -1797,6 +2086,10 @@ static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count)
if (rc) if (rc)
goto err_out_bus; goto err_out_bus;
rc = rbd_init_watch_dev(rbd_dev);
if (rc)
goto err_out_bus;
return count; return count;
err_out_bus: err_out_bus:
...@@ -1849,6 +2142,12 @@ static void rbd_dev_release(struct device *dev) ...@@ -1849,6 +2142,12 @@ static void rbd_dev_release(struct device *dev)
struct rbd_device *rbd_dev = struct rbd_device *rbd_dev =
container_of(dev, struct rbd_device, dev); container_of(dev, struct rbd_device, dev);
if (rbd_dev->watch_request)
ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc,
rbd_dev->watch_request);
if (rbd_dev->watch_event)
ceph_osdc_cancel_event(rbd_dev->watch_event);
rbd_put_client(rbd_dev); rbd_put_client(rbd_dev);
/* clean up and free blkdev */ /* clean up and free blkdev */
...@@ -1914,14 +2213,24 @@ static ssize_t rbd_snap_add(struct device *dev, ...@@ -1914,14 +2213,24 @@ static ssize_t rbd_snap_add(struct device *dev,
ret = rbd_header_add_snap(rbd_dev, ret = rbd_header_add_snap(rbd_dev,
name, GFP_KERNEL); name, GFP_KERNEL);
if (ret < 0) if (ret < 0)
goto done_unlock; goto err_unlock;
ret = __rbd_update_snaps(rbd_dev); ret = __rbd_update_snaps(rbd_dev);
if (ret < 0) if (ret < 0)
goto done_unlock; goto err_unlock;
/* shouldn't hold ctl_mutex when notifying.. notify might
trigger a watch callback that would need to get that mutex */
mutex_unlock(&ctl_mutex);
/* make a best effort, don't error if failed */
rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
ret = count; ret = count;
done_unlock: kfree(name);
return ret;
err_unlock:
mutex_unlock(&ctl_mutex); mutex_unlock(&ctl_mutex);
kfree(name); kfree(name);
return ret; return ret;
......
...@@ -210,8 +210,6 @@ int ceph_fs_debugfs_init(struct ceph_fs_client *fsc) ...@@ -210,8 +210,6 @@ int ceph_fs_debugfs_init(struct ceph_fs_client *fsc)
if (!fsc->debugfs_congestion_kb) if (!fsc->debugfs_congestion_kb)
goto out; goto out;
dout("a\n");
snprintf(name, sizeof(name), "../../bdi/%s", snprintf(name, sizeof(name), "../../bdi/%s",
dev_name(fsc->backing_dev_info.dev)); dev_name(fsc->backing_dev_info.dev));
fsc->debugfs_bdi = fsc->debugfs_bdi =
...@@ -221,7 +219,6 @@ int ceph_fs_debugfs_init(struct ceph_fs_client *fsc) ...@@ -221,7 +219,6 @@ int ceph_fs_debugfs_init(struct ceph_fs_client *fsc)
if (!fsc->debugfs_bdi) if (!fsc->debugfs_bdi)
goto out; goto out;
dout("b\n");
fsc->debugfs_mdsmap = debugfs_create_file("mdsmap", fsc->debugfs_mdsmap = debugfs_create_file("mdsmap",
0600, 0600,
fsc->client->debugfs_dir, fsc->client->debugfs_dir,
...@@ -230,7 +227,6 @@ int ceph_fs_debugfs_init(struct ceph_fs_client *fsc) ...@@ -230,7 +227,6 @@ int ceph_fs_debugfs_init(struct ceph_fs_client *fsc)
if (!fsc->debugfs_mdsmap) if (!fsc->debugfs_mdsmap)
goto out; goto out;
dout("ca\n");
fsc->debugfs_mdsc = debugfs_create_file("mdsc", fsc->debugfs_mdsc = debugfs_create_file("mdsc",
0600, 0600,
fsc->client->debugfs_dir, fsc->client->debugfs_dir,
...@@ -239,7 +235,6 @@ int ceph_fs_debugfs_init(struct ceph_fs_client *fsc) ...@@ -239,7 +235,6 @@ int ceph_fs_debugfs_init(struct ceph_fs_client *fsc)
if (!fsc->debugfs_mdsc) if (!fsc->debugfs_mdsc)
goto out; goto out;
dout("da\n");
fsc->debugfs_caps = debugfs_create_file("caps", fsc->debugfs_caps = debugfs_create_file("caps",
0400, 0400,
fsc->client->debugfs_dir, fsc->client->debugfs_dir,
...@@ -248,7 +243,6 @@ int ceph_fs_debugfs_init(struct ceph_fs_client *fsc) ...@@ -248,7 +243,6 @@ int ceph_fs_debugfs_init(struct ceph_fs_client *fsc)
if (!fsc->debugfs_caps) if (!fsc->debugfs_caps)
goto out; goto out;
dout("ea\n");
fsc->debugfs_dentry_lru = debugfs_create_file("dentry_lru", fsc->debugfs_dentry_lru = debugfs_create_file("dentry_lru",
0600, 0600,
fsc->client->debugfs_dir, fsc->client->debugfs_dir,
......
...@@ -161,7 +161,7 @@ static int __dcache_readdir(struct file *filp, ...@@ -161,7 +161,7 @@ static int __dcache_readdir(struct file *filp,
filp->f_pos = di->offset; filp->f_pos = di->offset;
err = filldir(dirent, dentry->d_name.name, err = filldir(dirent, dentry->d_name.name,
dentry->d_name.len, di->offset, dentry->d_name.len, di->offset,
dentry->d_inode->i_ino, ceph_translate_ino(dentry->d_sb, dentry->d_inode->i_ino),
dentry->d_inode->i_mode >> 12); dentry->d_inode->i_mode >> 12);
if (last) { if (last) {
...@@ -245,15 +245,17 @@ static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir) ...@@ -245,15 +245,17 @@ static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir)
dout("readdir off 0 -> '.'\n"); dout("readdir off 0 -> '.'\n");
if (filldir(dirent, ".", 1, ceph_make_fpos(0, 0), if (filldir(dirent, ".", 1, ceph_make_fpos(0, 0),
inode->i_ino, inode->i_mode >> 12) < 0) ceph_translate_ino(inode->i_sb, inode->i_ino),
inode->i_mode >> 12) < 0)
return 0; return 0;
filp->f_pos = 1; filp->f_pos = 1;
off = 1; off = 1;
} }
if (filp->f_pos == 1) { if (filp->f_pos == 1) {
ino_t ino = filp->f_dentry->d_parent->d_inode->i_ino;
dout("readdir off 1 -> '..'\n"); dout("readdir off 1 -> '..'\n");
if (filldir(dirent, "..", 2, ceph_make_fpos(0, 1), if (filldir(dirent, "..", 2, ceph_make_fpos(0, 1),
filp->f_dentry->d_parent->d_inode->i_ino, ceph_translate_ino(inode->i_sb, ino),
inode->i_mode >> 12) < 0) inode->i_mode >> 12) < 0)
return 0; return 0;
filp->f_pos = 2; filp->f_pos = 2;
...@@ -377,7 +379,8 @@ static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir) ...@@ -377,7 +379,8 @@ static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir)
if (filldir(dirent, if (filldir(dirent,
rinfo->dir_dname[off - fi->offset], rinfo->dir_dname[off - fi->offset],
rinfo->dir_dname_len[off - fi->offset], rinfo->dir_dname_len[off - fi->offset],
pos, ino, ftype) < 0) { pos,
ceph_translate_ino(inode->i_sb, ino), ftype) < 0) {
dout("filldir stopping us...\n"); dout("filldir stopping us...\n");
return 0; return 0;
} }
...@@ -1024,14 +1027,13 @@ static int ceph_d_revalidate(struct dentry *dentry, struct nameidata *nd) ...@@ -1024,14 +1027,13 @@ static int ceph_d_revalidate(struct dentry *dentry, struct nameidata *nd)
} }
/* /*
* When a dentry is released, clear the dir I_COMPLETE if it was part * Release our ceph_dentry_info.
* of the current dir gen or if this is in the snapshot namespace.
*/ */
static void ceph_dentry_release(struct dentry *dentry) static void ceph_d_release(struct dentry *dentry)
{ {
struct ceph_dentry_info *di = ceph_dentry(dentry); struct ceph_dentry_info *di = ceph_dentry(dentry);
dout("dentry_release %p\n", dentry); dout("d_release %p\n", dentry);
if (di) { if (di) {
ceph_dentry_lru_del(dentry); ceph_dentry_lru_del(dentry);
if (di->lease_session) if (di->lease_session)
...@@ -1256,14 +1258,14 @@ const struct inode_operations ceph_dir_iops = { ...@@ -1256,14 +1258,14 @@ const struct inode_operations ceph_dir_iops = {
const struct dentry_operations ceph_dentry_ops = { const struct dentry_operations ceph_dentry_ops = {
.d_revalidate = ceph_d_revalidate, .d_revalidate = ceph_d_revalidate,
.d_release = ceph_dentry_release, .d_release = ceph_d_release,
}; };
const struct dentry_operations ceph_snapdir_dentry_ops = { const struct dentry_operations ceph_snapdir_dentry_ops = {
.d_revalidate = ceph_snapdir_d_revalidate, .d_revalidate = ceph_snapdir_d_revalidate,
.d_release = ceph_dentry_release, .d_release = ceph_d_release,
}; };
const struct dentry_operations ceph_snap_dentry_ops = { const struct dentry_operations ceph_snap_dentry_ops = {
.d_release = ceph_dentry_release, .d_release = ceph_d_release,
}; };
...@@ -564,11 +564,19 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data, ...@@ -564,11 +564,19 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data,
* start_request so that a tid has been assigned. * start_request so that a tid has been assigned.
*/ */
spin_lock(&ci->i_unsafe_lock); spin_lock(&ci->i_unsafe_lock);
list_add(&req->r_unsafe_item, &ci->i_unsafe_writes); list_add_tail(&req->r_unsafe_item,
&ci->i_unsafe_writes);
spin_unlock(&ci->i_unsafe_lock); spin_unlock(&ci->i_unsafe_lock);
ceph_get_cap_refs(ci, CEPH_CAP_FILE_WR); ceph_get_cap_refs(ci, CEPH_CAP_FILE_WR);
} }
ret = ceph_osdc_wait_request(&fsc->client->osdc, req); ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
if (ret < 0 && req->r_safe_callback) {
spin_lock(&ci->i_unsafe_lock);
list_del_init(&req->r_unsafe_item);
spin_unlock(&ci->i_unsafe_lock);
ceph_put_cap_refs(ci, CEPH_CAP_FILE_WR);
}
} }
if (file->f_flags & O_DIRECT) if (file->f_flags & O_DIRECT)
......
...@@ -36,6 +36,13 @@ static void ceph_vmtruncate_work(struct work_struct *work); ...@@ -36,6 +36,13 @@ static void ceph_vmtruncate_work(struct work_struct *work);
/* /*
* find or create an inode, given the ceph ino number * find or create an inode, given the ceph ino number
*/ */
static int ceph_set_ino_cb(struct inode *inode, void *data)
{
ceph_inode(inode)->i_vino = *(struct ceph_vino *)data;
inode->i_ino = ceph_vino_to_ino(*(struct ceph_vino *)data);
return 0;
}
struct inode *ceph_get_inode(struct super_block *sb, struct ceph_vino vino) struct inode *ceph_get_inode(struct super_block *sb, struct ceph_vino vino)
{ {
struct inode *inode; struct inode *inode;
...@@ -1030,9 +1037,6 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, ...@@ -1030,9 +1037,6 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
dout("fill_trace doing d_move %p -> %p\n", dout("fill_trace doing d_move %p -> %p\n",
req->r_old_dentry, dn); req->r_old_dentry, dn);
/* d_move screws up d_subdirs order */
ceph_i_clear(dir, CEPH_I_COMPLETE);
d_move(req->r_old_dentry, dn); d_move(req->r_old_dentry, dn);
dout(" src %p '%.*s' dst %p '%.*s'\n", dout(" src %p '%.*s' dst %p '%.*s'\n",
req->r_old_dentry, req->r_old_dentry,
...@@ -1044,12 +1048,15 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, ...@@ -1044,12 +1048,15 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
rehashing bug in vfs_rename_dir */ rehashing bug in vfs_rename_dir */
ceph_invalidate_dentry_lease(dn); ceph_invalidate_dentry_lease(dn);
/* take overwritten dentry's readdir offset */ /*
dout("dn %p gets %p offset %lld (old offset %lld)\n", * d_move() puts the renamed dentry at the end of
req->r_old_dentry, dn, ceph_dentry(dn)->offset, * d_subdirs. We need to assign it an appropriate
* directory offset so we can behave when holding
* I_COMPLETE.
*/
ceph_set_dentry_offset(req->r_old_dentry);
dout("dn %p gets new offset %lld\n", req->r_old_dentry,
ceph_dentry(req->r_old_dentry)->offset); ceph_dentry(req->r_old_dentry)->offset);
ceph_dentry(req->r_old_dentry)->offset =
ceph_dentry(dn)->offset;
dn = req->r_old_dentry; /* use old_dentry */ dn = req->r_old_dentry; /* use old_dentry */
in = dn->d_inode; in = dn->d_inode;
...@@ -1809,7 +1816,7 @@ int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry, ...@@ -1809,7 +1816,7 @@ int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry,
err = ceph_do_getattr(inode, CEPH_STAT_CAP_INODE_ALL); err = ceph_do_getattr(inode, CEPH_STAT_CAP_INODE_ALL);
if (!err) { if (!err) {
generic_fillattr(inode, stat); generic_fillattr(inode, stat);
stat->ino = inode->i_ino; stat->ino = ceph_translate_ino(inode->i_sb, inode->i_ino);
if (ceph_snap(inode) != CEPH_NOSNAP) if (ceph_snap(inode) != CEPH_NOSNAP)
stat->dev = ceph_snap(inode); stat->dev = ceph_snap(inode);
else else
......
...@@ -131,6 +131,7 @@ enum { ...@@ -131,6 +131,7 @@ enum {
Opt_rbytes, Opt_rbytes,
Opt_norbytes, Opt_norbytes,
Opt_noasyncreaddir, Opt_noasyncreaddir,
Opt_ino32,
}; };
static match_table_t fsopt_tokens = { static match_table_t fsopt_tokens = {
...@@ -150,6 +151,7 @@ static match_table_t fsopt_tokens = { ...@@ -150,6 +151,7 @@ static match_table_t fsopt_tokens = {
{Opt_rbytes, "rbytes"}, {Opt_rbytes, "rbytes"},
{Opt_norbytes, "norbytes"}, {Opt_norbytes, "norbytes"},
{Opt_noasyncreaddir, "noasyncreaddir"}, {Opt_noasyncreaddir, "noasyncreaddir"},
{Opt_ino32, "ino32"},
{-1, NULL} {-1, NULL}
}; };
...@@ -225,6 +227,9 @@ static int parse_fsopt_token(char *c, void *private) ...@@ -225,6 +227,9 @@ static int parse_fsopt_token(char *c, void *private)
case Opt_noasyncreaddir: case Opt_noasyncreaddir:
fsopt->flags |= CEPH_MOUNT_OPT_NOASYNCREADDIR; fsopt->flags |= CEPH_MOUNT_OPT_NOASYNCREADDIR;
break; break;
case Opt_ino32:
fsopt->flags |= CEPH_MOUNT_OPT_INO32;
break;
default: default:
BUG_ON(token); BUG_ON(token);
} }
...@@ -288,7 +293,7 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt, ...@@ -288,7 +293,7 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,
fsopt->sb_flags = flags; fsopt->sb_flags = flags;
fsopt->flags = CEPH_MOUNT_OPT_DEFAULT; fsopt->flags = CEPH_MOUNT_OPT_DEFAULT;
fsopt->rsize = CEPH_MOUNT_RSIZE_DEFAULT; fsopt->rsize = CEPH_RSIZE_DEFAULT;
fsopt->snapdir_name = kstrdup(CEPH_SNAPDIRNAME_DEFAULT, GFP_KERNEL); fsopt->snapdir_name = kstrdup(CEPH_SNAPDIRNAME_DEFAULT, GFP_KERNEL);
fsopt->caps_wanted_delay_min = CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT; fsopt->caps_wanted_delay_min = CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT;
fsopt->caps_wanted_delay_max = CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT; fsopt->caps_wanted_delay_max = CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT;
...@@ -370,7 +375,7 @@ static int ceph_show_options(struct seq_file *m, struct vfsmount *mnt) ...@@ -370,7 +375,7 @@ static int ceph_show_options(struct seq_file *m, struct vfsmount *mnt)
if (fsopt->wsize) if (fsopt->wsize)
seq_printf(m, ",wsize=%d", fsopt->wsize); seq_printf(m, ",wsize=%d", fsopt->wsize);
if (fsopt->rsize != CEPH_MOUNT_RSIZE_DEFAULT) if (fsopt->rsize != CEPH_RSIZE_DEFAULT)
seq_printf(m, ",rsize=%d", fsopt->rsize); seq_printf(m, ",rsize=%d", fsopt->rsize);
if (fsopt->congestion_kb != default_congestion_kb()) if (fsopt->congestion_kb != default_congestion_kb())
seq_printf(m, ",write_congestion_kb=%d", fsopt->congestion_kb); seq_printf(m, ",write_congestion_kb=%d", fsopt->congestion_kb);
......
...@@ -27,6 +27,7 @@ ...@@ -27,6 +27,7 @@
#define CEPH_MOUNT_OPT_DIRSTAT (1<<4) /* `cat dirname` for stats */ #define CEPH_MOUNT_OPT_DIRSTAT (1<<4) /* `cat dirname` for stats */
#define CEPH_MOUNT_OPT_RBYTES (1<<5) /* dir st_bytes = rbytes */ #define CEPH_MOUNT_OPT_RBYTES (1<<5) /* dir st_bytes = rbytes */
#define CEPH_MOUNT_OPT_NOASYNCREADDIR (1<<7) /* no dcache readdir */ #define CEPH_MOUNT_OPT_NOASYNCREADDIR (1<<7) /* no dcache readdir */
#define CEPH_MOUNT_OPT_INO32 (1<<8) /* 32 bit inos */
#define CEPH_MOUNT_OPT_DEFAULT (CEPH_MOUNT_OPT_RBYTES) #define CEPH_MOUNT_OPT_DEFAULT (CEPH_MOUNT_OPT_RBYTES)
...@@ -35,6 +36,7 @@ ...@@ -35,6 +36,7 @@
#define ceph_test_mount_opt(fsc, opt) \ #define ceph_test_mount_opt(fsc, opt) \
(!!((fsc)->mount_options->flags & CEPH_MOUNT_OPT_##opt)) (!!((fsc)->mount_options->flags & CEPH_MOUNT_OPT_##opt))
#define CEPH_RSIZE_DEFAULT (512*1024) /* readahead */
#define CEPH_MAX_READDIR_DEFAULT 1024 #define CEPH_MAX_READDIR_DEFAULT 1024
#define CEPH_MAX_READDIR_BYTES_DEFAULT (512*1024) #define CEPH_MAX_READDIR_BYTES_DEFAULT (512*1024)
#define CEPH_SNAPDIRNAME_DEFAULT ".snap" #define CEPH_SNAPDIRNAME_DEFAULT ".snap"
...@@ -319,6 +321,16 @@ static inline struct ceph_inode_info *ceph_inode(struct inode *inode) ...@@ -319,6 +321,16 @@ static inline struct ceph_inode_info *ceph_inode(struct inode *inode)
return container_of(inode, struct ceph_inode_info, vfs_inode); return container_of(inode, struct ceph_inode_info, vfs_inode);
} }
static inline struct ceph_fs_client *ceph_inode_to_client(struct inode *inode)
{
return (struct ceph_fs_client *)inode->i_sb->s_fs_info;
}
static inline struct ceph_fs_client *ceph_sb_to_client(struct super_block *sb)
{
return (struct ceph_fs_client *)sb->s_fs_info;
}
static inline struct ceph_vino ceph_vino(struct inode *inode) static inline struct ceph_vino ceph_vino(struct inode *inode)
{ {
return ceph_inode(inode)->i_vino; return ceph_inode(inode)->i_vino;
...@@ -327,19 +339,49 @@ static inline struct ceph_vino ceph_vino(struct inode *inode) ...@@ -327,19 +339,49 @@ static inline struct ceph_vino ceph_vino(struct inode *inode)
/* /*
* ino_t is <64 bits on many architectures, blech. * ino_t is <64 bits on many architectures, blech.
* *
* don't include snap in ino hash, at least for now. * i_ino (kernel inode) st_ino (userspace)
* i386 32 32
* x86_64+ino32 64 32
* x86_64 64 64
*/
static inline u32 ceph_ino_to_ino32(ino_t ino)
{
ino ^= ino >> (sizeof(ino) * 8 - 32);
if (!ino)
ino = 1;
return ino;
}
/*
* kernel i_ino value
*/ */
static inline ino_t ceph_vino_to_ino(struct ceph_vino vino) static inline ino_t ceph_vino_to_ino(struct ceph_vino vino)
{ {
ino_t ino = (ino_t)vino.ino; /* ^ (vino.snap << 20); */ ino_t ino = (ino_t)vino.ino; /* ^ (vino.snap << 20); */
#if BITS_PER_LONG == 32 #if BITS_PER_LONG == 32
ino ^= vino.ino >> (sizeof(u64)-sizeof(ino_t)) * 8; ino = ceph_ino_to_ino32(ino);
if (!ino)
ino = 1;
#endif #endif
return ino; return ino;
} }
/*
* user-visible ino (stat, filldir)
*/
#if BITS_PER_LONG == 32
static inline ino_t ceph_translate_ino(struct super_block *sb, ino_t ino)
{
return ino;
}
#else
static inline ino_t ceph_translate_ino(struct super_block *sb, ino_t ino)
{
if (ceph_test_mount_opt(ceph_sb_to_client(sb), INO32))
ino = ceph_ino_to_ino32(ino);
return ino;
}
#endif
/* for printf-style formatting */ /* for printf-style formatting */
#define ceph_vinop(i) ceph_inode(i)->i_vino.ino, ceph_inode(i)->i_vino.snap #define ceph_vinop(i) ceph_inode(i)->i_vino.ino, ceph_inode(i)->i_vino.snap
...@@ -428,13 +470,6 @@ static inline loff_t ceph_make_fpos(unsigned frag, unsigned off) ...@@ -428,13 +470,6 @@ static inline loff_t ceph_make_fpos(unsigned frag, unsigned off)
return ((loff_t)frag << 32) | (loff_t)off; return ((loff_t)frag << 32) | (loff_t)off;
} }
static inline int ceph_set_ino_cb(struct inode *inode, void *data)
{
ceph_inode(inode)->i_vino = *(struct ceph_vino *)data;
inode->i_ino = ceph_vino_to_ino(*(struct ceph_vino *)data);
return 0;
}
/* /*
* caps helpers * caps helpers
*/ */
...@@ -503,15 +538,6 @@ extern void ceph_reservation_status(struct ceph_fs_client *client, ...@@ -503,15 +538,6 @@ extern void ceph_reservation_status(struct ceph_fs_client *client,
int *total, int *avail, int *used, int *total, int *avail, int *used,
int *reserved, int *min); int *reserved, int *min);
static inline struct ceph_fs_client *ceph_inode_to_client(struct inode *inode)
{
return (struct ceph_fs_client *)inode->i_sb->s_fs_info;
}
static inline struct ceph_fs_client *ceph_sb_to_client(struct super_block *sb)
{
return (struct ceph_fs_client *)sb->s_fs_info;
}
/* /*
......
...@@ -139,6 +139,15 @@ struct ceph_dir_layout { ...@@ -139,6 +139,15 @@ struct ceph_dir_layout {
#define CEPH_MSG_OSD_MAP 41 #define CEPH_MSG_OSD_MAP 41
#define CEPH_MSG_OSD_OP 42 #define CEPH_MSG_OSD_OP 42
#define CEPH_MSG_OSD_OPREPLY 43 #define CEPH_MSG_OSD_OPREPLY 43
#define CEPH_MSG_WATCH_NOTIFY 44
/* watch-notify operations */
enum {
WATCH_NOTIFY = 1, /* notifying watcher */
WATCH_NOTIFY_COMPLETE = 2, /* notifier notified when done */
};
/* pool operations */ /* pool operations */
enum { enum {
...@@ -213,6 +222,8 @@ struct ceph_client_mount { ...@@ -213,6 +222,8 @@ struct ceph_client_mount {
struct ceph_mon_request_header monhdr; struct ceph_mon_request_header monhdr;
} __attribute__ ((packed)); } __attribute__ ((packed));
#define CEPH_SUBSCRIBE_ONETIME 1 /* i want only 1 update after have */
struct ceph_mon_subscribe_item { struct ceph_mon_subscribe_item {
__le64 have_version; __le64 have; __le64 have_version; __le64 have;
__u8 onetime; __u8 onetime;
......
...@@ -71,7 +71,6 @@ struct ceph_options { ...@@ -71,7 +71,6 @@ struct ceph_options {
#define CEPH_OSD_TIMEOUT_DEFAULT 60 /* seconds */ #define CEPH_OSD_TIMEOUT_DEFAULT 60 /* seconds */
#define CEPH_OSD_KEEPALIVE_DEFAULT 5 #define CEPH_OSD_KEEPALIVE_DEFAULT 5
#define CEPH_OSD_IDLE_TTL_DEFAULT 60 #define CEPH_OSD_IDLE_TTL_DEFAULT 60
#define CEPH_MOUNT_RSIZE_DEFAULT (512*1024) /* readahead */
#define CEPH_MSG_MAX_FRONT_LEN (16*1024*1024) #define CEPH_MSG_MAX_FRONT_LEN (16*1024*1024)
#define CEPH_MSG_MAX_DATA_LEN (16*1024*1024) #define CEPH_MSG_MAX_DATA_LEN (16*1024*1024)
......
...@@ -32,6 +32,7 @@ struct ceph_osd { ...@@ -32,6 +32,7 @@ struct ceph_osd {
struct rb_node o_node; struct rb_node o_node;
struct ceph_connection o_con; struct ceph_connection o_con;
struct list_head o_requests; struct list_head o_requests;
struct list_head o_linger_requests;
struct list_head o_osd_lru; struct list_head o_osd_lru;
struct ceph_authorizer *o_authorizer; struct ceph_authorizer *o_authorizer;
void *o_authorizer_buf, *o_authorizer_reply_buf; void *o_authorizer_buf, *o_authorizer_reply_buf;
...@@ -47,6 +48,8 @@ struct ceph_osd_request { ...@@ -47,6 +48,8 @@ struct ceph_osd_request {
struct rb_node r_node; struct rb_node r_node;
struct list_head r_req_lru_item; struct list_head r_req_lru_item;
struct list_head r_osd_item; struct list_head r_osd_item;
struct list_head r_linger_item;
struct list_head r_linger_osd;
struct ceph_osd *r_osd; struct ceph_osd *r_osd;
struct ceph_pg r_pgid; struct ceph_pg r_pgid;
int r_pg_osds[CEPH_PG_MAX_SIZE]; int r_pg_osds[CEPH_PG_MAX_SIZE];
...@@ -59,6 +62,7 @@ struct ceph_osd_request { ...@@ -59,6 +62,7 @@ struct ceph_osd_request {
int r_flags; /* any additional flags for the osd */ int r_flags; /* any additional flags for the osd */
u32 r_sent; /* >0 if r_request is sending/sent */ u32 r_sent; /* >0 if r_request is sending/sent */
int r_got_reply; int r_got_reply;
int r_linger;
struct ceph_osd_client *r_osdc; struct ceph_osd_client *r_osdc;
struct kref r_kref; struct kref r_kref;
...@@ -74,7 +78,6 @@ struct ceph_osd_request { ...@@ -74,7 +78,6 @@ struct ceph_osd_request {
char r_oid[40]; /* object name */ char r_oid[40]; /* object name */
int r_oid_len; int r_oid_len;
unsigned long r_stamp; /* send OR check time */ unsigned long r_stamp; /* send OR check time */
bool r_resend; /* msg send failed, needs retry */
struct ceph_file_layout r_file_layout; struct ceph_file_layout r_file_layout;
struct ceph_snap_context *r_snapc; /* snap context for writes */ struct ceph_snap_context *r_snapc; /* snap context for writes */
...@@ -90,6 +93,26 @@ struct ceph_osd_request { ...@@ -90,6 +93,26 @@ struct ceph_osd_request {
struct ceph_pagelist *r_trail; /* trailing part of the data */ struct ceph_pagelist *r_trail; /* trailing part of the data */
}; };
struct ceph_osd_event {
u64 cookie;
int one_shot;
struct ceph_osd_client *osdc;
void (*cb)(u64, u64, u8, void *);
void *data;
struct rb_node node;
struct list_head osd_node;
struct kref kref;
struct completion completion;
};
struct ceph_osd_event_work {
struct work_struct work;
struct ceph_osd_event *event;
u64 ver;
u64 notify_id;
u8 opcode;
};
struct ceph_osd_client { struct ceph_osd_client {
struct ceph_client *client; struct ceph_client *client;
...@@ -104,7 +127,10 @@ struct ceph_osd_client { ...@@ -104,7 +127,10 @@ struct ceph_osd_client {
u64 timeout_tid; /* tid of timeout triggering rq */ u64 timeout_tid; /* tid of timeout triggering rq */
u64 last_tid; /* tid of last request */ u64 last_tid; /* tid of last request */
struct rb_root requests; /* pending requests */ struct rb_root requests; /* pending requests */
struct list_head req_lru; /* pending requests lru */ struct list_head req_lru; /* in-flight lru */
struct list_head req_unsent; /* unsent/need-resend queue */
struct list_head req_notarget; /* map to no osd */
struct list_head req_linger; /* lingering requests */
int num_requests; int num_requests;
struct delayed_work timeout_work; struct delayed_work timeout_work;
struct delayed_work osds_timeout_work; struct delayed_work osds_timeout_work;
...@@ -116,6 +142,12 @@ struct ceph_osd_client { ...@@ -116,6 +142,12 @@ struct ceph_osd_client {
struct ceph_msgpool msgpool_op; struct ceph_msgpool msgpool_op;
struct ceph_msgpool msgpool_op_reply; struct ceph_msgpool msgpool_op_reply;
spinlock_t event_lock;
struct rb_root event_tree;
u64 event_count;
struct workqueue_struct *notify_wq;
}; };
struct ceph_osd_req_op { struct ceph_osd_req_op {
...@@ -150,6 +182,13 @@ struct ceph_osd_req_op { ...@@ -150,6 +182,13 @@ struct ceph_osd_req_op {
struct { struct {
u64 snapid; u64 snapid;
} snap; } snap;
struct {
u64 cookie;
u64 ver;
__u8 flag;
u32 prot_ver;
u32 timeout;
} watch;
}; };
u32 payload_len; u32 payload_len;
}; };
...@@ -198,6 +237,11 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *, ...@@ -198,6 +237,11 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
bool use_mempool, int num_reply, bool use_mempool, int num_reply,
int page_align); int page_align);
extern void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
struct ceph_osd_request *req);
extern void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req);
static inline void ceph_osdc_get_request(struct ceph_osd_request *req) static inline void ceph_osdc_get_request(struct ceph_osd_request *req)
{ {
kref_get(&req->r_kref); kref_get(&req->r_kref);
...@@ -233,5 +277,14 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc, ...@@ -233,5 +277,14 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
struct page **pages, int nr_pages, struct page **pages, int nr_pages,
int flags, int do_sync, bool nofail); int flags, int do_sync, bool nofail);
/* watch/notify events */
extern int ceph_osdc_create_event(struct ceph_osd_client *osdc,
void (*event_cb)(u64, u64, u8, void *),
int one_shot, void *data,
struct ceph_osd_event **pevent);
extern void ceph_osdc_cancel_event(struct ceph_osd_event *event);
extern int ceph_osdc_wait_event(struct ceph_osd_event *event,
unsigned long timeout);
extern void ceph_osdc_put_event(struct ceph_osd_event *event);
#endif #endif
...@@ -12,9 +12,9 @@ ...@@ -12,9 +12,9 @@
* osdmap encoding versions * osdmap encoding versions
*/ */
#define CEPH_OSDMAP_INC_VERSION 5 #define CEPH_OSDMAP_INC_VERSION 5
#define CEPH_OSDMAP_INC_VERSION_EXT 5 #define CEPH_OSDMAP_INC_VERSION_EXT 6
#define CEPH_OSDMAP_VERSION 5 #define CEPH_OSDMAP_VERSION 5
#define CEPH_OSDMAP_VERSION_EXT 5 #define CEPH_OSDMAP_VERSION_EXT 6
/* /*
* fs id * fs id
...@@ -181,9 +181,17 @@ enum { ...@@ -181,9 +181,17 @@ enum {
/* read */ /* read */
CEPH_OSD_OP_READ = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 1, CEPH_OSD_OP_READ = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 1,
CEPH_OSD_OP_STAT = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 2, CEPH_OSD_OP_STAT = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 2,
CEPH_OSD_OP_MAPEXT = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 3,
/* fancy read */ /* fancy read */
CEPH_OSD_OP_MASKTRUNC = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 4, CEPH_OSD_OP_MASKTRUNC = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 4,
CEPH_OSD_OP_SPARSE_READ = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 5,
CEPH_OSD_OP_NOTIFY = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 6,
CEPH_OSD_OP_NOTIFY_ACK = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 7,
/* versioning */
CEPH_OSD_OP_ASSERT_VER = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 8,
/* write */ /* write */
CEPH_OSD_OP_WRITE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 1, CEPH_OSD_OP_WRITE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 1,
...@@ -205,6 +213,8 @@ enum { ...@@ -205,6 +213,8 @@ enum {
CEPH_OSD_OP_CREATE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 13, CEPH_OSD_OP_CREATE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 13,
CEPH_OSD_OP_ROLLBACK= CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 14, CEPH_OSD_OP_ROLLBACK= CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 14,
CEPH_OSD_OP_WATCH = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 15,
/** attrs **/ /** attrs **/
/* read */ /* read */
CEPH_OSD_OP_GETXATTR = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_ATTR | 1, CEPH_OSD_OP_GETXATTR = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_ATTR | 1,
...@@ -223,6 +233,9 @@ enum { ...@@ -223,6 +233,9 @@ enum {
CEPH_OSD_OP_BALANCEREADS = CEPH_OSD_OP_MODE_SUB | 3, CEPH_OSD_OP_BALANCEREADS = CEPH_OSD_OP_MODE_SUB | 3,
CEPH_OSD_OP_UNBALANCEREADS = CEPH_OSD_OP_MODE_SUB | 4, CEPH_OSD_OP_UNBALANCEREADS = CEPH_OSD_OP_MODE_SUB | 4,
CEPH_OSD_OP_SCRUB = CEPH_OSD_OP_MODE_SUB | 5, CEPH_OSD_OP_SCRUB = CEPH_OSD_OP_MODE_SUB | 5,
CEPH_OSD_OP_SCRUB_RESERVE = CEPH_OSD_OP_MODE_SUB | 6,
CEPH_OSD_OP_SCRUB_UNRESERVE = CEPH_OSD_OP_MODE_SUB | 7,
CEPH_OSD_OP_SCRUB_STOP = CEPH_OSD_OP_MODE_SUB | 8,
/** lock **/ /** lock **/
CEPH_OSD_OP_WRLOCK = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 1, CEPH_OSD_OP_WRLOCK = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 1,
...@@ -328,6 +341,8 @@ enum { ...@@ -328,6 +341,8 @@ enum {
CEPH_OSD_CMPXATTR_MODE_U64 = 2 CEPH_OSD_CMPXATTR_MODE_U64 = 2
}; };
#define RADOS_NOTIFY_VER 1
/* /*
* an individual object operation. each may be accompanied by some data * an individual object operation. each may be accompanied by some data
* payload * payload
...@@ -359,7 +374,12 @@ struct ceph_osd_op { ...@@ -359,7 +374,12 @@ struct ceph_osd_op {
struct { struct {
__le64 snapid; __le64 snapid;
} __attribute__ ((packed)) snap; } __attribute__ ((packed)) snap;
}; struct {
__le64 cookie;
__le64 ver;
__u8 flag; /* 0 = unwatch, 1 = watch */
} __attribute__ ((packed)) watch;
};
__le32 payload_len; __le32 payload_len;
} __attribute__ ((packed)); } __attribute__ ((packed));
...@@ -402,4 +422,5 @@ struct ceph_osd_reply_head { ...@@ -402,4 +422,5 @@ struct ceph_osd_reply_head {
} __attribute__ ((packed)); } __attribute__ ((packed));
#endif #endif
...@@ -78,8 +78,10 @@ int ceph_unarmor(char *dst, const char *src, const char *end) ...@@ -78,8 +78,10 @@ int ceph_unarmor(char *dst, const char *src, const char *end)
while (src < end) { while (src < end) {
int a, b, c, d; int a, b, c, d;
if (src < end && src[0] == '\n') if (src[0] == '\n') {
src++; src++;
continue;
}
if (src + 4 > end) if (src + 4 > end)
return -EINVAL; return -EINVAL;
a = decode_bits(src[0]); a = decode_bits(src[0]);
......
...@@ -62,6 +62,7 @@ const char *ceph_msg_type_name(int type) ...@@ -62,6 +62,7 @@ const char *ceph_msg_type_name(int type)
case CEPH_MSG_OSD_MAP: return "osd_map"; case CEPH_MSG_OSD_MAP: return "osd_map";
case CEPH_MSG_OSD_OP: return "osd_op"; case CEPH_MSG_OSD_OP: return "osd_op";
case CEPH_MSG_OSD_OPREPLY: return "osd_opreply"; case CEPH_MSG_OSD_OPREPLY: return "osd_opreply";
case CEPH_MSG_WATCH_NOTIFY: return "watch_notify";
default: return "unknown"; default: return "unknown";
} }
} }
......
...@@ -22,10 +22,15 @@ ...@@ -22,10 +22,15 @@
#define OSD_OPREPLY_FRONT_LEN 512 #define OSD_OPREPLY_FRONT_LEN 512
static const struct ceph_connection_operations osd_con_ops; static const struct ceph_connection_operations osd_con_ops;
static int __kick_requests(struct ceph_osd_client *osdc,
struct ceph_osd *kickosd);
static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd); static void send_queued(struct ceph_osd_client *osdc);
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
static void __register_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req);
static void __unregister_linger_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req);
static int __send_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req);
static int op_needs_trail(int op) static int op_needs_trail(int op)
{ {
...@@ -34,6 +39,7 @@ static int op_needs_trail(int op) ...@@ -34,6 +39,7 @@ static int op_needs_trail(int op)
case CEPH_OSD_OP_SETXATTR: case CEPH_OSD_OP_SETXATTR:
case CEPH_OSD_OP_CMPXATTR: case CEPH_OSD_OP_CMPXATTR:
case CEPH_OSD_OP_CALL: case CEPH_OSD_OP_CALL:
case CEPH_OSD_OP_NOTIFY:
return 1; return 1;
default: default:
return 0; return 0;
...@@ -209,6 +215,8 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, ...@@ -209,6 +215,8 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
init_completion(&req->r_completion); init_completion(&req->r_completion);
init_completion(&req->r_safe_completion); init_completion(&req->r_safe_completion);
INIT_LIST_HEAD(&req->r_unsafe_item); INIT_LIST_HEAD(&req->r_unsafe_item);
INIT_LIST_HEAD(&req->r_linger_item);
INIT_LIST_HEAD(&req->r_linger_osd);
req->r_flags = flags; req->r_flags = flags;
WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0); WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
...@@ -315,6 +323,24 @@ static void osd_req_encode_op(struct ceph_osd_request *req, ...@@ -315,6 +323,24 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
break; break;
case CEPH_OSD_OP_STARTSYNC: case CEPH_OSD_OP_STARTSYNC:
break; break;
case CEPH_OSD_OP_NOTIFY:
{
__le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
__le32 timeout = cpu_to_le32(src->watch.timeout);
BUG_ON(!req->r_trail);
ceph_pagelist_append(req->r_trail,
&prot_ver, sizeof(prot_ver));
ceph_pagelist_append(req->r_trail,
&timeout, sizeof(timeout));
}
case CEPH_OSD_OP_NOTIFY_ACK:
case CEPH_OSD_OP_WATCH:
dst->watch.cookie = cpu_to_le64(src->watch.cookie);
dst->watch.ver = cpu_to_le64(src->watch.ver);
dst->watch.flag = src->watch.flag;
break;
default: default:
pr_err("unrecognized osd opcode %d\n", dst->op); pr_err("unrecognized osd opcode %d\n", dst->op);
WARN_ON(1); WARN_ON(1);
...@@ -529,6 +555,45 @@ __lookup_request_ge(struct ceph_osd_client *osdc, ...@@ -529,6 +555,45 @@ __lookup_request_ge(struct ceph_osd_client *osdc,
return NULL; return NULL;
} }
/*
* Resubmit requests pending on the given osd.
*/
static void __kick_osd_requests(struct ceph_osd_client *osdc,
struct ceph_osd *osd)
{
struct ceph_osd_request *req, *nreq;
int err;
dout("__kick_osd_requests osd%d\n", osd->o_osd);
err = __reset_osd(osdc, osd);
if (err == -EAGAIN)
return;
list_for_each_entry(req, &osd->o_requests, r_osd_item) {
list_move(&req->r_req_lru_item, &osdc->req_unsent);
dout("requeued %p tid %llu osd%d\n", req, req->r_tid,
osd->o_osd);
if (!req->r_linger)
req->r_flags |= CEPH_OSD_FLAG_RETRY;
}
list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
r_linger_osd) {
__unregister_linger_request(osdc, req);
__register_request(osdc, req);
list_move(&req->r_req_lru_item, &osdc->req_unsent);
dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
osd->o_osd);
}
}
static void kick_osd_requests(struct ceph_osd_client *osdc,
struct ceph_osd *kickosd)
{
mutex_lock(&osdc->request_mutex);
__kick_osd_requests(osdc, kickosd);
mutex_unlock(&osdc->request_mutex);
}
/* /*
* If the osd connection drops, we need to resubmit all requests. * If the osd connection drops, we need to resubmit all requests.
...@@ -543,7 +608,8 @@ static void osd_reset(struct ceph_connection *con) ...@@ -543,7 +608,8 @@ static void osd_reset(struct ceph_connection *con)
dout("osd_reset osd%d\n", osd->o_osd); dout("osd_reset osd%d\n", osd->o_osd);
osdc = osd->o_osdc; osdc = osd->o_osdc;
down_read(&osdc->map_sem); down_read(&osdc->map_sem);
kick_requests(osdc, osd); kick_osd_requests(osdc, osd);
send_queued(osdc);
up_read(&osdc->map_sem); up_read(&osdc->map_sem);
} }
...@@ -561,6 +627,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc) ...@@ -561,6 +627,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
atomic_set(&osd->o_ref, 1); atomic_set(&osd->o_ref, 1);
osd->o_osdc = osdc; osd->o_osdc = osdc;
INIT_LIST_HEAD(&osd->o_requests); INIT_LIST_HEAD(&osd->o_requests);
INIT_LIST_HEAD(&osd->o_linger_requests);
INIT_LIST_HEAD(&osd->o_osd_lru); INIT_LIST_HEAD(&osd->o_osd_lru);
osd->o_incarnation = 1; osd->o_incarnation = 1;
...@@ -650,7 +717,8 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) ...@@ -650,7 +717,8 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
int ret = 0; int ret = 0;
dout("__reset_osd %p osd%d\n", osd, osd->o_osd); dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
if (list_empty(&osd->o_requests)) { if (list_empty(&osd->o_requests) &&
list_empty(&osd->o_linger_requests)) {
__remove_osd(osdc, osd); __remove_osd(osdc, osd);
} else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd], } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
&osd->o_con.peer_addr, &osd->o_con.peer_addr,
...@@ -723,10 +791,9 @@ static void __cancel_osd_timeout(struct ceph_osd_client *osdc) ...@@ -723,10 +791,9 @@ static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
* Register request, assign tid. If this is the first request, set up * Register request, assign tid. If this is the first request, set up
* the timeout event. * the timeout event.
*/ */
static void register_request(struct ceph_osd_client *osdc, static void __register_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req) struct ceph_osd_request *req)
{ {
mutex_lock(&osdc->request_mutex);
req->r_tid = ++osdc->last_tid; req->r_tid = ++osdc->last_tid;
req->r_request->hdr.tid = cpu_to_le64(req->r_tid); req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
INIT_LIST_HEAD(&req->r_req_lru_item); INIT_LIST_HEAD(&req->r_req_lru_item);
...@@ -740,6 +807,13 @@ static void register_request(struct ceph_osd_client *osdc, ...@@ -740,6 +807,13 @@ static void register_request(struct ceph_osd_client *osdc,
dout(" first request, scheduling timeout\n"); dout(" first request, scheduling timeout\n");
__schedule_osd_timeout(osdc); __schedule_osd_timeout(osdc);
} }
}
static void register_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req)
{
mutex_lock(&osdc->request_mutex);
__register_request(osdc, req);
mutex_unlock(&osdc->request_mutex); mutex_unlock(&osdc->request_mutex);
} }
...@@ -758,8 +832,13 @@ static void __unregister_request(struct ceph_osd_client *osdc, ...@@ -758,8 +832,13 @@ static void __unregister_request(struct ceph_osd_client *osdc,
ceph_con_revoke(&req->r_osd->o_con, req->r_request); ceph_con_revoke(&req->r_osd->o_con, req->r_request);
list_del_init(&req->r_osd_item); list_del_init(&req->r_osd_item);
if (list_empty(&req->r_osd->o_requests)) if (list_empty(&req->r_osd->o_requests) &&
list_empty(&req->r_osd->o_linger_requests)) {
dout("moving osd to %p lru\n", req->r_osd);
__move_osd_to_lru(osdc, req->r_osd); __move_osd_to_lru(osdc, req->r_osd);
}
if (list_empty(&req->r_osd_item) &&
list_empty(&req->r_linger_item))
req->r_osd = NULL; req->r_osd = NULL;
} }
...@@ -781,19 +860,71 @@ static void __cancel_request(struct ceph_osd_request *req) ...@@ -781,19 +860,71 @@ static void __cancel_request(struct ceph_osd_request *req)
ceph_con_revoke(&req->r_osd->o_con, req->r_request); ceph_con_revoke(&req->r_osd->o_con, req->r_request);
req->r_sent = 0; req->r_sent = 0;
} }
list_del_init(&req->r_req_lru_item);
} }
static void __register_linger_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req)
{
dout("__register_linger_request %p\n", req);
list_add_tail(&req->r_linger_item, &osdc->req_linger);
list_add_tail(&req->r_linger_osd, &req->r_osd->o_linger_requests);
}
static void __unregister_linger_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req)
{
dout("__unregister_linger_request %p\n", req);
if (req->r_osd) {
list_del_init(&req->r_linger_item);
list_del_init(&req->r_linger_osd);
if (list_empty(&req->r_osd->o_requests) &&
list_empty(&req->r_osd->o_linger_requests)) {
dout("moving osd to %p lru\n", req->r_osd);
__move_osd_to_lru(osdc, req->r_osd);
}
req->r_osd = NULL;
}
}
void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req)
{
mutex_lock(&osdc->request_mutex);
if (req->r_linger) {
__unregister_linger_request(osdc, req);
ceph_osdc_put_request(req);
}
mutex_unlock(&osdc->request_mutex);
}
EXPORT_SYMBOL(ceph_osdc_unregister_linger_request);
void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
struct ceph_osd_request *req)
{
if (!req->r_linger) {
dout("set_request_linger %p\n", req);
req->r_linger = 1;
/*
* caller is now responsible for calling
* unregister_linger_request
*/
ceph_osdc_get_request(req);
}
}
EXPORT_SYMBOL(ceph_osdc_set_request_linger);
/* /*
* Pick an osd (the first 'up' osd in the pg), allocate the osd struct * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
* (as needed), and set the request r_osd appropriately. If there is * (as needed), and set the request r_osd appropriately. If there is
* no up osd, set r_osd to NULL. * no up osd, set r_osd to NULL. Move the request to the appropiate list
* (unsent, homeless) or leave on in-flight lru.
* *
* Return 0 if unchanged, 1 if changed, or negative on error. * Return 0 if unchanged, 1 if changed, or negative on error.
* *
* Caller should hold map_sem for read and request_mutex. * Caller should hold map_sem for read and request_mutex.
*/ */
static int __map_osds(struct ceph_osd_client *osdc, static int __map_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req) struct ceph_osd_request *req)
{ {
struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
...@@ -802,11 +933,13 @@ static int __map_osds(struct ceph_osd_client *osdc, ...@@ -802,11 +933,13 @@ static int __map_osds(struct ceph_osd_client *osdc,
int o = -1, num = 0; int o = -1, num = 0;
int err; int err;
dout("map_osds %p tid %lld\n", req, req->r_tid); dout("map_request %p tid %lld\n", req, req->r_tid);
err = ceph_calc_object_layout(&reqhead->layout, req->r_oid, err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
&req->r_file_layout, osdc->osdmap); &req->r_file_layout, osdc->osdmap);
if (err) if (err) {
list_move(&req->r_req_lru_item, &osdc->req_notarget);
return err; return err;
}
pgid = reqhead->layout.ol_pgid; pgid = reqhead->layout.ol_pgid;
req->r_pgid = pgid; req->r_pgid = pgid;
...@@ -823,7 +956,7 @@ static int __map_osds(struct ceph_osd_client *osdc, ...@@ -823,7 +956,7 @@ static int __map_osds(struct ceph_osd_client *osdc,
(req->r_osd == NULL && o == -1)) (req->r_osd == NULL && o == -1))
return 0; /* no change */ return 0; /* no change */
dout("map_osds tid %llu pgid %d.%x osd%d (was osd%d)\n", dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)\n",
req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o, req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
req->r_osd ? req->r_osd->o_osd : -1); req->r_osd ? req->r_osd->o_osd : -1);
...@@ -841,10 +974,12 @@ static int __map_osds(struct ceph_osd_client *osdc, ...@@ -841,10 +974,12 @@ static int __map_osds(struct ceph_osd_client *osdc,
if (!req->r_osd && o >= 0) { if (!req->r_osd && o >= 0) {
err = -ENOMEM; err = -ENOMEM;
req->r_osd = create_osd(osdc); req->r_osd = create_osd(osdc);
if (!req->r_osd) if (!req->r_osd) {
list_move(&req->r_req_lru_item, &osdc->req_notarget);
goto out; goto out;
}
dout("map_osds osd %p is osd%d\n", req->r_osd, o); dout("map_request osd %p is osd%d\n", req->r_osd, o);
req->r_osd->o_osd = o; req->r_osd->o_osd = o;
req->r_osd->o_con.peer_name.num = cpu_to_le64(o); req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
__insert_osd(osdc, req->r_osd); __insert_osd(osdc, req->r_osd);
...@@ -855,6 +990,9 @@ static int __map_osds(struct ceph_osd_client *osdc, ...@@ -855,6 +990,9 @@ static int __map_osds(struct ceph_osd_client *osdc,
if (req->r_osd) { if (req->r_osd) {
__remove_osd_from_lru(req->r_osd); __remove_osd_from_lru(req->r_osd);
list_add(&req->r_osd_item, &req->r_osd->o_requests); list_add(&req->r_osd_item, &req->r_osd->o_requests);
list_move(&req->r_req_lru_item, &osdc->req_unsent);
} else {
list_move(&req->r_req_lru_item, &osdc->req_notarget);
} }
err = 1; /* osd or pg changed */ err = 1; /* osd or pg changed */
...@@ -869,16 +1007,6 @@ static int __send_request(struct ceph_osd_client *osdc, ...@@ -869,16 +1007,6 @@ static int __send_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req) struct ceph_osd_request *req)
{ {
struct ceph_osd_request_head *reqhead; struct ceph_osd_request_head *reqhead;
int err;
err = __map_osds(osdc, req);
if (err < 0)
return err;
if (req->r_osd == NULL) {
dout("send_request %p no up osds in pg\n", req);
ceph_monc_request_next_osdmap(&osdc->client->monc);
return 0;
}
dout("send_request %p tid %llu to osd%d flags %d\n", dout("send_request %p tid %llu to osd%d flags %d\n",
req, req->r_tid, req->r_osd->o_osd, req->r_flags); req, req->r_tid, req->r_osd->o_osd, req->r_flags);
...@@ -897,6 +1025,21 @@ static int __send_request(struct ceph_osd_client *osdc, ...@@ -897,6 +1025,21 @@ static int __send_request(struct ceph_osd_client *osdc,
return 0; return 0;
} }
/*
* Send any requests in the queue (req_unsent).
*/
static void send_queued(struct ceph_osd_client *osdc)
{
struct ceph_osd_request *req, *tmp;
dout("send_queued\n");
mutex_lock(&osdc->request_mutex);
list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
__send_request(osdc, req);
}
mutex_unlock(&osdc->request_mutex);
}
/* /*
* Timeout callback, called every N seconds when 1 or more osd * Timeout callback, called every N seconds when 1 or more osd
* requests has been active for more than N seconds. When this * requests has been active for more than N seconds. When this
...@@ -916,30 +1059,13 @@ static void handle_timeout(struct work_struct *work) ...@@ -916,30 +1059,13 @@ static void handle_timeout(struct work_struct *work)
unsigned long keepalive = unsigned long keepalive =
osdc->client->options->osd_keepalive_timeout * HZ; osdc->client->options->osd_keepalive_timeout * HZ;
unsigned long last_stamp = 0; unsigned long last_stamp = 0;
struct rb_node *p;
struct list_head slow_osds; struct list_head slow_osds;
dout("timeout\n"); dout("timeout\n");
down_read(&osdc->map_sem); down_read(&osdc->map_sem);
ceph_monc_request_next_osdmap(&osdc->client->monc); ceph_monc_request_next_osdmap(&osdc->client->monc);
mutex_lock(&osdc->request_mutex); mutex_lock(&osdc->request_mutex);
for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
req = rb_entry(p, struct ceph_osd_request, r_node);
if (req->r_resend) {
int err;
dout("osdc resending prev failed %lld\n", req->r_tid);
err = __send_request(osdc, req);
if (err)
dout("osdc failed again on %lld\n", req->r_tid);
else
req->r_resend = false;
continue;
}
}
/* /*
* reset osds that appear to be _really_ unresponsive. this * reset osds that appear to be _really_ unresponsive. this
...@@ -963,7 +1089,7 @@ static void handle_timeout(struct work_struct *work) ...@@ -963,7 +1089,7 @@ static void handle_timeout(struct work_struct *work)
BUG_ON(!osd); BUG_ON(!osd);
pr_warning(" tid %llu timed out on osd%d, will reset osd\n", pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
req->r_tid, osd->o_osd); req->r_tid, osd->o_osd);
__kick_requests(osdc, osd); __kick_osd_requests(osdc, osd);
} }
/* /*
...@@ -991,7 +1117,7 @@ static void handle_timeout(struct work_struct *work) ...@@ -991,7 +1117,7 @@ static void handle_timeout(struct work_struct *work)
__schedule_osd_timeout(osdc); __schedule_osd_timeout(osdc);
mutex_unlock(&osdc->request_mutex); mutex_unlock(&osdc->request_mutex);
send_queued(osdc);
up_read(&osdc->map_sem); up_read(&osdc->map_sem);
} }
...@@ -1035,7 +1161,6 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, ...@@ -1035,7 +1161,6 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
numops * sizeof(struct ceph_osd_op)) numops * sizeof(struct ceph_osd_op))
goto bad; goto bad;
dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result); dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
/* lookup */ /* lookup */
mutex_lock(&osdc->request_mutex); mutex_lock(&osdc->request_mutex);
req = __lookup_request(osdc, tid); req = __lookup_request(osdc, tid);
...@@ -1079,6 +1204,9 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, ...@@ -1079,6 +1204,9 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
dout("handle_reply tid %llu flags %d\n", tid, flags); dout("handle_reply tid %llu flags %d\n", tid, flags);
if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
__register_linger_request(osdc, req);
/* either this is a read, or we got the safe response */ /* either this is a read, or we got the safe response */
if (result < 0 || if (result < 0 ||
(flags & CEPH_OSD_FLAG_ONDISK) || (flags & CEPH_OSD_FLAG_ONDISK) ||
...@@ -1099,6 +1227,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, ...@@ -1099,6 +1227,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
} }
done: done:
dout("req=%p req->r_linger=%d\n", req, req->r_linger);
ceph_osdc_put_request(req); ceph_osdc_put_request(req);
return; return;
...@@ -1109,24 +1238,12 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, ...@@ -1109,24 +1238,12 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
ceph_msg_dump(msg); ceph_msg_dump(msg);
} }
static void reset_changed_osds(struct ceph_osd_client *osdc)
static int __kick_requests(struct ceph_osd_client *osdc,
struct ceph_osd *kickosd)
{ {
struct ceph_osd_request *req;
struct rb_node *p, *n; struct rb_node *p, *n;
int needmap = 0;
int err;
dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
if (kickosd) {
err = __reset_osd(osdc, kickosd);
if (err == -EAGAIN)
return 1;
} else {
for (p = rb_first(&osdc->osds); p; p = n) { for (p = rb_first(&osdc->osds); p; p = n) {
struct ceph_osd *osd = struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);
rb_entry(p, struct ceph_osd, o_node);
n = rb_next(p); n = rb_next(p);
if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) || if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
...@@ -1136,81 +1253,68 @@ static int __kick_requests(struct ceph_osd_client *osdc, ...@@ -1136,81 +1253,68 @@ static int __kick_requests(struct ceph_osd_client *osdc,
sizeof(struct ceph_entity_addr)) != 0) sizeof(struct ceph_entity_addr)) != 0)
__reset_osd(osdc, osd); __reset_osd(osdc, osd);
} }
} }
/*
* Requeue requests whose mapping to an OSD has changed. If requests map to
* no osd, request a new map.
*
* Caller should hold map_sem for read and request_mutex.
*/
static void kick_requests(struct ceph_osd_client *osdc)
{
struct ceph_osd_request *req, *nreq;
struct rb_node *p;
int needmap = 0;
int err;
dout("kick_requests\n");
mutex_lock(&osdc->request_mutex);
for (p = rb_first(&osdc->requests); p; p = rb_next(p)) { for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
req = rb_entry(p, struct ceph_osd_request, r_node); req = rb_entry(p, struct ceph_osd_request, r_node);
err = __map_request(osdc, req);
if (req->r_resend) { if (err < 0)
dout(" r_resend set on tid %llu\n", req->r_tid); continue; /* error */
__cancel_request(req); if (req->r_osd == NULL) {
goto kick; dout("%p tid %llu maps to no osd\n", req, req->r_tid);
needmap++; /* request a newer map */
} else if (err > 0) {
dout("%p tid %llu requeued on osd%d\n", req, req->r_tid,
req->r_osd ? req->r_osd->o_osd : -1);
if (!req->r_linger)
req->r_flags |= CEPH_OSD_FLAG_RETRY;
} }
if (req->r_osd && kickosd == req->r_osd) {
__cancel_request(req);
goto kick;
} }
err = __map_osds(osdc, req); list_for_each_entry_safe(req, nreq, &osdc->req_linger,
r_linger_item) {
dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
err = __map_request(osdc, req);
if (err == 0) if (err == 0)
continue; /* no change */ continue; /* no change and no osd was specified */
if (err < 0) { if (err < 0)
/* continue; /* hrm! */
* FIXME: really, we should set the request
* error and fail if this isn't a 'nofail'
* request, but that's a fair bit more
* complicated to do. So retry!
*/
dout(" setting r_resend on %llu\n", req->r_tid);
req->r_resend = true;
continue;
}
if (req->r_osd == NULL) { if (req->r_osd == NULL) {
dout("tid %llu maps to no valid osd\n", req->r_tid); dout("tid %llu maps to no valid osd\n", req->r_tid);
needmap++; /* request a newer map */ needmap++; /* request a newer map */
continue; continue;
} }
kick: dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
dout("kicking %p tid %llu osd%d\n", req, req->r_tid,
req->r_osd ? req->r_osd->o_osd : -1); req->r_osd ? req->r_osd->o_osd : -1);
req->r_flags |= CEPH_OSD_FLAG_RETRY; __unregister_linger_request(osdc, req);
err = __send_request(osdc, req); __register_request(osdc, req);
if (err) {
dout(" setting r_resend on %llu\n", req->r_tid);
req->r_resend = true;
} }
}
return needmap;
}
/*
* Resubmit osd requests whose osd or osd address has changed. Request
* a new osd map if osds are down, or we are otherwise unable to determine
* how to direct a request.
*
* Close connections to down osds.
*
* If @who is specified, resubmit requests for that specific osd.
*
* Caller should hold map_sem for read and request_mutex.
*/
static void kick_requests(struct ceph_osd_client *osdc,
struct ceph_osd *kickosd)
{
int needmap;
mutex_lock(&osdc->request_mutex);
needmap = __kick_requests(osdc, kickosd);
mutex_unlock(&osdc->request_mutex); mutex_unlock(&osdc->request_mutex);
if (needmap) { if (needmap) {
dout("%d requests for down osds, need new map\n", needmap); dout("%d requests for down osds, need new map\n", needmap);
ceph_monc_request_next_osdmap(&osdc->client->monc); ceph_monc_request_next_osdmap(&osdc->client->monc);
} }
} }
/* /*
* Process updated osd map. * Process updated osd map.
* *
...@@ -1263,6 +1367,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) ...@@ -1263,6 +1367,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
ceph_osdmap_destroy(osdc->osdmap); ceph_osdmap_destroy(osdc->osdmap);
osdc->osdmap = newmap; osdc->osdmap = newmap;
} }
kick_requests(osdc);
reset_changed_osds(osdc);
} else { } else {
dout("ignoring incremental map %u len %d\n", dout("ignoring incremental map %u len %d\n",
epoch, maplen); epoch, maplen);
...@@ -1300,6 +1406,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) ...@@ -1300,6 +1406,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
osdc->osdmap = newmap; osdc->osdmap = newmap;
if (oldmap) if (oldmap)
ceph_osdmap_destroy(oldmap); ceph_osdmap_destroy(oldmap);
kick_requests(osdc);
} }
p += maplen; p += maplen;
nr_maps--; nr_maps--;
...@@ -1308,8 +1415,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) ...@@ -1308,8 +1415,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
done: done:
downgrade_write(&osdc->map_sem); downgrade_write(&osdc->map_sem);
ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch); ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
if (newmap) send_queued(osdc);
kick_requests(osdc, NULL);
up_read(&osdc->map_sem); up_read(&osdc->map_sem);
wake_up_all(&osdc->client->auth_wq); wake_up_all(&osdc->client->auth_wq);
return; return;
...@@ -1321,6 +1427,223 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) ...@@ -1321,6 +1427,223 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
return; return;
} }
/*
* watch/notify callback event infrastructure
*
* These callbacks are used both for watch and notify operations.
*/
static void __release_event(struct kref *kref)
{
struct ceph_osd_event *event =
container_of(kref, struct ceph_osd_event, kref);
dout("__release_event %p\n", event);
kfree(event);
}
static void get_event(struct ceph_osd_event *event)
{
kref_get(&event->kref);
}
void ceph_osdc_put_event(struct ceph_osd_event *event)
{
kref_put(&event->kref, __release_event);
}
EXPORT_SYMBOL(ceph_osdc_put_event);
static void __insert_event(struct ceph_osd_client *osdc,
struct ceph_osd_event *new)
{
struct rb_node **p = &osdc->event_tree.rb_node;
struct rb_node *parent = NULL;
struct ceph_osd_event *event = NULL;
while (*p) {
parent = *p;
event = rb_entry(parent, struct ceph_osd_event, node);
if (new->cookie < event->cookie)
p = &(*p)->rb_left;
else if (new->cookie > event->cookie)
p = &(*p)->rb_right;
else
BUG();
}
rb_link_node(&new->node, parent, p);
rb_insert_color(&new->node, &osdc->event_tree);
}
static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
u64 cookie)
{
struct rb_node **p = &osdc->event_tree.rb_node;
struct rb_node *parent = NULL;
struct ceph_osd_event *event = NULL;
while (*p) {
parent = *p;
event = rb_entry(parent, struct ceph_osd_event, node);
if (cookie < event->cookie)
p = &(*p)->rb_left;
else if (cookie > event->cookie)
p = &(*p)->rb_right;
else
return event;
}
return NULL;
}
static void __remove_event(struct ceph_osd_event *event)
{
struct ceph_osd_client *osdc = event->osdc;
if (!RB_EMPTY_NODE(&event->node)) {
dout("__remove_event removed %p\n", event);
rb_erase(&event->node, &osdc->event_tree);
ceph_osdc_put_event(event);
} else {
dout("__remove_event didn't remove %p\n", event);
}
}
int ceph_osdc_create_event(struct ceph_osd_client *osdc,
void (*event_cb)(u64, u64, u8, void *),
int one_shot, void *data,
struct ceph_osd_event **pevent)
{
struct ceph_osd_event *event;
event = kmalloc(sizeof(*event), GFP_NOIO);
if (!event)
return -ENOMEM;
dout("create_event %p\n", event);
event->cb = event_cb;
event->one_shot = one_shot;
event->data = data;
event->osdc = osdc;
INIT_LIST_HEAD(&event->osd_node);
kref_init(&event->kref); /* one ref for us */
kref_get(&event->kref); /* one ref for the caller */
init_completion(&event->completion);
spin_lock(&osdc->event_lock);
event->cookie = ++osdc->event_count;
__insert_event(osdc, event);
spin_unlock(&osdc->event_lock);
*pevent = event;
return 0;
}
EXPORT_SYMBOL(ceph_osdc_create_event);
void ceph_osdc_cancel_event(struct ceph_osd_event *event)
{
struct ceph_osd_client *osdc = event->osdc;
dout("cancel_event %p\n", event);
spin_lock(&osdc->event_lock);
__remove_event(event);
spin_unlock(&osdc->event_lock);
ceph_osdc_put_event(event); /* caller's */
}
EXPORT_SYMBOL(ceph_osdc_cancel_event);
static void do_event_work(struct work_struct *work)
{
struct ceph_osd_event_work *event_work =
container_of(work, struct ceph_osd_event_work, work);
struct ceph_osd_event *event = event_work->event;
u64 ver = event_work->ver;
u64 notify_id = event_work->notify_id;
u8 opcode = event_work->opcode;
dout("do_event_work completing %p\n", event);
event->cb(ver, notify_id, opcode, event->data);
complete(&event->completion);
dout("do_event_work completed %p\n", event);
ceph_osdc_put_event(event);
kfree(event_work);
}
/*
* Process osd watch notifications
*/
void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
{
void *p, *end;
u8 proto_ver;
u64 cookie, ver, notify_id;
u8 opcode;
struct ceph_osd_event *event;
struct ceph_osd_event_work *event_work;
p = msg->front.iov_base;
end = p + msg->front.iov_len;
ceph_decode_8_safe(&p, end, proto_ver, bad);
ceph_decode_8_safe(&p, end, opcode, bad);
ceph_decode_64_safe(&p, end, cookie, bad);
ceph_decode_64_safe(&p, end, ver, bad);
ceph_decode_64_safe(&p, end, notify_id, bad);
spin_lock(&osdc->event_lock);
event = __find_event(osdc, cookie);
if (event) {
get_event(event);
if (event->one_shot)
__remove_event(event);
}
spin_unlock(&osdc->event_lock);
dout("handle_watch_notify cookie %lld ver %lld event %p\n",
cookie, ver, event);
if (event) {
event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
INIT_WORK(&event_work->work, do_event_work);
if (!event_work) {
dout("ERROR: could not allocate event_work\n");
goto done_err;
}
event_work->event = event;
event_work->ver = ver;
event_work->notify_id = notify_id;
event_work->opcode = opcode;
if (!queue_work(osdc->notify_wq, &event_work->work)) {
dout("WARNING: failed to queue notify event work\n");
goto done_err;
}
}
return;
done_err:
complete(&event->completion);
ceph_osdc_put_event(event);
return;
bad:
pr_err("osdc handle_watch_notify corrupt msg\n");
return;
}
int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout)
{
int err;
dout("wait_event %p\n", event);
err = wait_for_completion_interruptible_timeout(&event->completion,
timeout * HZ);
ceph_osdc_put_event(event);
if (err > 0)
err = 0;
dout("wait_event %p returns %d\n", event, err);
return err;
}
EXPORT_SYMBOL(ceph_osdc_wait_event);
/* /*
* Register request, send initial attempt. * Register request, send initial attempt.
*/ */
...@@ -1347,18 +1670,25 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc, ...@@ -1347,18 +1670,25 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
* the request still han't been touched yet. * the request still han't been touched yet.
*/ */
if (req->r_sent == 0) { if (req->r_sent == 0) {
rc = __map_request(osdc, req);
if (rc < 0)
return rc;
if (req->r_osd == NULL) {
dout("send_request %p no up osds in pg\n", req);
ceph_monc_request_next_osdmap(&osdc->client->monc);
} else {
rc = __send_request(osdc, req); rc = __send_request(osdc, req);
if (rc) { if (rc) {
if (nofail) { if (nofail) {
dout("osdc_start_request failed send, " dout("osdc_start_request failed send, "
" marking %lld\n", req->r_tid); " will retry %lld\n", req->r_tid);
req->r_resend = true;
rc = 0; rc = 0;
} else { } else {
__unregister_request(osdc, req); __unregister_request(osdc, req);
} }
} }
} }
}
mutex_unlock(&osdc->request_mutex); mutex_unlock(&osdc->request_mutex);
up_read(&osdc->map_sem); up_read(&osdc->map_sem);
return rc; return rc;
...@@ -1441,9 +1771,15 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) ...@@ -1441,9 +1771,15 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
INIT_LIST_HEAD(&osdc->osd_lru); INIT_LIST_HEAD(&osdc->osd_lru);
osdc->requests = RB_ROOT; osdc->requests = RB_ROOT;
INIT_LIST_HEAD(&osdc->req_lru); INIT_LIST_HEAD(&osdc->req_lru);
INIT_LIST_HEAD(&osdc->req_unsent);
INIT_LIST_HEAD(&osdc->req_notarget);
INIT_LIST_HEAD(&osdc->req_linger);
osdc->num_requests = 0; osdc->num_requests = 0;
INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
spin_lock_init(&osdc->event_lock);
osdc->event_tree = RB_ROOT;
osdc->event_count = 0;
schedule_delayed_work(&osdc->osds_timeout_work, schedule_delayed_work(&osdc->osds_timeout_work,
round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ)); round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));
...@@ -1463,6 +1799,13 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) ...@@ -1463,6 +1799,13 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
"osd_op_reply"); "osd_op_reply");
if (err < 0) if (err < 0)
goto out_msgpool; goto out_msgpool;
osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
if (IS_ERR(osdc->notify_wq)) {
err = PTR_ERR(osdc->notify_wq);
osdc->notify_wq = NULL;
goto out_msgpool;
}
return 0; return 0;
out_msgpool: out_msgpool:
...@@ -1476,6 +1819,8 @@ EXPORT_SYMBOL(ceph_osdc_init); ...@@ -1476,6 +1819,8 @@ EXPORT_SYMBOL(ceph_osdc_init);
void ceph_osdc_stop(struct ceph_osd_client *osdc) void ceph_osdc_stop(struct ceph_osd_client *osdc)
{ {
flush_workqueue(osdc->notify_wq);
destroy_workqueue(osdc->notify_wq);
cancel_delayed_work_sync(&osdc->timeout_work); cancel_delayed_work_sync(&osdc->timeout_work);
cancel_delayed_work_sync(&osdc->osds_timeout_work); cancel_delayed_work_sync(&osdc->osds_timeout_work);
if (osdc->osdmap) { if (osdc->osdmap) {
...@@ -1483,6 +1828,7 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc) ...@@ -1483,6 +1828,7 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
osdc->osdmap = NULL; osdc->osdmap = NULL;
} }
remove_old_osds(osdc, 1); remove_old_osds(osdc, 1);
WARN_ON(!RB_EMPTY_ROOT(&osdc->osds));
mempool_destroy(osdc->req_mempool); mempool_destroy(osdc->req_mempool);
ceph_msgpool_destroy(&osdc->msgpool_op); ceph_msgpool_destroy(&osdc->msgpool_op);
ceph_msgpool_destroy(&osdc->msgpool_op_reply); ceph_msgpool_destroy(&osdc->msgpool_op_reply);
...@@ -1591,6 +1937,9 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) ...@@ -1591,6 +1937,9 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
case CEPH_MSG_OSD_OPREPLY: case CEPH_MSG_OSD_OPREPLY:
handle_reply(osdc, msg, con); handle_reply(osdc, msg, con);
break; break;
case CEPH_MSG_WATCH_NOTIFY:
handle_watch_notify(osdc, msg);
break;
default: default:
pr_err("received unknown message type %d %s\n", type, pr_err("received unknown message type %d %s\n", type,
...@@ -1684,6 +2033,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con, ...@@ -1684,6 +2033,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
switch (type) { switch (type) {
case CEPH_MSG_OSD_MAP: case CEPH_MSG_OSD_MAP:
case CEPH_MSG_WATCH_NOTIFY:
return ceph_msg_new(type, front, GFP_NOFS); return ceph_msg_new(type, front, GFP_NOFS);
case CEPH_MSG_OSD_OPREPLY: case CEPH_MSG_OSD_OPREPLY:
return get_reply(con, hdr, skip); return get_reply(con, hdr, skip);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment