Commit 5b64640c authored by Yan, Zheng's avatar Yan, Zheng Committed by Ilya Dryomov

ceph: scattered page writeback

This patch makes ceph_writepages_start() try using a single OSD request
to write all dirty pages within a strip unit. When a nonconsecutive
dirty page is found, ceph_writepages_start() tries starting a new write
operation in the existing OSD request. If that succeeds, it uses the new
operation to write back the dirty page.
Signed-off-by: default avatarYan, Zheng <zyan@redhat.com>
parent 2c63f49a
...@@ -606,71 +606,71 @@ static void writepages_finish(struct ceph_osd_request *req, ...@@ -606,71 +606,71 @@ static void writepages_finish(struct ceph_osd_request *req,
struct inode *inode = req->r_inode; struct inode *inode = req->r_inode;
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_osd_data *osd_data; struct ceph_osd_data *osd_data;
unsigned wrote;
struct page *page; struct page *page;
int num_pages; int num_pages, total_pages = 0;
int i; int i, j;
int rc = req->r_result;
struct ceph_snap_context *snapc = req->r_snapc; struct ceph_snap_context *snapc = req->r_snapc;
struct address_space *mapping = inode->i_mapping; struct address_space *mapping = inode->i_mapping;
int rc = req->r_result;
u64 bytes = req->r_ops[0].extent.length;
struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
long writeback_stat; bool remove_page;
unsigned issued = ceph_caps_issued(ci);
dout("writepages_finish %p rc %d\n", inode, rc);
if (rc < 0)
mapping_set_error(mapping, rc);
osd_data = osd_req_op_extent_osd_data(req, 0);
BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
num_pages = calc_pages_for((u64)osd_data->alignment,
(u64)osd_data->length);
if (rc >= 0) {
/* /*
* Assume we wrote the pages we originally sent. The * We lost the cache cap, need to truncate the page before
* osd might reply with fewer pages if our writeback * it is unlocked, otherwise we'd truncate it later in the
* raced with a truncation and was adjusted at the osd, * page truncation thread, possibly losing some data that
* so don't believe the reply. * raced its way in
*/ */
wrote = num_pages; remove_page = !(ceph_caps_issued(ci) &
} else { (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));
wrote = 0;
mapping_set_error(mapping, rc);
}
dout("writepages_finish %p rc %d bytes %llu wrote %d (pages)\n",
inode, rc, bytes, wrote);
/* clean all pages */ /* clean all pages */
for (i = 0; i < num_pages; i++) { for (i = 0; i < req->r_num_ops; i++) {
page = osd_data->pages[i]; if (req->r_ops[i].op != CEPH_OSD_OP_WRITE)
break;
osd_data = osd_req_op_extent_osd_data(req, i);
BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
num_pages = calc_pages_for((u64)osd_data->alignment,
(u64)osd_data->length);
total_pages += num_pages;
for (j = 0; j < num_pages; j++) {
page = osd_data->pages[j];
BUG_ON(!page); BUG_ON(!page);
WARN_ON(!PageUptodate(page)); WARN_ON(!PageUptodate(page));
writeback_stat = if (atomic_long_dec_return(&fsc->writeback_count) <
atomic_long_dec_return(&fsc->writeback_count); CONGESTION_OFF_THRESH(
if (writeback_stat < fsc->mount_options->congestion_kb))
CONGESTION_OFF_THRESH(fsc->mount_options->congestion_kb))
clear_bdi_congested(&fsc->backing_dev_info, clear_bdi_congested(&fsc->backing_dev_info,
BLK_RW_ASYNC); BLK_RW_ASYNC);
ceph_put_snap_context(page_snap_context(page)); ceph_put_snap_context(page_snap_context(page));
page->private = 0; page->private = 0;
ClearPagePrivate(page); ClearPagePrivate(page);
dout("unlocking %d %p\n", i, page); dout("unlocking %p\n", page);
end_page_writeback(page); end_page_writeback(page);
/* if (remove_page)
* We lost the cache cap, need to truncate the page before generic_error_remove_page(inode->i_mapping,
* it is unlocked, otherwise we'd truncate it later in the page);
* page truncation thread, possibly losing some data that
* raced its way in
*/
if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0)
generic_error_remove_page(inode->i_mapping, page);
unlock_page(page); unlock_page(page);
} }
dout("%p wrote+cleaned %d pages\n", inode, wrote); dout("writepages_finish %p wrote %llu bytes cleaned %d pages\n",
ceph_put_wrbuffer_cap_refs(ci, num_pages, snapc); inode, osd_data->length, rc >= 0 ? num_pages : 0);
ceph_release_pages(osd_data->pages, num_pages); ceph_release_pages(osd_data->pages, num_pages);
}
ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc);
osd_data = osd_req_op_extent_osd_data(req, 0);
if (osd_data->pages_from_pool) if (osd_data->pages_from_pool)
mempool_free(osd_data->pages, mempool_free(osd_data->pages,
ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool); ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool);
...@@ -778,17 +778,15 @@ static int ceph_writepages_start(struct address_space *mapping, ...@@ -778,17 +778,15 @@ static int ceph_writepages_start(struct address_space *mapping,
while (!done && index <= end) { while (!done && index <= end) {
unsigned i; unsigned i;
int first; int first;
pgoff_t next; pgoff_t strip_unit_end = 0;
int pvec_pages, locked_pages; int num_ops = 0, op_idx;
struct page **pages = NULL; int pvec_pages, locked_pages = 0;
struct page **pages = NULL, **data_pages;
mempool_t *pool = NULL; /* Becomes non-null if mempool used */ mempool_t *pool = NULL; /* Becomes non-null if mempool used */
struct page *page; struct page *page;
int want; int want;
u64 offset, len; u64 offset = 0, len = 0;
long writeback_stat;
next = 0;
locked_pages = 0;
max_pages = max_pages_ever; max_pages = max_pages_ever;
get_more_pages: get_more_pages:
...@@ -824,8 +822,8 @@ static int ceph_writepages_start(struct address_space *mapping, ...@@ -824,8 +822,8 @@ static int ceph_writepages_start(struct address_space *mapping,
unlock_page(page); unlock_page(page);
break; break;
} }
if (next && (page->index != next)) { if (strip_unit_end && (page->index > strip_unit_end)) {
dout("not consecutive %p\n", page); dout("end of strip unit %p\n", page);
unlock_page(page); unlock_page(page);
break; break;
} }
...@@ -867,36 +865,31 @@ static int ceph_writepages_start(struct address_space *mapping, ...@@ -867,36 +865,31 @@ static int ceph_writepages_start(struct address_space *mapping,
/* /*
* We have something to write. If this is * We have something to write. If this is
* the first locked page this time through, * the first locked page this time through,
* allocate an osd request and a page array * calculate max possinle write size and
* that it will use. * allocate a page array
*/ */
if (locked_pages == 0) { if (locked_pages == 0) {
BUG_ON(pages); u64 objnum;
u64 objoff;
/* prepare async write request */ /* prepare async write request */
offset = (u64)page_offset(page); offset = (u64)page_offset(page);
len = wsize; len = wsize;
req = ceph_osdc_new_request(&fsc->client->osdc,
&ci->i_layout, vino, rc = ceph_calc_file_object_mapping(&ci->i_layout,
offset, &len, 0, offset, len,
do_sync ? 2 : 1, &objnum, &objoff,
CEPH_OSD_OP_WRITE, &len);
CEPH_OSD_FLAG_WRITE | if (rc < 0) {
CEPH_OSD_FLAG_ONDISK,
snapc, truncate_seq,
truncate_size, true);
if (IS_ERR(req)) {
rc = PTR_ERR(req);
unlock_page(page); unlock_page(page);
break; break;
} }
if (do_sync) num_ops = 1 + do_sync;
osd_req_op_init(req, 1, strip_unit_end = page->index +
CEPH_OSD_OP_STARTSYNC, 0); ((len - 1) >> PAGE_CACHE_SHIFT);
req->r_callback = writepages_finish;
req->r_inode = inode;
BUG_ON(pages);
max_pages = calc_pages_for(0, (u64)len); max_pages = calc_pages_for(0, (u64)len);
pages = kmalloc(max_pages * sizeof (*pages), pages = kmalloc(max_pages * sizeof (*pages),
GFP_NOFS); GFP_NOFS);
...@@ -905,6 +898,20 @@ static int ceph_writepages_start(struct address_space *mapping, ...@@ -905,6 +898,20 @@ static int ceph_writepages_start(struct address_space *mapping,
pages = mempool_alloc(pool, GFP_NOFS); pages = mempool_alloc(pool, GFP_NOFS);
BUG_ON(!pages); BUG_ON(!pages);
} }
len = 0;
} else if (page->index !=
(offset + len) >> PAGE_CACHE_SHIFT) {
if (num_ops >= (pool ? CEPH_OSD_SLAB_OPS :
CEPH_OSD_MAX_OPS)) {
redirty_page_for_writepage(wbc, page);
unlock_page(page);
break;
}
num_ops++;
offset = (u64)page_offset(page);
len = 0;
} }
/* note position of first page in pvec */ /* note position of first page in pvec */
...@@ -913,18 +920,16 @@ static int ceph_writepages_start(struct address_space *mapping, ...@@ -913,18 +920,16 @@ static int ceph_writepages_start(struct address_space *mapping,
dout("%p will write page %p idx %lu\n", dout("%p will write page %p idx %lu\n",
inode, page, page->index); inode, page, page->index);
writeback_stat = if (atomic_long_inc_return(&fsc->writeback_count) >
atomic_long_inc_return(&fsc->writeback_count); CONGESTION_ON_THRESH(
if (writeback_stat > CONGESTION_ON_THRESH(
fsc->mount_options->congestion_kb)) { fsc->mount_options->congestion_kb)) {
set_bdi_congested(&fsc->backing_dev_info, set_bdi_congested(&fsc->backing_dev_info,
BLK_RW_ASYNC); BLK_RW_ASYNC);
} }
set_page_writeback(page);
pages[locked_pages] = page; pages[locked_pages] = page;
locked_pages++; locked_pages++;
next = page->index + 1; len += PAGE_CACHE_SIZE;
} }
/* did we get anything? */ /* did we get anything? */
...@@ -944,38 +949,119 @@ static int ceph_writepages_start(struct address_space *mapping, ...@@ -944,38 +949,119 @@ static int ceph_writepages_start(struct address_space *mapping,
/* shift unused pages over in the pvec... we /* shift unused pages over in the pvec... we
* will need to release them below. */ * will need to release them below. */
for (j = i; j < pvec_pages; j++) { for (j = i; j < pvec_pages; j++) {
dout(" pvec leftover page %p\n", dout(" pvec leftover page %p\n", pvec.pages[j]);
pvec.pages[j]);
pvec.pages[j-i+first] = pvec.pages[j]; pvec.pages[j-i+first] = pvec.pages[j];
} }
pvec.nr -= i-first; pvec.nr -= i-first;
} }
/* Format the osd request message and submit the write */ new_request:
offset = page_offset(pages[0]); offset = page_offset(pages[0]);
len = (u64)locked_pages << PAGE_CACHE_SHIFT; len = wsize;
if (snap_size == -1) {
len = min(len, (u64)i_size_read(inode) - offset); req = ceph_osdc_new_request(&fsc->client->osdc,
&ci->i_layout, vino,
offset, &len, 0, num_ops,
CEPH_OSD_OP_WRITE,
CEPH_OSD_FLAG_WRITE |
CEPH_OSD_FLAG_ONDISK,
snapc, truncate_seq,
truncate_size, false);
if (IS_ERR(req)) {
req = ceph_osdc_new_request(&fsc->client->osdc,
&ci->i_layout, vino,
offset, &len, 0,
min(num_ops,
CEPH_OSD_SLAB_OPS),
CEPH_OSD_OP_WRITE,
CEPH_OSD_FLAG_WRITE |
CEPH_OSD_FLAG_ONDISK,
snapc, truncate_seq,
truncate_size, true);
BUG_ON(IS_ERR(req));
}
BUG_ON(len < page_offset(pages[locked_pages - 1]) +
PAGE_CACHE_SIZE - offset);
req->r_callback = writepages_finish;
req->r_inode = inode;
/* Format the osd request message and submit the write */
len = 0;
data_pages = pages;
op_idx = 0;
for (i = 0; i < locked_pages; i++) {
u64 cur_offset = page_offset(pages[i]);
if (offset + len != cur_offset) {
if (op_idx + do_sync + 1 == req->r_num_ops)
break;
osd_req_op_extent_dup_last(req, op_idx,
cur_offset - offset);
dout("writepages got pages at %llu~%llu\n",
offset, len);
osd_req_op_extent_osd_data_pages(req, op_idx,
data_pages, len, 0,
!!pool, false);
osd_req_op_extent_update(req, op_idx, len);
len = 0;
offset = cur_offset;
data_pages = pages + i;
op_idx++;
}
set_page_writeback(pages[i]);
len += PAGE_CACHE_SIZE;
}
if (snap_size != -1) {
len = min(len, snap_size - offset);
} else if (i == locked_pages) {
/* writepages_finish() clears writeback pages /* writepages_finish() clears writeback pages
* according to the data length, so make sure * according to the data length, so make sure
* data length covers all locked pages */ * data length covers all locked pages */
len = max(len, 1 + u64 min_len = len + 1 - PAGE_CACHE_SIZE;
((u64)(locked_pages - 1) << PAGE_CACHE_SHIFT)); len = min(len, (u64)i_size_read(inode) - offset);
} else { len = max(len, min_len);
len = min(len, snap_size - offset);
} }
dout("writepages got %d pages at %llu~%llu\n", dout("writepages got pages at %llu~%llu\n", offset, len);
locked_pages, offset, len);
osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, osd_req_op_extent_osd_data_pages(req, op_idx, data_pages, len,
!!pool, false); 0, !!pool, false);
osd_req_op_extent_update(req, op_idx, len);
pages = NULL; /* request message now owns the pages array */ if (do_sync) {
pool = NULL; op_idx++;
osd_req_op_init(req, op_idx, CEPH_OSD_OP_STARTSYNC, 0);
/* Update the write op length in case we changed it */ }
BUG_ON(op_idx + 1 != req->r_num_ops);
osd_req_op_extent_update(req, 0, len); pool = NULL;
if (i < locked_pages) {
BUG_ON(num_ops <= req->r_num_ops);
num_ops -= req->r_num_ops;
num_ops += do_sync;
locked_pages -= i;
/* allocate new pages array for next request */
data_pages = pages;
pages = kmalloc(locked_pages * sizeof (*pages),
GFP_NOFS);
if (!pages) {
pool = fsc->wb_pagevec_pool;
pages = mempool_alloc(pool, GFP_NOFS);
BUG_ON(!pages);
}
memcpy(pages, data_pages + i,
locked_pages * sizeof(*pages));
memset(data_pages + i, 0,
locked_pages * sizeof(*pages));
} else {
BUG_ON(num_ops != req->r_num_ops);
index = pages[i - 1]->index + 1;
/* request message now owns the pages array */
pages = NULL;
}
vino = ceph_vino(inode); vino = ceph_vino(inode);
ceph_osdc_build_request(req, offset, snapc, vino.snap, ceph_osdc_build_request(req, offset, snapc, vino.snap,
...@@ -985,9 +1071,10 @@ static int ceph_writepages_start(struct address_space *mapping, ...@@ -985,9 +1071,10 @@ static int ceph_writepages_start(struct address_space *mapping,
BUG_ON(rc); BUG_ON(rc);
req = NULL; req = NULL;
/* continue? */ wbc->nr_to_write -= i;
index = next; if (pages)
wbc->nr_to_write -= locked_pages; goto new_request;
if (wbc->nr_to_write <= 0) if (wbc->nr_to_write <= 0)
done = 1; done = 1;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment