Commit 8921d114 authored by Alex Elder's avatar Alex Elder Committed by Alex Elder

libceph: make ceph_con_revoke_message() a msg op

ceph_con_revoke_message() is passed both a message and a ceph
connection.  A ceph_msg allocated for incoming messages on a
connection always has a pointer to that connection, so there's no
need to provide the connection when revoking such a message.

Note that the existing logic does not preclude the message supplied
being a null/bogus message pointer.  The only user of this interface
is the OSD client, and the only value an osd client passes is a
request's r_reply field.  That is always non-null (except briefly in
an error path in ceph_osdc_alloc_request(), and that drops the
only reference so the request won't ever have a reply to revoke).
So we can safely assume the passed-in message is non-null, but add a
BUG_ON() to make it very obvious we are imposing this restriction.

Rename the function ceph_msg_revoke_incoming() to reflect that it is
really an operation on an incoming message.
Signed-off-by: default avatarAlex Elder <elder@inktank.com>
Reviewed-by: default avatarSage Weil <sage@inktank.com>
parent 6740a845
...@@ -241,8 +241,8 @@ extern void ceph_con_close(struct ceph_connection *con); ...@@ -241,8 +241,8 @@ extern void ceph_con_close(struct ceph_connection *con);
extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg); extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg);
extern void ceph_msg_revoke(struct ceph_msg *msg); extern void ceph_msg_revoke(struct ceph_msg *msg);
extern void ceph_con_revoke_message(struct ceph_connection *con, extern void ceph_msg_revoke_incoming(struct ceph_msg *msg);
struct ceph_msg *msg);
extern void ceph_con_keepalive(struct ceph_connection *con); extern void ceph_con_keepalive(struct ceph_connection *con);
extern struct ceph_connection *ceph_con_get(struct ceph_connection *con); extern struct ceph_connection *ceph_con_get(struct ceph_connection *con);
extern void ceph_con_put(struct ceph_connection *con); extern void ceph_con_put(struct ceph_connection *con);
......
...@@ -2456,16 +2456,26 @@ void ceph_msg_revoke(struct ceph_msg *msg) ...@@ -2456,16 +2456,26 @@ void ceph_msg_revoke(struct ceph_msg *msg)
/* /*
* Revoke a message that we may be reading data into * Revoke a message that we may be reading data into
*/ */
void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg) void ceph_msg_revoke_incoming(struct ceph_msg *msg)
{ {
struct ceph_connection *con;
BUG_ON(msg == NULL);
if (!msg->con) {
dout("%s msg %p null con\n", __func__, msg);
return; /* Message not in our possession */
}
con = msg->con;
mutex_lock(&con->mutex); mutex_lock(&con->mutex);
if (con->in_msg && con->in_msg == msg) { if (con->in_msg == msg) {
unsigned front_len = le32_to_cpu(con->in_hdr.front_len); unsigned front_len = le32_to_cpu(con->in_hdr.front_len);
unsigned middle_len = le32_to_cpu(con->in_hdr.middle_len); unsigned middle_len = le32_to_cpu(con->in_hdr.middle_len);
unsigned data_len = le32_to_cpu(con->in_hdr.data_len); unsigned data_len = le32_to_cpu(con->in_hdr.data_len);
/* skip rest of message */ /* skip rest of message */
dout("con_revoke_pages %p msg %p revoked\n", con, msg); dout("%s %p msg %p revoked\n", __func__, con, msg);
con->in_base_pos = con->in_base_pos - con->in_base_pos = con->in_base_pos -
sizeof(struct ceph_msg_header) - sizeof(struct ceph_msg_header) -
front_len - front_len -
...@@ -2477,8 +2487,8 @@ void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg) ...@@ -2477,8 +2487,8 @@ void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg)
con->in_tag = CEPH_MSGR_TAG_READY; con->in_tag = CEPH_MSGR_TAG_READY;
con->in_seq++; con->in_seq++;
} else { } else {
dout("con_revoke_pages %p msg %p pages %p no-op\n", dout("%s %p in_msg %p msg %p no-op\n",
con, con->in_msg, msg); __func__, con, con->in_msg, msg);
} }
mutex_unlock(&con->mutex); mutex_unlock(&con->mutex);
} }
......
...@@ -140,10 +140,9 @@ void ceph_osdc_release_request(struct kref *kref) ...@@ -140,10 +140,9 @@ void ceph_osdc_release_request(struct kref *kref)
if (req->r_request) if (req->r_request)
ceph_msg_put(req->r_request); ceph_msg_put(req->r_request);
if (req->r_con_filling_msg) { if (req->r_con_filling_msg) {
dout("release_request revoking pages %p from con %p\n", dout("%s revoking pages %p from con %p\n", __func__,
req->r_pages, req->r_con_filling_msg); req->r_pages, req->r_con_filling_msg);
ceph_con_revoke_message(req->r_con_filling_msg, ceph_msg_revoke_incoming(req->r_reply);
req->r_reply);
req->r_con_filling_msg->ops->put(req->r_con_filling_msg); req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
} }
if (req->r_reply) if (req->r_reply)
...@@ -2022,9 +2021,9 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, ...@@ -2022,9 +2021,9 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
} }
if (req->r_con_filling_msg) { if (req->r_con_filling_msg) {
dout("get_reply revoking msg %p from old con %p\n", dout("%s revoking msg %p from old con %p\n", __func__,
req->r_reply, req->r_con_filling_msg); req->r_reply, req->r_con_filling_msg);
ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply); ceph_msg_revoke_incoming(req->r_reply);
req->r_con_filling_msg->ops->put(req->r_con_filling_msg); req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
req->r_con_filling_msg = NULL; req->r_con_filling_msg = NULL;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment