Commit 1b83bef2 authored by Sage Weil

libceph: update osd request/reply encoding

Use the new version of the encoding for osd requests and replies.  In the
process, update the way we are tracking request ops and reply lengths and
results in the struct ceph_osd_request.  Update the rbd and fs/ceph users
appropriately.

The main changes are:
 - we keep pointers into the request memory for fields we need to update
   each time the request is sent out over the wire
 - we keep information about the result in an array in the request struct
   where the users can easily get at it (see the sketch below).
Signed-off-by: Sage Weil <sage@inktank.com>
Reviewed-by: Alex Elder <elder@inktank.com>
parent 2169aea6
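
For illustration only (not part of the commit): a minimal sketch of how a
completion callback might consume the new per-op result fields. The callback
name my_osd_req_done is hypothetical; the fields it reads (r_result,
r_num_ops, r_reply_op_result[], r_reply_op_len[]) are the ones added to
struct ceph_osd_request in the diff below.

        static void my_osd_req_done(struct ceph_osd_request *osd_req,
                                    struct ceph_msg *msg)
        {
                int i;

                /* Overall request result, already in host byte order. */
                if (osd_req->r_result < 0) {
                        pr_err("osd request failed: %d\n", osd_req->r_result);
                        return;
                }

                /*
                 * Per-op results and lengths are decoded into plain arrays,
                 * so callers no longer parse ceph_osd_reply_head themselves.
                 */
                for (i = 0; i < osd_req->r_num_ops; i++)
                        pr_info("op %d: result %d, len %d\n", i,
                                osd_req->r_reply_op_result[i],
                                osd_req->r_reply_op_len[i]);
        }
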
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -196,7 +196,7 @@ struct rbd_obj_request {
         u64 xferred; /* bytes transferred */
         u64 version;
-        s32 result;
+        int result;
         atomic_t done;
 
         rbd_obj_callback_t callback;
@@ -1282,12 +1282,19 @@ static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
 {
         dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
                 obj_request->result, obj_request->xferred, obj_request->length);
 
-        if (obj_request->result == (s32) -ENOENT) {
+        /*
+         * ENOENT means a hole in the object. We zero-fill the
+         * entire length of the request. A short read also implies
+         * zero-fill to the end of the request. Either way we
+         * update the xferred count to indicate the whole request
+         * was satisfied.
+         */
+        if (obj_request->result == -ENOENT) {
                 zero_bio_chain(obj_request->bio_list, 0);
                 obj_request->result = 0;
                 obj_request->xferred = obj_request->length;
         } else if (obj_request->xferred < obj_request->length &&
                         !obj_request->result) {
                 zero_bio_chain(obj_request->bio_list, obj_request->xferred);
@@ -1298,20 +1305,14 @@ static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
 {
-        dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
-                obj_request->result, obj_request->xferred, obj_request->length);
-
-        /* A short write really shouldn't occur. Warn if we see one */
-
-        if (obj_request->xferred != obj_request->length) {
-                struct rbd_img_request *img_request = obj_request->img_request;
-                struct rbd_device *rbd_dev;
-
-                rbd_dev = img_request ? img_request->rbd_dev : NULL;
-                rbd_warn(rbd_dev, "wrote %llu want %llu\n",
-                        obj_request->xferred, obj_request->length);
-        }
+        dout("%s: obj %p result %d %llu\n", __func__, obj_request,
+                obj_request->result, obj_request->length);
+        /*
+         * There is no such thing as a successful short write.
+         * Our xferred value is the number of bytes transferred
+         * back. Set it to our originally-requested length.
+         */
+        obj_request->xferred = obj_request->length;
 
         obj_request_done_set(obj_request);
 }
@@ -1329,9 +1330,6 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
                                 struct ceph_msg *msg)
 {
         struct rbd_obj_request *obj_request = osd_req->r_priv;
-        struct ceph_osd_reply_head *reply_head;
-        struct ceph_osd_op *op;
-        u32 num_ops;
         u16 opcode;
 
         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
@@ -1339,22 +1337,19 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
         rbd_assert(!!obj_request->img_request ^
                         (obj_request->which == BAD_WHICH));
 
-        reply_head = msg->front.iov_base;
-        obj_request->result = (s32) le32_to_cpu(reply_head->result);
+        if (osd_req->r_result < 0)
+                obj_request->result = osd_req->r_result;
         obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
 
-        num_ops = le32_to_cpu(reply_head->num_ops);
-        WARN_ON(num_ops != 1); /* For now */
+        WARN_ON(osd_req->r_num_ops != 1); /* For now */
 
         /*
          * We support a 64-bit length, but ultimately it has to be
          * passed to blk_end_request(), which takes an unsigned int.
          */
-        op = &reply_head->ops[0];
-        obj_request->xferred = le64_to_cpu(op->extent.length);
+        obj_request->xferred = osd_req->r_reply_op_len[0];
         rbd_assert(obj_request->xferred < (u64) UINT_MAX);
 
-        opcode = le16_to_cpu(op->op);
+        opcode = osd_req->r_request_ops[0].op;
         switch (opcode) {
         case CEPH_OSD_OP_READ:
                 rbd_osd_read_callback(obj_request);
@@ -1719,6 +1714,7 @@ static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
                 more = blk_end_request(img_request->rq, result, xferred);
                 which++;
         }
 
+        rbd_assert(more ^ (which == img_request->obj_request_count));
         img_request->next_completion = which;
 out:
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -236,16 +236,10 @@ static int ceph_readpage(struct file *filp, struct page *page)
 static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
 {
         struct inode *inode = req->r_inode;
-        struct ceph_osd_reply_head *replyhead;
-        int rc, bytes;
+        int rc = req->r_result;
+        int bytes = le32_to_cpu(msg->hdr.data_len);
         int i;
 
-        /* parse reply */
-        replyhead = msg->front.iov_base;
-        WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
-        rc = le32_to_cpu(replyhead->result);
-        bytes = le32_to_cpu(msg->hdr.data_len);
-
         dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);
 
         /* unlock all pages, zeroing any data we didn't read */
@@ -553,27 +547,18 @@ static void writepages_finish(struct ceph_osd_request *req,
                               struct ceph_msg *msg)
 {
         struct inode *inode = req->r_inode;
-        struct ceph_osd_reply_head *replyhead;
-        struct ceph_osd_op *op;
         struct ceph_inode_info *ci = ceph_inode(inode);
         unsigned wrote;
         struct page *page;
         int i;
         struct ceph_snap_context *snapc = req->r_snapc;
         struct address_space *mapping = inode->i_mapping;
-        __s32 rc = -EIO;
-        u64 bytes = 0;
+        int rc = req->r_result;
+        u64 bytes = le64_to_cpu(req->r_request_ops[0].extent.length);
         struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
         long writeback_stat;
         unsigned issued = ceph_caps_issued(ci);
 
-        /* parse reply */
-        replyhead = msg->front.iov_base;
-        WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
-        op = (void *)(replyhead + 1);
-        rc = le32_to_cpu(replyhead->result);
-        bytes = le64_to_cpu(op->extent.length);
-
         if (rc >= 0) {
                 /*
                  * Assume we wrote the pages we originally sent. The
@@ -740,8 +725,6 @@ static int ceph_writepages_start(struct address_space *mapping,
                 struct page *page;
                 int want;
                 u64 offset, len;
-                struct ceph_osd_request_head *reqhead;
-                struct ceph_osd_op *op;
                 long writeback_stat;
 
                 next = 0;
@@ -905,10 +888,8 @@ static int ceph_writepages_start(struct address_space *mapping,
 
                 /* revise final length, page count */
                 req->r_num_pages = locked_pages;
-                reqhead = req->r_request->front.iov_base;
-                op = (void *)(reqhead + 1);
-                op->extent.length = cpu_to_le64(len);
-                op->payload_len = cpu_to_le32(len);
+                req->r_request_ops[0].extent.length = cpu_to_le64(len);
+                req->r_request_ops[0].payload_len = cpu_to_le32(len);
                 req->r_request->hdr.data_len = cpu_to_le32(len);
 
                 rc = ceph_osdc_start_request(&fsc->client->osdc, req, true);
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -47,6 +47,9 @@ struct ceph_osd {
         struct list_head o_keepalive_item;
 };
 
+#define CEPH_OSD_MAX_OP 10
+
 /* an in-flight request */
 struct ceph_osd_request {
         u64 r_tid; /* unique for this client */
@@ -63,9 +66,23 @@ struct ceph_osd_request {
         struct ceph_connection *r_con_filling_msg;
         struct ceph_msg *r_request, *r_reply;
-        int r_result;
         int r_flags; /* any additional flags for the osd */
         u32 r_sent; /* >0 if r_request is sending/sent */
+        int r_num_ops;
+
+        /* encoded message content */
+        struct ceph_osd_op *r_request_ops;
+        /* these are updated on each send */
+        __le32 *r_request_osdmap_epoch;
+        __le32 *r_request_flags;
+        __le64 *r_request_pool;
+        void *r_request_pgid;
+        __le32 *r_request_attempts;
+        struct ceph_eversion *r_request_reassert_version;
+
+        int r_result;
+        int r_reply_op_len[CEPH_OSD_MAX_OP];
+        s32 r_reply_op_result[CEPH_OSD_MAX_OP];
         int r_got_reply;
         int r_linger;
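The r_request_* pointers above are what the commit message means by keeping
"pointers into the request memory": on a resend the client can patch the
mutable fields in place instead of re-encoding the whole message front. A
minimal sketch of that idea follows; the helper name my_stamp_request is
hypothetical, and the real update logic lives in net/ceph/osd_client.c,
whose portion of the diff is collapsed in this view.

        static void my_stamp_request(struct ceph_osd_client *osdc,
                                     struct ceph_osd_request *req)
        {
                /* Stamp the epoch of the osdmap this send is based on. */
                *req->r_request_osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);

                /* Flags may differ between attempts. */
                *req->r_request_flags = cpu_to_le32(req->r_flags);

                /* Count this (re)send attempt in the encoded message. */
                le32_add_cpu(req->r_request_attempts, 1);
        }
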
--- a/include/linux/ceph/rados.h
+++ b/include/linux/ceph/rados.h
@@ -416,43 +416,5 @@ struct ceph_osd_op {
         __le32 payload_len;
 } __attribute__ ((packed));
 
-/*
- * osd request message header. each request may include multiple
- * ceph_osd_op object operations.
- */
-struct ceph_osd_request_head {
-        __le32 client_inc;                 /* client incarnation */
-        struct ceph_object_layout layout;  /* pgid */
-        __le32 osdmap_epoch;               /* client's osdmap epoch */
-        __le32 flags;
-
-        struct ceph_timespec mtime;        /* for mutations only */
-        struct ceph_eversion reassert_version; /* if we are replaying op */
-
-        __le32 object_len;     /* length of object name */
-
-        __le64 snapid;         /* snapid to read */
-        __le64 snap_seq;       /* writer's snap context */
-        __le32 num_snaps;
-
-        __le16 num_ops;
-        struct ceph_osd_op ops[];  /* followed by ops[], obj, ticket, snaps */
-} __attribute__ ((packed));
-
-struct ceph_osd_reply_head {
-        __le32 client_inc;                /* client incarnation */
-        __le32 flags;
-        struct ceph_object_layout layout;
-        __le32 osdmap_epoch;
-        struct ceph_eversion reassert_version; /* for replaying uncommitted */
-
-        __le32 result;                    /* result code */
-
-        __le32 object_len;                /* length of object name */
-        __le32 num_ops;
-        struct ceph_osd_op ops[0];  /* ops[], object */
-} __attribute__ ((packed));
-
 #endif
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -123,10 +123,7 @@ static int osdc_show(struct seq_file *s, void *pp)
         mutex_lock(&osdc->request_mutex);
         for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
                 struct ceph_osd_request *req;
-                struct ceph_osd_request_head *head;
-                struct ceph_osd_op *op;
-                int num_ops;
-                int opcode, olen;
+                int opcode;
                 int i;
 
                 req = rb_entry(p, struct ceph_osd_request, r_node);
@@ -135,13 +132,7 @@ static int osdc_show(struct seq_file *s, void *pp)
                            req->r_osd ? req->r_osd->o_osd : -1,
                            req->r_pgid.pool, req->r_pgid.seed);
 
-                head = req->r_request->front.iov_base;
-                op = (void *)(head + 1);
-
-                num_ops = le16_to_cpu(head->num_ops);
-                olen = le32_to_cpu(head->object_len);
-                seq_printf(s, "%.*s", olen,
-                           (const char *)(head->ops + num_ops));
+                seq_printf(s, "%.*s", req->r_oid_len, req->r_oid);
 
                 if (req->r_reassert_version.epoch)
                         seq_printf(s, "\t%u'%llu",
@@ -150,10 +141,9 @@ static int osdc_show(struct seq_file *s, void *pp)
                 else
                         seq_printf(s, "\t");
 
-                for (i = 0; i < num_ops; i++) {
-                        opcode = le16_to_cpu(op->op);
+                for (i = 0; i < req->r_num_ops; i++) {
+                        opcode = le16_to_cpu(req->r_request_ops[i].op);
                         seq_printf(s, "\t%s", ceph_osd_op_name(opcode));
-                        op++;
                 }
 
                 seq_printf(s, "\n");
(The remainder of the diff is collapsed in this view; it presumably contains the net/ceph/osd_client.c encoding changes that the header fields above support.)