Commit 0d59ab81 authored by Yehuda Sadeh's avatar Yehuda Sadeh Committed by Sage Weil

ceph: keep reserved replies on the request structure

This includes treating all the data preallocation and revokation
at the same place, not having to have a special case for
the reserved pages.
Signed-off-by: default avatarYehuda Sadeh <yehuda@hq.newdream.net>
parent 0547a9b3
...@@ -1985,30 +1985,30 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) ...@@ -1985,30 +1985,30 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
} }
/* /*
* Revoke a page vector that we may be reading data into * Revoke a message that we may be reading data into
*/ */
void ceph_con_revoke_pages(struct ceph_connection *con, struct page **pages) void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg)
{ {
mutex_lock(&con->mutex); mutex_lock(&con->mutex);
if (con->in_msg && con->in_msg->pages == pages) { if (con->in_msg && con->in_msg == msg) {
unsigned front_len = le32_to_cpu(con->in_hdr.front_len);
unsigned middle_len = le32_to_cpu(con->in_hdr.middle_len);
unsigned data_len = le32_to_cpu(con->in_hdr.data_len); unsigned data_len = le32_to_cpu(con->in_hdr.data_len);
/* skip rest of message */ /* skip rest of message */
dout("con_revoke_pages %p msg %p pages %p revoked\n", con, dout("con_revoke_pages %p msg %p revoked\n", con, msg);
con->in_msg, pages);
if (con->in_msg_pos.data_pos < data_len)
con->in_base_pos = con->in_msg_pos.data_pos - data_len;
else
con->in_base_pos = con->in_base_pos - con->in_base_pos = con->in_base_pos -
sizeof(struct ceph_msg_header) - sizeof(struct ceph_msg_header) -
front_len -
middle_len -
data_len -
sizeof(struct ceph_msg_footer); sizeof(struct ceph_msg_footer);
con->in_msg->pages = NULL;
ceph_msg_put(con->in_msg); ceph_msg_put(con->in_msg);
con->in_msg = NULL; con->in_msg = NULL;
con->in_tag = CEPH_MSGR_TAG_READY; con->in_tag = CEPH_MSGR_TAG_READY;
} else { } else {
dout("con_revoke_pages %p msg %p pages %p no-op\n", dout("con_revoke_pages %p msg %p pages %p no-op\n",
con, con->in_msg, pages); con, con->in_msg, msg);
} }
mutex_unlock(&con->mutex); mutex_unlock(&con->mutex);
} }
......
...@@ -226,8 +226,8 @@ extern void ceph_con_open(struct ceph_connection *con, ...@@ -226,8 +226,8 @@ extern void ceph_con_open(struct ceph_connection *con,
extern void ceph_con_close(struct ceph_connection *con); extern void ceph_con_close(struct ceph_connection *con);
extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg); extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg);
extern void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg); extern void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg);
extern void ceph_con_revoke_pages(struct ceph_connection *con, extern void ceph_con_revoke_message(struct ceph_connection *con,
struct page **pages); struct ceph_msg *msg);
extern void ceph_con_keepalive(struct ceph_connection *con); extern void ceph_con_keepalive(struct ceph_connection *con);
extern struct ceph_connection *ceph_con_get(struct ceph_connection *con); extern struct ceph_connection *ceph_con_get(struct ceph_connection *con);
extern void ceph_con_put(struct ceph_connection *con); extern void ceph_con_put(struct ceph_connection *con);
......
...@@ -13,6 +13,8 @@ ...@@ -13,6 +13,8 @@
#include "decode.h" #include "decode.h"
#include "auth.h" #include "auth.h"
#define OSD_REPLY_RESERVE_FRONT_LEN 512
const static struct ceph_connection_operations osd_con_ops; const static struct ceph_connection_operations osd_con_ops;
static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd); static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
...@@ -73,6 +75,16 @@ static void calc_layout(struct ceph_osd_client *osdc, ...@@ -73,6 +75,16 @@ static void calc_layout(struct ceph_osd_client *osdc,
req->r_oid, req->r_oid_len, objoff, objlen, req->r_num_pages); req->r_oid, req->r_oid_len, objoff, objlen, req->r_num_pages);
} }
static void remove_replies(struct ceph_osd_request *req)
{
int i;
int max = ARRAY_SIZE(req->replies);
for (i=0; i<max; i++) {
if (req->replies[i])
ceph_msg_put(req->replies[i]);
}
}
/* /*
* requests * requests
...@@ -87,12 +99,13 @@ void ceph_osdc_release_request(struct kref *kref) ...@@ -87,12 +99,13 @@ void ceph_osdc_release_request(struct kref *kref)
ceph_msg_put(req->r_request); ceph_msg_put(req->r_request);
if (req->r_reply) if (req->r_reply)
ceph_msg_put(req->r_reply); ceph_msg_put(req->r_reply);
if (req->r_con_filling_pages) { remove_replies(req);
if (req->r_con_filling_msg) {
dout("release_request revoking pages %p from con %p\n", dout("release_request revoking pages %p from con %p\n",
req->r_pages, req->r_con_filling_pages); req->r_pages, req->r_con_filling_msg);
ceph_con_revoke_pages(req->r_con_filling_pages, ceph_con_revoke_message(req->r_con_filling_msg,
req->r_pages); req->r_reply);
ceph_con_put(req->r_con_filling_pages); ceph_con_put(req->r_con_filling_msg);
} }
if (req->r_own_pages) if (req->r_own_pages)
ceph_release_page_vector(req->r_pages, ceph_release_page_vector(req->r_pages,
...@@ -104,6 +117,60 @@ void ceph_osdc_release_request(struct kref *kref) ...@@ -104,6 +117,60 @@ void ceph_osdc_release_request(struct kref *kref)
kfree(req); kfree(req);
} }
static int alloc_replies(struct ceph_osd_request *req, int num_reply)
{
int i;
int max = ARRAY_SIZE(req->replies);
BUG_ON(num_reply > max);
for (i=0; i<num_reply; i++) {
req->replies[i] = ceph_msg_new(0, OSD_REPLY_RESERVE_FRONT_LEN, 0, 0, NULL);
if (IS_ERR(req->replies[i])) {
int j;
int err = PTR_ERR(req->replies[i]);
for (j = 0; j<=i; j++) {
ceph_msg_put(req->replies[j]);
}
return err;
}
}
for (; i<max; i++) {
req->replies[i] = NULL;
}
req->cur_reply = 0;
return 0;
}
static struct ceph_msg *__get_next_reply(struct ceph_connection *con,
struct ceph_osd_request *req,
int front_len)
{
struct ceph_msg *reply;
if (req->r_con_filling_msg) {
dout("revoking reply msg %p from old con %p\n", req->r_reply,
req->r_con_filling_msg);
ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
ceph_con_put(req->r_con_filling_msg);
req->cur_reply = 0;
}
reply = req->replies[req->cur_reply];
if (!reply || front_len > OSD_REPLY_RESERVE_FRONT_LEN) {
/* maybe we can allocate it now? */
reply = ceph_msg_new(0, front_len, 0, 0, NULL);
if (!reply || IS_ERR(reply)) {
pr_err(" reply alloc failed, front_len=%d\n", front_len);
return ERR_PTR(-ENOMEM);
}
}
req->r_con_filling_msg = ceph_con_get(con);
req->r_reply = ceph_msg_get(reply); /* for duration of read over socket */
return ceph_msg_get(reply);
}
/* /*
* build new request AND message, calculate layout, and adjust file * build new request AND message, calculate layout, and adjust file
* extent as needed. * extent as needed.
...@@ -147,7 +214,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, ...@@ -147,7 +214,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
if (req == NULL) if (req == NULL)
return ERR_PTR(-ENOMEM); return ERR_PTR(-ENOMEM);
err = ceph_msgpool_resv(&osdc->msgpool_op_reply, num_reply); err = alloc_replies(req, num_reply);
if (err) { if (err) {
ceph_osdc_put_request(req); ceph_osdc_put_request(req);
return ERR_PTR(-ENOMEM); return ERR_PTR(-ENOMEM);
...@@ -173,7 +240,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, ...@@ -173,7 +240,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
else else
msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, 0, 0, NULL); msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, 0, 0, NULL);
if (IS_ERR(msg)) { if (IS_ERR(msg)) {
ceph_msgpool_resv(&osdc->msgpool_op_reply, -num_reply);
ceph_osdc_put_request(req); ceph_osdc_put_request(req);
return ERR_PTR(PTR_ERR(msg)); return ERR_PTR(PTR_ERR(msg));
} }
...@@ -471,8 +537,6 @@ static void __unregister_request(struct ceph_osd_client *osdc, ...@@ -471,8 +537,6 @@ static void __unregister_request(struct ceph_osd_client *osdc,
rb_erase(&req->r_node, &osdc->requests); rb_erase(&req->r_node, &osdc->requests);
osdc->num_requests--; osdc->num_requests--;
ceph_msgpool_resv(&osdc->msgpool_op_reply, -req->r_num_prealloc_reply);
if (req->r_osd) { if (req->r_osd) {
/* make sure the original request isn't in flight. */ /* make sure the original request isn't in flight. */
ceph_con_revoke(&req->r_osd->o_con, req->r_request); ceph_con_revoke(&req->r_osd->o_con, req->r_request);
...@@ -724,12 +788,12 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, ...@@ -724,12 +788,12 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
flags = le32_to_cpu(rhead->flags); flags = le32_to_cpu(rhead->flags);
/* /*
* if this connection filled our pages, drop our reference now, to * if this connection filled our message, drop our reference now, to
* avoid a (safe but slower) revoke later. * avoid a (safe but slower) revoke later.
*/ */
if (req->r_con_filling_pages == con && req->r_pages == msg->pages) { if (req->r_con_filling_msg == con && req->r_reply == msg) {
dout(" got pages, dropping con_filling_pages ref %p\n", con); dout(" got pages, dropping con_filling_msg ref %p\n", con);
req->r_con_filling_pages = NULL; req->r_con_filling_msg = NULL;
ceph_con_put(con); ceph_con_put(con);
} }
...@@ -998,7 +1062,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) ...@@ -998,7 +1062,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
* find those pages. * find those pages.
* 0 = success, -1 failure. * 0 = success, -1 failure.
*/ */
static int prepare_pages(struct ceph_connection *con, static int __prepare_pages(struct ceph_connection *con,
struct ceph_msg_header *hdr, struct ceph_msg_header *hdr,
struct ceph_osd_request *req, struct ceph_osd_request *req,
u64 tid, u64 tid,
...@@ -1017,20 +1081,10 @@ static int prepare_pages(struct ceph_connection *con, ...@@ -1017,20 +1081,10 @@ static int prepare_pages(struct ceph_connection *con,
osdc = osd->o_osdc; osdc = osd->o_osdc;
dout("prepare_pages on msg %p want %d\n", m, want); dout("__prepare_pages on msg %p tid %llu, has %d pages, want %d\n", m,
dout("prepare_pages tid %llu has %d pages, want %d\n",
tid, req->r_num_pages, want); tid, req->r_num_pages, want);
if (unlikely(req->r_num_pages < want)) if (unlikely(req->r_num_pages < want))
goto out; goto out;
if (req->r_con_filling_pages) {
dout("revoking pages %p from old con %p\n", req->r_pages,
req->r_con_filling_pages);
ceph_con_revoke_pages(req->r_con_filling_pages, req->r_pages);
ceph_con_put(req->r_con_filling_pages);
}
req->r_con_filling_pages = ceph_con_get(con);
req->r_reply = ceph_msg_get(m); /* for duration of read over socket */
m->pages = req->r_pages; m->pages = req->r_pages;
m->nr_pages = req->r_num_pages; m->nr_pages = req->r_num_pages;
ret = 0; /* success */ ret = 0; /* success */
...@@ -1164,13 +1218,8 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) ...@@ -1164,13 +1218,8 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
err = ceph_msgpool_init(&osdc->msgpool_op, 4096, 10, true); err = ceph_msgpool_init(&osdc->msgpool_op, 4096, 10, true);
if (err < 0) if (err < 0)
goto out_mempool; goto out_mempool;
err = ceph_msgpool_init(&osdc->msgpool_op_reply, 512, 0, false);
if (err < 0)
goto out_msgpool;
return 0; return 0;
out_msgpool:
ceph_msgpool_destroy(&osdc->msgpool_op);
out_mempool: out_mempool:
mempool_destroy(osdc->req_mempool); mempool_destroy(osdc->req_mempool);
out: out:
...@@ -1186,7 +1235,6 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc) ...@@ -1186,7 +1235,6 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
} }
mempool_destroy(osdc->req_mempool); mempool_destroy(osdc->req_mempool);
ceph_msgpool_destroy(&osdc->msgpool_op); ceph_msgpool_destroy(&osdc->msgpool_op);
ceph_msgpool_destroy(&osdc->msgpool_op_reply);
} }
/* /*
...@@ -1323,17 +1371,17 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con, ...@@ -1323,17 +1371,17 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
if (!req) { if (!req) {
*skip = 1; *skip = 1;
m = NULL; m = NULL;
dout("prepare_pages unknown tid %llu\n", tid); dout("alloc_msg unknown tid %llu\n", tid);
goto out; goto out;
} }
m = ceph_msgpool_get(&osdc->msgpool_op_reply, front); m = __get_next_reply(con, req, front);
if (!m) { if (!m || IS_ERR(m)) {
*skip = 1; *skip = 1;
goto out; goto out;
} }
if (data_len > 0) { if (data_len > 0) {
err = prepare_pages(con, hdr, req, tid, m); err = __prepare_pages(con, hdr, req, tid, m);
if (err < 0) { if (err < 0) {
*skip = 1; *skip = 1;
ceph_msg_put(m); ceph_msg_put(m);
......
...@@ -44,7 +44,7 @@ struct ceph_osd_request { ...@@ -44,7 +44,7 @@ struct ceph_osd_request {
struct ceph_osd *r_osd; struct ceph_osd *r_osd;
struct ceph_pg r_pgid; struct ceph_pg r_pgid;
struct ceph_connection *r_con_filling_pages; struct ceph_connection *r_con_filling_msg;
struct ceph_msg *r_request, *r_reply; struct ceph_msg *r_request, *r_reply;
int r_result; int r_result;
...@@ -75,6 +75,9 @@ struct ceph_osd_request { ...@@ -75,6 +75,9 @@ struct ceph_osd_request {
struct page **r_pages; /* pages for data payload */ struct page **r_pages; /* pages for data payload */
int r_pages_from_pool; int r_pages_from_pool;
int r_own_pages; /* if true, i own page list */ int r_own_pages; /* if true, i own page list */
struct ceph_msg *replies[2];
int cur_reply;
}; };
struct ceph_osd_client { struct ceph_osd_client {
...@@ -99,7 +102,6 @@ struct ceph_osd_client { ...@@ -99,7 +102,6 @@ struct ceph_osd_client {
mempool_t *req_mempool; mempool_t *req_mempool;
struct ceph_msgpool msgpool_op; struct ceph_msgpool msgpool_op;
struct ceph_msgpool msgpool_op_reply;
}; };
extern int ceph_osdc_init(struct ceph_osd_client *osdc, extern int ceph_osdc_init(struct ceph_osd_client *osdc,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment