Commit 43794509 authored by Alex Elder's avatar Alex Elder Committed by Sage Weil

libceph: abstract message data

Group the types of message data into an abstract structure with a
type indicator and a union containing fields appropriate to the
type of data it represents.  Use this to represent the pages,
pagelist, bio, and trail in a ceph message.

Verify message data is of type NONE in ceph_msg_data_set_*()
routines.  Since information about message data of type NONE really
should not be interpreted, get rid of the other assertions in those
functions.
Signed-off-by: default avatarAlex Elder <elder@inktank.com>
Reviewed-by: default avatarJosh Durgin <josh.durgin@inktank.com>
parent f9e15777
...@@ -64,12 +64,55 @@ struct ceph_messenger { ...@@ -64,12 +64,55 @@ struct ceph_messenger {
u32 required_features; u32 required_features;
}; };
#define ceph_msg_has_pages(m) ((m)->p.pages != NULL) #define ceph_msg_has_pages(m) ((m)->p.type == CEPH_MSG_DATA_PAGES)
#define ceph_msg_has_pagelist(m) ((m)->l.pagelist != NULL) #define ceph_msg_has_pagelist(m) ((m)->l.type == CEPH_MSG_DATA_PAGELIST)
#ifdef CONFIG_BLOCK #ifdef CONFIG_BLOCK
#define ceph_msg_has_bio(m) ((m)->b.bio != NULL) #define ceph_msg_has_bio(m) ((m)->b.type == CEPH_MSG_DATA_BIO)
#endif /* CONFIG_BLOCK */ #endif /* CONFIG_BLOCK */
#define ceph_msg_has_trail(m) ((m)->t.trail != NULL) #define ceph_msg_has_trail(m) ((m)->t.type == CEPH_MSG_DATA_PAGELIST)
enum ceph_msg_data_type {
CEPH_MSG_DATA_NONE, /* message contains no data payload */
CEPH_MSG_DATA_PAGES, /* data source/destination is a page array */
CEPH_MSG_DATA_PAGELIST, /* data source/destination is a pagelist */
#ifdef CONFIG_BLOCK
CEPH_MSG_DATA_BIO, /* data source/destination is a bio list */
#endif /* CONFIG_BLOCK */
};
static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
{
switch (type) {
case CEPH_MSG_DATA_NONE:
case CEPH_MSG_DATA_PAGES:
case CEPH_MSG_DATA_PAGELIST:
#ifdef CONFIG_BLOCK
case CEPH_MSG_DATA_BIO:
#endif /* CONFIG_BLOCK */
return true;
default:
return false;
}
}
struct ceph_msg_data {
enum ceph_msg_data_type type;
union {
#ifdef CONFIG_BLOCK
struct {
struct bio *bio_iter; /* iterator */
struct bio *bio;
unsigned int bio_seg; /* current seg in bio */
};
#endif /* CONFIG_BLOCK */
struct {
struct page **pages; /* NOT OWNER. */
size_t length; /* total # bytes */
unsigned int alignment; /* first page */
};
struct ceph_pagelist *pagelist;
};
};
/* /*
* a single message. it contains a header (src, dest, message type, etc.), * a single message. it contains a header (src, dest, message type, etc.),
...@@ -83,24 +126,12 @@ struct ceph_msg { ...@@ -83,24 +126,12 @@ struct ceph_msg {
struct ceph_buffer *middle; struct ceph_buffer *middle;
/* data payload */ /* data payload */
struct { struct ceph_msg_data p; /* pages */
struct page **pages; /* NOT OWNER. */ struct ceph_msg_data l; /* pagelist */
size_t length; /* # data bytes in array */
unsigned int alignment; /* first page */
} p;
struct {
struct ceph_pagelist *pagelist;
} l;
#ifdef CONFIG_BLOCK #ifdef CONFIG_BLOCK
struct { struct ceph_msg_data b; /* bio */
struct bio *bio_iter; /* iterator */
struct bio *bio;
unsigned int bio_seg; /* current seg in bio */
} b;
#endif /* CONFIG_BLOCK */ #endif /* CONFIG_BLOCK */
struct { struct ceph_msg_data t; /* trail */
struct ceph_pagelist *trail; /* trailing part of data */
} t;
struct ceph_connection *con; struct ceph_connection *con;
struct list_head list_head; /* links for connection lists */ struct list_head list_head; /* links for connection lists */
......
...@@ -1054,7 +1054,7 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page, ...@@ -1054,7 +1054,7 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
msg_pos->did_page_crc = false; msg_pos->did_page_crc = false;
if (in_trail) { if (in_trail) {
BUG_ON(!ceph_msg_has_trail(msg)); BUG_ON(!ceph_msg_has_trail(msg));
list_rotate_left(&msg->t.trail->head); list_rotate_left(&msg->t.pagelist->head);
} else if (ceph_msg_has_pagelist(msg)) { } else if (ceph_msg_has_pagelist(msg)) {
list_rotate_left(&msg->l.pagelist->head); list_rotate_left(&msg->l.pagelist->head);
#ifdef CONFIG_BLOCK #ifdef CONFIG_BLOCK
...@@ -1120,7 +1120,7 @@ static int write_partial_message_data(struct ceph_connection *con) ...@@ -1120,7 +1120,7 @@ static int write_partial_message_data(struct ceph_connection *con)
size_t trail_off = data_len; size_t trail_off = data_len;
if (ceph_msg_has_trail(msg)) { if (ceph_msg_has_trail(msg)) {
trail_len = msg->t.trail->length; trail_len = msg->t.pagelist->length;
trail_off -= trail_len; trail_off -= trail_len;
} }
...@@ -1149,7 +1149,7 @@ static int write_partial_message_data(struct ceph_connection *con) ...@@ -1149,7 +1149,7 @@ static int write_partial_message_data(struct ceph_connection *con)
if (in_trail) { if (in_trail) {
BUG_ON(!ceph_msg_has_trail(msg)); BUG_ON(!ceph_msg_has_trail(msg));
total_max_write = data_len - msg_pos->data_pos; total_max_write = data_len - msg_pos->data_pos;
page = list_first_entry(&msg->t.trail->head, page = list_first_entry(&msg->t.pagelist->head,
struct page, lru); struct page, lru);
} else if (ceph_msg_has_pages(msg)) { } else if (ceph_msg_has_pages(msg)) {
page = msg->p.pages[msg_pos->page]; page = msg->p.pages[msg_pos->page];
...@@ -2736,14 +2736,19 @@ void ceph_con_keepalive(struct ceph_connection *con) ...@@ -2736,14 +2736,19 @@ void ceph_con_keepalive(struct ceph_connection *con)
} }
EXPORT_SYMBOL(ceph_con_keepalive); EXPORT_SYMBOL(ceph_con_keepalive);
static void ceph_msg_data_init(struct ceph_msg_data *data)
{
data->type = CEPH_MSG_DATA_NONE;
}
void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages, void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
size_t length, size_t alignment) size_t length, size_t alignment)
{ {
BUG_ON(!pages); BUG_ON(!pages);
BUG_ON(!length); BUG_ON(!length);
BUG_ON(msg->p.pages); BUG_ON(msg->p.type != CEPH_MSG_DATA_NONE);
BUG_ON(msg->p.length);
msg->p.type = CEPH_MSG_DATA_PAGES;
msg->p.pages = pages; msg->p.pages = pages;
msg->p.length = length; msg->p.length = length;
msg->p.alignment = alignment & ~PAGE_MASK; msg->p.alignment = alignment & ~PAGE_MASK;
...@@ -2755,8 +2760,9 @@ void ceph_msg_data_set_pagelist(struct ceph_msg *msg, ...@@ -2755,8 +2760,9 @@ void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
{ {
BUG_ON(!pagelist); BUG_ON(!pagelist);
BUG_ON(!pagelist->length); BUG_ON(!pagelist->length);
BUG_ON(msg->l.pagelist); BUG_ON(msg->l.type != CEPH_MSG_DATA_NONE);
msg->l.type = CEPH_MSG_DATA_PAGELIST;
msg->l.pagelist = pagelist; msg->l.pagelist = pagelist;
} }
EXPORT_SYMBOL(ceph_msg_data_set_pagelist); EXPORT_SYMBOL(ceph_msg_data_set_pagelist);
...@@ -2764,8 +2770,9 @@ EXPORT_SYMBOL(ceph_msg_data_set_pagelist); ...@@ -2764,8 +2770,9 @@ EXPORT_SYMBOL(ceph_msg_data_set_pagelist);
void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio) void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio)
{ {
BUG_ON(!bio); BUG_ON(!bio);
BUG_ON(msg->b.bio); BUG_ON(msg->b.type != CEPH_MSG_DATA_NONE);
msg->b.type = CEPH_MSG_DATA_BIO;
msg->b.bio = bio; msg->b.bio = bio;
} }
EXPORT_SYMBOL(ceph_msg_data_set_bio); EXPORT_SYMBOL(ceph_msg_data_set_bio);
...@@ -2774,9 +2781,10 @@ void ceph_msg_data_set_trail(struct ceph_msg *msg, struct ceph_pagelist *trail) ...@@ -2774,9 +2781,10 @@ void ceph_msg_data_set_trail(struct ceph_msg *msg, struct ceph_pagelist *trail)
{ {
BUG_ON(!trail); BUG_ON(!trail);
BUG_ON(!trail->length); BUG_ON(!trail->length);
BUG_ON(msg->t.trail); BUG_ON(msg->b.type != CEPH_MSG_DATA_NONE);
msg->t.trail = trail; msg->t.type = CEPH_MSG_DATA_PAGELIST;
msg->t.pagelist = trail;
} }
EXPORT_SYMBOL(ceph_msg_data_set_trail); EXPORT_SYMBOL(ceph_msg_data_set_trail);
...@@ -2800,6 +2808,11 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, ...@@ -2800,6 +2808,11 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
INIT_LIST_HEAD(&m->list_head); INIT_LIST_HEAD(&m->list_head);
kref_init(&m->kref); kref_init(&m->kref);
ceph_msg_data_init(&m->p);
ceph_msg_data_init(&m->l);
ceph_msg_data_init(&m->b);
ceph_msg_data_init(&m->t);
/* front */ /* front */
m->front_max = front_len; m->front_max = front_len;
if (front_len) { if (front_len) {
...@@ -2965,7 +2978,7 @@ void ceph_msg_last_put(struct kref *kref) ...@@ -2965,7 +2978,7 @@ void ceph_msg_last_put(struct kref *kref)
} }
if (ceph_msg_has_trail(m)) if (ceph_msg_has_trail(m))
m->t.trail = NULL; m->t.pagelist = NULL;
if (m->pool) if (m->pool)
ceph_msgpool_put(m->pool, m); ceph_msgpool_put(m->pool, m);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment