Commit 2450418c authored by Yehuda Sadeh's avatar Yehuda Sadeh Committed by Sage Weil

ceph: allocate middle of message before stating to read

Both front and middle parts of the message are now being
allocated at the ceph_alloc_msg().
Signed-off-by: default avatarYehuda Sadeh <yehuda@hq.newdream.net>
parent 5b1daecd
...@@ -2953,8 +2953,6 @@ const static struct ceph_connection_operations mds_con_ops = { ...@@ -2953,8 +2953,6 @@ const static struct ceph_connection_operations mds_con_ops = {
.get_authorizer = get_authorizer, .get_authorizer = get_authorizer,
.verify_authorizer_reply = verify_authorizer_reply, .verify_authorizer_reply = verify_authorizer_reply,
.peer_reset = peer_reset, .peer_reset = peer_reset,
.alloc_msg = ceph_alloc_msg,
.alloc_middle = ceph_alloc_middle,
}; };
......
...@@ -1279,8 +1279,34 @@ static void process_ack(struct ceph_connection *con) ...@@ -1279,8 +1279,34 @@ static void process_ack(struct ceph_connection *con)
static int read_partial_message_section(struct ceph_connection *con,
struct kvec *section, unsigned int sec_len,
u32 *crc)
{
int left;
int ret;
BUG_ON(!section);
while (section->iov_len < sec_len) {
BUG_ON(section->iov_base == NULL);
left = sec_len - section->iov_len;
ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base +
section->iov_len, left);
if (ret <= 0)
return ret;
section->iov_len += ret;
if (section->iov_len == sec_len)
*crc = crc32c(0, section->iov_base,
section->iov_len);
}
return 1;
}
static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
struct ceph_msg_header *hdr,
int *skip);
/* /*
* read (part of) a message. * read (part of) a message.
*/ */
...@@ -1292,6 +1318,7 @@ static int read_partial_message(struct ceph_connection *con) ...@@ -1292,6 +1318,7 @@ static int read_partial_message(struct ceph_connection *con)
int to, want, left; int to, want, left;
unsigned front_len, middle_len, data_len, data_off; unsigned front_len, middle_len, data_len, data_off;
int datacrc = con->msgr->nocrc; int datacrc = con->msgr->nocrc;
int skip;
dout("read_partial_message con %p msg %p\n", con, m); dout("read_partial_message con %p msg %p\n", con, m);
...@@ -1315,7 +1342,6 @@ static int read_partial_message(struct ceph_connection *con) ...@@ -1315,7 +1342,6 @@ static int read_partial_message(struct ceph_connection *con)
} }
} }
} }
front_len = le32_to_cpu(con->in_hdr.front_len); front_len = le32_to_cpu(con->in_hdr.front_len);
if (front_len > CEPH_MSG_MAX_FRONT_LEN) if (front_len > CEPH_MSG_MAX_FRONT_LEN)
return -EIO; return -EIO;
...@@ -1330,8 +1356,8 @@ static int read_partial_message(struct ceph_connection *con) ...@@ -1330,8 +1356,8 @@ static int read_partial_message(struct ceph_connection *con)
if (!con->in_msg) { if (!con->in_msg) {
dout("got hdr type %d front %d data %d\n", con->in_hdr.type, dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
con->in_hdr.front_len, con->in_hdr.data_len); con->in_hdr.front_len, con->in_hdr.data_len);
con->in_msg = con->ops->alloc_msg(con, &con->in_hdr); con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip);
if (!con->in_msg) { if (skip) {
/* skip this message */ /* skip this message */
pr_err("alloc_msg returned NULL, skipping message\n"); pr_err("alloc_msg returned NULL, skipping message\n");
con->in_base_pos = -front_len - middle_len - data_len - con->in_base_pos = -front_len - middle_len - data_len -
...@@ -1342,56 +1368,28 @@ static int read_partial_message(struct ceph_connection *con) ...@@ -1342,56 +1368,28 @@ static int read_partial_message(struct ceph_connection *con)
if (IS_ERR(con->in_msg)) { if (IS_ERR(con->in_msg)) {
ret = PTR_ERR(con->in_msg); ret = PTR_ERR(con->in_msg);
con->in_msg = NULL; con->in_msg = NULL;
con->error_msg = "out of memory for incoming message"; con->error_msg = "error allocating memory for incoming message";
return ret; return ret;
} }
m = con->in_msg; m = con->in_msg;
m->front.iov_len = 0; /* haven't read it yet */ m->front.iov_len = 0; /* haven't read it yet */
if (m->middle)
m->middle->vec.iov_len = 0;
memcpy(&m->hdr, &con->in_hdr, sizeof(con->in_hdr)); memcpy(&m->hdr, &con->in_hdr, sizeof(con->in_hdr));
} }
/* front */ /* front */
while (m->front.iov_len < front_len) { ret = read_partial_message_section(con, &m->front, front_len,
BUG_ON(m->front.iov_base == NULL); &con->in_front_crc);
left = front_len - m->front.iov_len; if (ret <= 0)
ret = ceph_tcp_recvmsg(con->sock, (char *)m->front.iov_base + return ret;
m->front.iov_len, left);
if (ret <= 0)
return ret;
m->front.iov_len += ret;
if (m->front.iov_len == front_len)
con->in_front_crc = crc32c(0, m->front.iov_base,
m->front.iov_len);
}
/* middle */ /* middle */
while (middle_len > 0 && (!m->middle || if (m->middle) {
m->middle->vec.iov_len < middle_len)) { ret = read_partial_message_section(con, &m->middle->vec, middle_len,
if (m->middle == NULL) { &con->in_middle_crc);
ret = -EOPNOTSUPP;
if (con->ops->alloc_middle)
ret = con->ops->alloc_middle(con, m);
if (ret < 0) {
pr_err("alloc_middle fail skipping payload\n");
con->in_base_pos = -middle_len - data_len
- sizeof(m->footer);
ceph_msg_put(con->in_msg);
con->in_msg = NULL;
con->in_tag = CEPH_MSGR_TAG_READY;
return 0;
}
m->middle->vec.iov_len = 0;
}
left = middle_len - m->middle->vec.iov_len;
ret = ceph_tcp_recvmsg(con->sock,
(char *)m->middle->vec.iov_base +
m->middle->vec.iov_len, left);
if (ret <= 0) if (ret <= 0)
return ret; return ret;
m->middle->vec.iov_len += ret;
if (m->middle->vec.iov_len == middle_len)
con->in_middle_crc = crc32c(0, m->middle->vec.iov_base,
m->middle->vec.iov_len);
} }
/* (page) data */ /* (page) data */
...@@ -2115,24 +2113,6 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, ...@@ -2115,24 +2113,6 @@ struct ceph_msg *ceph_msg_new(int type, int front_len,
return ERR_PTR(-ENOMEM); return ERR_PTR(-ENOMEM);
} }
/*
* Generic message allocator, for incoming messages.
*/
struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
struct ceph_msg_header *hdr)
{
int type = le16_to_cpu(hdr->type);
int front_len = le32_to_cpu(hdr->front_len);
struct ceph_msg *msg = ceph_msg_new(type, front_len, 0, 0, NULL);
if (!msg) {
pr_err("unable to allocate msg type %d len %d\n",
type, front_len);
return ERR_PTR(-ENOMEM);
}
return msg;
}
/* /*
* Allocate "middle" portion of a message, if it is needed and wasn't * Allocate "middle" portion of a message, if it is needed and wasn't
* allocated by alloc_msg. This allows us to read a small fixed-size * allocated by alloc_msg. This allows us to read a small fixed-size
...@@ -2140,7 +2120,7 @@ struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, ...@@ -2140,7 +2120,7 @@ struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
* propagate the error to the caller based on info in the front) when * propagate the error to the caller based on info in the front) when
* the middle is too large. * the middle is too large.
*/ */
int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
{ {
int type = le16_to_cpu(msg->hdr.type); int type = le16_to_cpu(msg->hdr.type);
int middle_len = le32_to_cpu(msg->hdr.middle_len); int middle_len = le32_to_cpu(msg->hdr.middle_len);
...@@ -2156,6 +2136,48 @@ int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) ...@@ -2156,6 +2136,48 @@ int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
return 0; return 0;
} }
/*
* Generic message allocator, for incoming messages.
*/
static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
struct ceph_msg_header *hdr,
int *skip)
{
int type = le16_to_cpu(hdr->type);
int front_len = le32_to_cpu(hdr->front_len);
int middle_len = le32_to_cpu(hdr->middle_len);
struct ceph_msg *msg = NULL;
int ret;
if (con->ops->alloc_msg) {
msg = con->ops->alloc_msg(con, hdr, skip);
if (IS_ERR(msg))
return msg;
if (*skip)
return NULL;
}
if (!msg) {
*skip = 0;
msg = ceph_msg_new(type, front_len, 0, 0, NULL);
if (!msg) {
pr_err("unable to allocate msg type %d len %d\n",
type, front_len);
return ERR_PTR(-ENOMEM);
}
}
if (middle_len) {
ret = ceph_alloc_middle(con, msg);
if (ret < 0) {
ceph_msg_put(msg);
return msg;
}
}
return msg;
}
/* /*
* Free a generically kmalloc'd message. * Free a generically kmalloc'd message.
......
...@@ -44,9 +44,8 @@ struct ceph_connection_operations { ...@@ -44,9 +44,8 @@ struct ceph_connection_operations {
void (*peer_reset) (struct ceph_connection *con); void (*peer_reset) (struct ceph_connection *con);
struct ceph_msg * (*alloc_msg) (struct ceph_connection *con, struct ceph_msg * (*alloc_msg) (struct ceph_connection *con,
struct ceph_msg_header *hdr); struct ceph_msg_header *hdr,
int (*alloc_middle) (struct ceph_connection *con, int *skip);
struct ceph_msg *msg);
/* an incoming message has a data payload; tell me what pages I /* an incoming message has a data payload; tell me what pages I
* should read the data into. */ * should read the data into. */
int (*prepare_pages) (struct ceph_connection *con, struct ceph_msg *m, int (*prepare_pages) (struct ceph_connection *con, struct ceph_msg *m,
...@@ -242,10 +241,6 @@ extern struct ceph_msg *ceph_msg_new(int type, int front_len, ...@@ -242,10 +241,6 @@ extern struct ceph_msg *ceph_msg_new(int type, int front_len,
struct page **pages); struct page **pages);
extern void ceph_msg_kfree(struct ceph_msg *m); extern void ceph_msg_kfree(struct ceph_msg *m);
extern struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
struct ceph_msg_header *hdr);
extern int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg);
static inline struct ceph_msg *ceph_msg_get(struct ceph_msg *msg) static inline struct ceph_msg *ceph_msg_get(struct ceph_msg *msg)
{ {
......
...@@ -692,21 +692,33 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) ...@@ -692,21 +692,33 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
* Allocate memory for incoming message * Allocate memory for incoming message
*/ */
static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con, static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
struct ceph_msg_header *hdr) struct ceph_msg_header *hdr,
int *skip)
{ {
struct ceph_mon_client *monc = con->private; struct ceph_mon_client *monc = con->private;
int type = le16_to_cpu(hdr->type); int type = le16_to_cpu(hdr->type);
int front = le32_to_cpu(hdr->front_len); int front_len = le32_to_cpu(hdr->front_len);
struct ceph_msg *m;
*skip = 0;
switch (type) { switch (type) {
case CEPH_MSG_MON_SUBSCRIBE_ACK: case CEPH_MSG_MON_SUBSCRIBE_ACK:
return ceph_msgpool_get(&monc->msgpool_subscribe_ack, front); m = ceph_msgpool_get(&monc->msgpool_subscribe_ack, front_len);
break;
case CEPH_MSG_STATFS_REPLY: case CEPH_MSG_STATFS_REPLY:
return ceph_msgpool_get(&monc->msgpool_statfs_reply, front); m = ceph_msgpool_get(&monc->msgpool_statfs_reply, front_len);
break;
case CEPH_MSG_AUTH_REPLY: case CEPH_MSG_AUTH_REPLY:
return ceph_msgpool_get(&monc->msgpool_auth_reply, front); m = ceph_msgpool_get(&monc->msgpool_auth_reply, front_len);
break;
default:
return NULL;
} }
return ceph_alloc_msg(con, hdr);
if (!m)
*skip = 1;
return m;
} }
/* /*
...@@ -749,5 +761,4 @@ const static struct ceph_connection_operations mon_con_ops = { ...@@ -749,5 +761,4 @@ const static struct ceph_connection_operations mon_con_ops = {
.dispatch = dispatch, .dispatch = dispatch,
.fault = mon_fault, .fault = mon_fault,
.alloc_msg = mon_alloc_msg, .alloc_msg = mon_alloc_msg,
.alloc_middle = ceph_alloc_middle,
}; };
...@@ -1304,18 +1304,28 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) ...@@ -1304,18 +1304,28 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
} }
static struct ceph_msg *alloc_msg(struct ceph_connection *con, static struct ceph_msg *alloc_msg(struct ceph_connection *con,
struct ceph_msg_header *hdr) struct ceph_msg_header *hdr,
int *skip)
{ {
struct ceph_osd *osd = con->private; struct ceph_osd *osd = con->private;
struct ceph_osd_client *osdc = osd->o_osdc; struct ceph_osd_client *osdc = osd->o_osdc;
int type = le16_to_cpu(hdr->type); int type = le16_to_cpu(hdr->type);
int front = le32_to_cpu(hdr->front_len); int front = le32_to_cpu(hdr->front_len);
struct ceph_msg *m;
*skip = 0;
switch (type) { switch (type) {
case CEPH_MSG_OSD_OPREPLY: case CEPH_MSG_OSD_OPREPLY:
return ceph_msgpool_get(&osdc->msgpool_op_reply, front); m = ceph_msgpool_get(&osdc->msgpool_op_reply, front);
break;
default:
return NULL;
} }
return ceph_alloc_msg(con, hdr);
if (!m)
*skip = 1;
return m;
} }
/* /*
...@@ -1390,6 +1400,5 @@ const static struct ceph_connection_operations osd_con_ops = { ...@@ -1390,6 +1400,5 @@ const static struct ceph_connection_operations osd_con_ops = {
.verify_authorizer_reply = verify_authorizer_reply, .verify_authorizer_reply = verify_authorizer_reply,
.alloc_msg = alloc_msg, .alloc_msg = alloc_msg,
.fault = osd_reset, .fault = osd_reset,
.alloc_middle = ceph_alloc_middle,
.prepare_pages = prepare_pages, .prepare_pages = prepare_pages,
}; };
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment