Commit e766d7b5 authored by Alex Elder's avatar Alex Elder Committed by Sage Weil

libceph: implement pages array cursor

Implement and use cursor routines for page array message data items
for outbound message data.
Signed-off-by: default avatarAlex Elder <elder@inktank.com>
Reviewed-by: default avatarJosh Durgin <josh.durgin@inktank.com>
parent 6aaa4511
...@@ -105,6 +105,12 @@ struct ceph_msg_data_cursor { ...@@ -105,6 +105,12 @@ struct ceph_msg_data_cursor {
unsigned int vector_offset; /* bytes from vector */ unsigned int vector_offset; /* bytes from vector */
}; };
#endif /* CONFIG_BLOCK */ #endif /* CONFIG_BLOCK */
struct { /* pages */
size_t resid; /* bytes from array */
unsigned int page_offset; /* offset in page */
unsigned short page_index; /* index in array */
unsigned short page_count; /* pages in array */
};
struct { /* pagelist */ struct { /* pagelist */
struct page *page; /* page from list */ struct page *page; /* page from list */
size_t offset; /* bytes from list */ size_t offset; /* bytes from list */
......
...@@ -830,6 +830,79 @@ static bool ceph_msg_data_bio_advance(struct ceph_msg_data *data, size_t bytes) ...@@ -830,6 +830,79 @@ static bool ceph_msg_data_bio_advance(struct ceph_msg_data *data, size_t bytes)
} }
#endif #endif
/*
* For a page array, a piece comes from the first page in the array
* that has not already been fully consumed.
*/
static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data *data)
{
struct ceph_msg_data_cursor *cursor = &data->cursor;
int page_count;
BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
BUG_ON(!data->pages);
BUG_ON(!data->length);
page_count = calc_pages_for(data->alignment, (u64)data->length);
BUG_ON(page_count > (int) USHRT_MAX);
cursor->resid = data->length;
cursor->page_offset = data->alignment & ~PAGE_MASK;
cursor->page_index = 0;
cursor->page_count = (unsigned short) page_count;
cursor->last_piece = cursor->page_count == 1;
}
static struct page *ceph_msg_data_pages_next(struct ceph_msg_data *data,
size_t *page_offset,
size_t *length)
{
struct ceph_msg_data_cursor *cursor = &data->cursor;
BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
BUG_ON(cursor->page_index >= cursor->page_count);
BUG_ON(cursor->page_offset >= PAGE_SIZE);
BUG_ON(!cursor->resid);
*page_offset = cursor->page_offset;
if (cursor->last_piece) {
BUG_ON(*page_offset + cursor->resid > PAGE_SIZE);
*length = cursor->resid;
} else {
*length = PAGE_SIZE - *page_offset;
}
return data->pages[cursor->page_index];
}
static bool ceph_msg_data_pages_advance(struct ceph_msg_data *data,
size_t bytes)
{
struct ceph_msg_data_cursor *cursor = &data->cursor;
BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
BUG_ON(cursor->page_offset + bytes > PAGE_SIZE);
BUG_ON(bytes > cursor->resid);
/* Advance the cursor page offset */
cursor->resid -= bytes;
cursor->page_offset += bytes;
if (!bytes || cursor->page_offset & ~PAGE_MASK)
return false; /* more bytes to process in the current page */
/* Move on to the next page */
BUG_ON(cursor->page_index >= cursor->page_count);
cursor->page_offset = 0;
cursor->page_index++;
cursor->last_piece = cursor->page_index == cursor->page_count - 1;
return true;
}
/* /*
* For a pagelist, a piece is whatever remains to be consumed in the * For a pagelist, a piece is whatever remains to be consumed in the
* first page in the list, or the front of the next page. * first page in the list, or the front of the next page.
...@@ -932,13 +1005,15 @@ static void ceph_msg_data_cursor_init(struct ceph_msg_data *data) ...@@ -932,13 +1005,15 @@ static void ceph_msg_data_cursor_init(struct ceph_msg_data *data)
case CEPH_MSG_DATA_PAGELIST: case CEPH_MSG_DATA_PAGELIST:
ceph_msg_data_pagelist_cursor_init(data); ceph_msg_data_pagelist_cursor_init(data);
break; break;
case CEPH_MSG_DATA_PAGES:
ceph_msg_data_pages_cursor_init(data);
break;
#ifdef CONFIG_BLOCK #ifdef CONFIG_BLOCK
case CEPH_MSG_DATA_BIO: case CEPH_MSG_DATA_BIO:
ceph_msg_data_bio_cursor_init(data); ceph_msg_data_bio_cursor_init(data);
break; break;
#endif /* CONFIG_BLOCK */ #endif /* CONFIG_BLOCK */
case CEPH_MSG_DATA_NONE: case CEPH_MSG_DATA_NONE:
case CEPH_MSG_DATA_PAGES:
default: default:
/* BUG(); */ /* BUG(); */
break; break;
...@@ -961,13 +1036,15 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data *data, ...@@ -961,13 +1036,15 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
case CEPH_MSG_DATA_PAGELIST: case CEPH_MSG_DATA_PAGELIST:
page = ceph_msg_data_pagelist_next(data, page_offset, length); page = ceph_msg_data_pagelist_next(data, page_offset, length);
break; break;
case CEPH_MSG_DATA_PAGES:
page = ceph_msg_data_pages_next(data, page_offset, length);
break;
#ifdef CONFIG_BLOCK #ifdef CONFIG_BLOCK
case CEPH_MSG_DATA_BIO: case CEPH_MSG_DATA_BIO:
page = ceph_msg_data_bio_next(data, page_offset, length); page = ceph_msg_data_bio_next(data, page_offset, length);
break; break;
#endif /* CONFIG_BLOCK */ #endif /* CONFIG_BLOCK */
case CEPH_MSG_DATA_NONE: case CEPH_MSG_DATA_NONE:
case CEPH_MSG_DATA_PAGES:
default: default:
page = NULL; page = NULL;
break; break;
...@@ -993,13 +1070,15 @@ static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes) ...@@ -993,13 +1070,15 @@ static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
case CEPH_MSG_DATA_PAGELIST: case CEPH_MSG_DATA_PAGELIST:
new_piece = ceph_msg_data_pagelist_advance(data, bytes); new_piece = ceph_msg_data_pagelist_advance(data, bytes);
break; break;
case CEPH_MSG_DATA_PAGES:
new_piece = ceph_msg_data_pages_advance(data, bytes);
break;
#ifdef CONFIG_BLOCK #ifdef CONFIG_BLOCK
case CEPH_MSG_DATA_BIO: case CEPH_MSG_DATA_BIO:
new_piece = ceph_msg_data_bio_advance(data, bytes); new_piece = ceph_msg_data_bio_advance(data, bytes);
break; break;
#endif /* CONFIG_BLOCK */ #endif /* CONFIG_BLOCK */
case CEPH_MSG_DATA_NONE: case CEPH_MSG_DATA_NONE:
case CEPH_MSG_DATA_PAGES:
default: default:
BUG(); BUG();
break; break;
...@@ -1032,6 +1111,8 @@ static void prepare_message_data(struct ceph_msg *msg, ...@@ -1032,6 +1111,8 @@ static void prepare_message_data(struct ceph_msg *msg,
if (ceph_msg_has_bio(msg)) if (ceph_msg_has_bio(msg))
ceph_msg_data_cursor_init(&msg->b); ceph_msg_data_cursor_init(&msg->b);
#endif /* CONFIG_BLOCK */ #endif /* CONFIG_BLOCK */
if (ceph_msg_has_pages(msg))
ceph_msg_data_cursor_init(&msg->p);
if (ceph_msg_has_pagelist(msg)) if (ceph_msg_has_pagelist(msg))
ceph_msg_data_cursor_init(&msg->l); ceph_msg_data_cursor_init(&msg->l);
if (ceph_msg_has_trail(msg)) if (ceph_msg_has_trail(msg))
...@@ -1330,6 +1411,8 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page, ...@@ -1330,6 +1411,8 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
msg_pos->page_pos += sent; msg_pos->page_pos += sent;
if (in_trail) if (in_trail)
need_crc = ceph_msg_data_advance(&msg->t, sent); need_crc = ceph_msg_data_advance(&msg->t, sent);
else if (ceph_msg_has_pages(msg))
need_crc = ceph_msg_data_advance(&msg->p, sent);
else if (ceph_msg_has_pagelist(msg)) else if (ceph_msg_has_pagelist(msg))
need_crc = ceph_msg_data_advance(&msg->l, sent); need_crc = ceph_msg_data_advance(&msg->l, sent);
#ifdef CONFIG_BLOCK #ifdef CONFIG_BLOCK
...@@ -1435,7 +1518,9 @@ static int write_partial_message_data(struct ceph_connection *con) ...@@ -1435,7 +1518,9 @@ static int write_partial_message_data(struct ceph_connection *con)
page = ceph_msg_data_next(&msg->t, &page_offset, page = ceph_msg_data_next(&msg->t, &page_offset,
&length, &last_piece); &length, &last_piece);
} else if (ceph_msg_has_pages(msg)) { } else if (ceph_msg_has_pages(msg)) {
page = msg->p.pages[msg_pos->page]; use_cursor = true;
page = ceph_msg_data_next(&msg->p, &page_offset,
&length, &last_piece);
} else if (ceph_msg_has_pagelist(msg)) { } else if (ceph_msg_has_pagelist(msg)) {
use_cursor = true; use_cursor = true;
page = ceph_msg_data_next(&msg->l, &page_offset, page = ceph_msg_data_next(&msg->l, &page_offset,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment