Commit 5b3a4db3 authored by Sage Weil's avatar Sage Weil

ceph: fix up unexpected message handling

Fix skipping of unexpected message types from osd, mon.

Clean up pr_info and debug output.
Signed-off-by: default avatarSage Weil <sage@newdream.net>
parent bcd2cbd1
...@@ -1361,7 +1361,7 @@ static int read_partial_message(struct ceph_connection *con) ...@@ -1361,7 +1361,7 @@ static int read_partial_message(struct ceph_connection *con)
con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip); con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip);
if (skip) { if (skip) {
/* skip this message */ /* skip this message */
pr_err("alloc_msg returned NULL, skipping message\n"); dout("alloc_msg returned NULL, skipping message\n");
con->in_base_pos = -front_len - middle_len - data_len - con->in_base_pos = -front_len - middle_len - data_len -
sizeof(m->footer); sizeof(m->footer);
con->in_tag = CEPH_MSGR_TAG_READY; con->in_tag = CEPH_MSGR_TAG_READY;
...@@ -1370,7 +1370,8 @@ static int read_partial_message(struct ceph_connection *con) ...@@ -1370,7 +1370,8 @@ static int read_partial_message(struct ceph_connection *con)
if (IS_ERR(con->in_msg)) { if (IS_ERR(con->in_msg)) {
ret = PTR_ERR(con->in_msg); ret = PTR_ERR(con->in_msg);
con->in_msg = NULL; con->in_msg = NULL;
con->error_msg = "error allocating memory for incoming message"; con->error_msg =
"error allocating memory for incoming message";
return ret; return ret;
} }
m = con->in_msg; m = con->in_msg;
......
...@@ -763,7 +763,7 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con, ...@@ -763,7 +763,7 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
struct ceph_mon_client *monc = con->private; struct ceph_mon_client *monc = con->private;
int type = le16_to_cpu(hdr->type); int type = le16_to_cpu(hdr->type);
int front_len = le32_to_cpu(hdr->front_len); int front_len = le32_to_cpu(hdr->front_len);
struct ceph_msg *m; struct ceph_msg *m = NULL;
*skip = 0; *skip = 0;
...@@ -777,13 +777,17 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con, ...@@ -777,13 +777,17 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
case CEPH_MSG_AUTH_REPLY: case CEPH_MSG_AUTH_REPLY:
m = ceph_msgpool_get(&monc->msgpool_auth_reply, front_len); m = ceph_msgpool_get(&monc->msgpool_auth_reply, front_len);
break; break;
default: case CEPH_MSG_MON_MAP:
return NULL; case CEPH_MSG_MDS_MAP:
case CEPH_MSG_OSD_MAP:
m = ceph_msg_new(type, front_len, 0, 0, NULL);
break;
} }
if (!m) if (!m) {
pr_info("alloc_msg unknown type %d\n", type);
*skip = 1; *skip = 1;
}
return m; return m;
} }
......
...@@ -1396,31 +1396,30 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) ...@@ -1396,31 +1396,30 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
ceph_msg_put(msg); ceph_msg_put(msg);
} }
static struct ceph_msg *alloc_msg(struct ceph_connection *con, /*
* lookup and return message for incoming reply
*/
static struct ceph_msg *get_reply(struct ceph_connection *con,
struct ceph_msg_header *hdr, struct ceph_msg_header *hdr,
int *skip) int *skip)
{ {
struct ceph_osd *osd = con->private; struct ceph_osd *osd = con->private;
struct ceph_osd_client *osdc = osd->o_osdc; struct ceph_osd_client *osdc = osd->o_osdc;
int type = le16_to_cpu(hdr->type);
int front = le32_to_cpu(hdr->front_len);
int data_len = le32_to_cpu(hdr->data_len);
struct ceph_msg *m; struct ceph_msg *m;
struct ceph_osd_request *req; struct ceph_osd_request *req;
int front = le32_to_cpu(hdr->front_len);
int data_len = le32_to_cpu(hdr->data_len);
u64 tid; u64 tid;
int err; int err;
*skip = 0;
if (type != CEPH_MSG_OSD_OPREPLY)
return NULL;
tid = le64_to_cpu(hdr->tid); tid = le64_to_cpu(hdr->tid);
mutex_lock(&osdc->request_mutex); mutex_lock(&osdc->request_mutex);
req = __lookup_request(osdc, tid); req = __lookup_request(osdc, tid);
if (!req) { if (!req) {
*skip = 1; *skip = 1;
m = NULL; m = NULL;
dout("alloc_msg unknown tid %llu\n", tid); pr_info("alloc_msg unknown tid %llu from osd%d\n", tid,
osd->o_osd);
goto out; goto out;
} }
m = __get_next_reply(con, req, front); m = __get_next_reply(con, req, front);
...@@ -1437,11 +1436,33 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con, ...@@ -1437,11 +1436,33 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
m = ERR_PTR(err); m = ERR_PTR(err);
} }
} }
*skip = 0;
out: out:
mutex_unlock(&osdc->request_mutex); mutex_unlock(&osdc->request_mutex);
return m; return m;
}
static struct ceph_msg *alloc_msg(struct ceph_connection *con,
struct ceph_msg_header *hdr,
int *skip)
{
struct ceph_osd *osd = con->private;
int type = le16_to_cpu(hdr->type);
int front = le32_to_cpu(hdr->front_len);
switch (type) {
case CEPH_MSG_OSD_MAP:
return ceph_msg_new(type, front, 0, 0, NULL);
case CEPH_MSG_OSD_OPREPLY:
return get_reply(con, hdr, skip);
default:
pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
osd->o_osd);
*skip = 1;
return NULL;
}
} }
/* /*
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment