Commit fe38a2b6 authored by Alex Elder's avatar Alex Elder Committed by Sage Weil

libceph: start defining message data cursor

This patch lays out the foundation for using generic routines to
manage processing items of message data.

For simplicity, we'll start with just the trail portion of a
message, because it stands alone and is only present for outgoing
data.

First some basic concepts.  We'll use the term "data item" to
represent one of the ceph_msg_data structures associated with a
message.  There are currently four of those, with single-letter
field names p, l, b, and t.  A data item is further broken into
"pieces" which always lie in a single page.  A data item will
include a "cursor" that will track state as the memory defined by
the item is consumed by sending data from or receiving data into it.

We define three routines to manipulate a data item's cursor: the
"init" routine; the "next" routine; and the "advance" routine.  The
"init" routine initializes the cursor so it points at the beginning
of the first piece in the item.  The "next" routine returns the
page, page offset, and length (limited by both the page and item
size) of the next unconsumed piece in the item.  It also indicates
to the caller whether the piece being returned is the last one in
the data item.

The "advance" routine consumes the requested number of bytes in the
item (advancing the cursor).  This is used to record the number of
bytes from the current piece that were actually sent or received by
the network code.  It returns an indication of whether the result
means the current piece has been fully consumed.  This is used by
the message send code to determine whether it should calculate the
CRC for the next piece processed.

The trail of a message is implemented as a ceph pagelist.  The
routines defined for it will be usable for non-trail pagelist data
as well.
Signed-off-by: default avatarAlex Elder <elder@inktank.com>
Reviewed-by: default avatarJosh Durgin <josh.durgin@inktank.com>
parent 43794509
...@@ -95,6 +95,12 @@ static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type) ...@@ -95,6 +95,12 @@ static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
} }
} }
struct ceph_msg_data_cursor {
bool last_piece; /* now at last piece of data item */
struct page *page; /* current page in pagelist */
size_t offset; /* pagelist bytes consumed */
};
struct ceph_msg_data { struct ceph_msg_data {
enum ceph_msg_data_type type; enum ceph_msg_data_type type;
union { union {
...@@ -112,6 +118,7 @@ struct ceph_msg_data { ...@@ -112,6 +118,7 @@ struct ceph_msg_data {
}; };
struct ceph_pagelist *pagelist; struct ceph_pagelist *pagelist;
}; };
struct ceph_msg_data_cursor cursor; /* pagelist only */
}; };
/* /*
......
...@@ -21,6 +21,9 @@ ...@@ -21,6 +21,9 @@
#include <linux/ceph/pagelist.h> #include <linux/ceph/pagelist.h>
#include <linux/export.h> #include <linux/export.h>
#define list_entry_next(pos, member) \
list_entry(pos->member.next, typeof(*pos), member)
/* /*
* Ceph uses the messenger to exchange ceph_msg messages with other * Ceph uses the messenger to exchange ceph_msg messages with other
* hosts in the system. The messenger provides ordered and reliable * hosts in the system. The messenger provides ordered and reliable
...@@ -738,6 +741,109 @@ static void iter_bio_next(struct bio **bio_iter, unsigned int *seg) ...@@ -738,6 +741,109 @@ static void iter_bio_next(struct bio **bio_iter, unsigned int *seg)
} }
#endif #endif
/*
* Message data is handled (sent or received) in pieces, where each
* piece resides on a single page. The network layer might not
* consume an entire piece at once. A data item's cursor keeps
* track of which piece is next to process and how much remains to
* be processed in that piece. It also tracks whether the current
* piece is the last one in the data item.
*/
static void ceph_msg_data_cursor_init(struct ceph_msg_data *data)
{
struct ceph_msg_data_cursor *cursor = &data->cursor;
struct ceph_pagelist *pagelist;
struct page *page;
if (data->type != CEPH_MSG_DATA_PAGELIST)
return;
pagelist = data->pagelist;
BUG_ON(!pagelist);
if (!pagelist->length)
return; /* pagelist can be assigned but empty */
BUG_ON(list_empty(&pagelist->head));
page = list_first_entry(&pagelist->head, struct page, lru);
cursor->page = page;
cursor->offset = 0;
cursor->last_piece = pagelist->length <= PAGE_SIZE;
}
/*
* Return the page containing the next piece to process for a given
* data item, and supply the page offset and length of that piece.
* Indicate whether this is the last piece in this data item.
*/
static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
size_t *page_offset,
size_t *length,
bool *last_piece)
{
struct ceph_msg_data_cursor *cursor = &data->cursor;
struct ceph_pagelist *pagelist;
size_t piece_end;
BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
pagelist = data->pagelist;
BUG_ON(!pagelist);
BUG_ON(!cursor->page);
BUG_ON(cursor->offset >= pagelist->length);
*last_piece = cursor->last_piece;
if (*last_piece) {
/* pagelist offset is always 0 */
piece_end = pagelist->length & ~PAGE_MASK;
if (!piece_end)
piece_end = PAGE_SIZE;
} else {
piece_end = PAGE_SIZE;
}
*page_offset = cursor->offset & ~PAGE_MASK;
*length = piece_end - *page_offset;
return data->cursor.page;
}
/*
* Returns true if the result moves the cursor on to the next piece
* (the next page) of the pagelist.
*/
static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
{
struct ceph_msg_data_cursor *cursor = &data->cursor;
struct ceph_pagelist *pagelist;
BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
pagelist = data->pagelist;
BUG_ON(!pagelist);
BUG_ON(!cursor->page);
BUG_ON(cursor->offset + bytes > pagelist->length);
BUG_ON((cursor->offset & ~PAGE_MASK) + bytes > PAGE_SIZE);
/* Advance the cursor offset */
cursor->offset += bytes;
/* pagelist offset is always 0 */
if (!bytes || cursor->offset & ~PAGE_MASK)
return false; /* more bytes to process in the current page */
/* Move on to the next page */
BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
cursor->page = list_entry_next(cursor->page, lru);
/* cursor offset is at page boundary; pagelist offset is always 0 */
if (pagelist->length - cursor->offset <= PAGE_SIZE)
cursor->last_piece = true;
return true;
}
static void prepare_message_data(struct ceph_msg *msg, static void prepare_message_data(struct ceph_msg *msg,
struct ceph_msg_pos *msg_pos) struct ceph_msg_pos *msg_pos)
{ {
...@@ -755,6 +861,12 @@ static void prepare_message_data(struct ceph_msg *msg, ...@@ -755,6 +861,12 @@ static void prepare_message_data(struct ceph_msg *msg,
init_bio_iter(msg->b.bio, &msg->b.bio_iter, &msg->b.bio_seg); init_bio_iter(msg->b.bio, &msg->b.bio_iter, &msg->b.bio_seg);
#endif #endif
msg_pos->data_pos = 0; msg_pos->data_pos = 0;
/* If there's a trail, initialize its cursor */
if (ceph_msg_has_trail(msg))
ceph_msg_data_cursor_init(&msg->t);
msg_pos->did_page_crc = false; msg_pos->did_page_crc = false;
} }
...@@ -1045,6 +1157,12 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page, ...@@ -1045,6 +1157,12 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
msg_pos->data_pos += sent; msg_pos->data_pos += sent;
msg_pos->page_pos += sent; msg_pos->page_pos += sent;
if (in_trail) {
bool need_crc;
need_crc = ceph_msg_data_advance(&msg->t, sent);
BUG_ON(need_crc && sent != len);
}
if (sent < len) if (sent < len)
return; return;
...@@ -1052,10 +1170,7 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page, ...@@ -1052,10 +1170,7 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
msg_pos->page_pos = 0; msg_pos->page_pos = 0;
msg_pos->page++; msg_pos->page++;
msg_pos->did_page_crc = false; msg_pos->did_page_crc = false;
if (in_trail) { if (ceph_msg_has_pagelist(msg)) {
BUG_ON(!ceph_msg_has_trail(msg));
list_rotate_left(&msg->t.pagelist->head);
} else if (ceph_msg_has_pagelist(msg)) {
list_rotate_left(&msg->l.pagelist->head); list_rotate_left(&msg->l.pagelist->head);
#ifdef CONFIG_BLOCK #ifdef CONFIG_BLOCK
} else if (ceph_msg_has_bio(msg)) { } else if (ceph_msg_has_bio(msg)) {
...@@ -1141,6 +1256,8 @@ static int write_partial_message_data(struct ceph_connection *con) ...@@ -1141,6 +1256,8 @@ static int write_partial_message_data(struct ceph_connection *con)
size_t length; size_t length;
int max_write = PAGE_SIZE; int max_write = PAGE_SIZE;
int bio_offset = 0; int bio_offset = 0;
bool use_cursor = false;
bool last_piece = true; /* preserve existing behavior */
in_trail = in_trail || msg_pos->data_pos >= trail_off; in_trail = in_trail || msg_pos->data_pos >= trail_off;
if (!in_trail) if (!in_trail)
...@@ -1148,9 +1265,9 @@ static int write_partial_message_data(struct ceph_connection *con) ...@@ -1148,9 +1265,9 @@ static int write_partial_message_data(struct ceph_connection *con)
if (in_trail) { if (in_trail) {
BUG_ON(!ceph_msg_has_trail(msg)); BUG_ON(!ceph_msg_has_trail(msg));
total_max_write = data_len - msg_pos->data_pos; use_cursor = true;
page = list_first_entry(&msg->t.pagelist->head, page = ceph_msg_data_next(&msg->t, &page_offset,
struct page, lru); &length, &last_piece);
} else if (ceph_msg_has_pages(msg)) { } else if (ceph_msg_has_pages(msg)) {
page = msg->p.pages[msg_pos->page]; page = msg->p.pages[msg_pos->page];
} else if (ceph_msg_has_pagelist(msg)) { } else if (ceph_msg_has_pagelist(msg)) {
...@@ -1168,8 +1285,9 @@ static int write_partial_message_data(struct ceph_connection *con) ...@@ -1168,8 +1285,9 @@ static int write_partial_message_data(struct ceph_connection *con)
} else { } else {
page = zero_page; page = zero_page;
} }
length = min_t(int, max_write - msg_pos->page_pos, if (!use_cursor)
total_max_write); length = min_t(int, max_write - msg_pos->page_pos,
total_max_write);
page_offset = msg_pos->page_pos + bio_offset; page_offset = msg_pos->page_pos + bio_offset;
if (do_datacrc && !msg_pos->did_page_crc) { if (do_datacrc && !msg_pos->did_page_crc) {
...@@ -1180,7 +1298,7 @@ static int write_partial_message_data(struct ceph_connection *con) ...@@ -1180,7 +1298,7 @@ static int write_partial_message_data(struct ceph_connection *con)
msg_pos->did_page_crc = true; msg_pos->did_page_crc = true;
} }
ret = ceph_tcp_sendpage(con->sock, page, page_offset, ret = ceph_tcp_sendpage(con->sock, page, page_offset,
length, true); length, last_piece);
if (ret <= 0) if (ret <= 0)
goto out; goto out;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment