Commit 00e3f5cc authored by Linus Torvalds's avatar Linus Torvalds

Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

Pull Ceph updates from Sage Weil:
 "The two main changes are aio support in CephFS, and a series that
  fixes several issues in the authentication key timeout/renewal code.

  On top of that are a variety of cleanups and minor bug fixes"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
  libceph: remove outdated comment
  libceph: kill off ceph_x_ticket_handler::validity
  libceph: invalidate AUTH in addition to a service ticket
  libceph: fix authorizer invalidation, take 2
  libceph: clear messenger auth_retry flag if we fault
  libceph: fix ceph_msg_revoke()
  libceph: use list_for_each_entry_safe
  ceph: use i_size_{read,write} to get/set i_size
  ceph: re-send AIO write request when getting -EOLDSNAP error
  ceph: Asynchronous IO support
  ceph: Avoid to propagate the invalid page point
  ceph: fix double page_unlock() in page_mkwrite()
  rbd: delete an unnecessary check before rbd_dev_destroy()
  libceph: use list_next_entry instead of list_entry_next
  ceph: ceph_frag_contains_value can be boolean
  ceph: remove unused functions in ceph_frag.h
parents 772950ed 7e01726a
...@@ -5185,8 +5185,7 @@ static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth) ...@@ -5185,8 +5185,7 @@ static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
out_err: out_err:
rbd_dev_unparent(rbd_dev); rbd_dev_unparent(rbd_dev);
if (parent) rbd_dev_destroy(parent);
rbd_dev_destroy(parent);
return ret; return ret;
} }
......
...@@ -1108,7 +1108,7 @@ static int ceph_update_writeable_page(struct file *file, ...@@ -1108,7 +1108,7 @@ static int ceph_update_writeable_page(struct file *file,
return 0; return 0;
/* past end of file? */ /* past end of file? */
i_size = inode->i_size; /* caller holds i_mutex */ i_size = i_size_read(inode);
if (page_off >= i_size || if (page_off >= i_size ||
(pos_in_page == 0 && (pos+len) >= i_size && (pos_in_page == 0 && (pos+len) >= i_size &&
...@@ -1149,7 +1149,6 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping, ...@@ -1149,7 +1149,6 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
page = grab_cache_page_write_begin(mapping, index, 0); page = grab_cache_page_write_begin(mapping, index, 0);
if (!page) if (!page)
return -ENOMEM; return -ENOMEM;
*pagep = page;
dout("write_begin file %p inode %p page %p %d~%d\n", file, dout("write_begin file %p inode %p page %p %d~%d\n", file,
inode, page, (int)pos, (int)len); inode, page, (int)pos, (int)len);
...@@ -1184,8 +1183,7 @@ static int ceph_write_end(struct file *file, struct address_space *mapping, ...@@ -1184,8 +1183,7 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
zero_user_segment(page, from+copied, len); zero_user_segment(page, from+copied, len);
/* did file size increase? */ /* did file size increase? */
/* (no need for i_size_read(); we caller holds i_mutex */ if (pos+copied > i_size_read(inode))
if (pos+copied > inode->i_size)
check_cap = ceph_inode_set_size(inode, pos+copied); check_cap = ceph_inode_set_size(inode, pos+copied);
if (!PageUptodate(page)) if (!PageUptodate(page))
...@@ -1378,11 +1376,13 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) ...@@ -1378,11 +1376,13 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
ret = VM_FAULT_NOPAGE; ret = VM_FAULT_NOPAGE;
if ((off > size) || if ((off > size) ||
(page->mapping != inode->i_mapping)) (page->mapping != inode->i_mapping)) {
unlock_page(page);
goto out; goto out;
}
ret = ceph_update_writeable_page(vma->vm_file, off, len, page); ret = ceph_update_writeable_page(vma->vm_file, off, len, page);
if (ret == 0) { if (ret >= 0) {
/* success. we'll keep the page locked. */ /* success. we'll keep the page locked. */
set_page_dirty(page); set_page_dirty(page);
ret = VM_FAULT_LOCKED; ret = VM_FAULT_LOCKED;
...@@ -1393,8 +1393,6 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) ...@@ -1393,8 +1393,6 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
ret = VM_FAULT_SIGBUS; ret = VM_FAULT_SIGBUS;
} }
out: out:
if (ret != VM_FAULT_LOCKED)
unlock_page(page);
if (ret == VM_FAULT_LOCKED || if (ret == VM_FAULT_LOCKED ||
ci->i_inline_version != CEPH_INLINE_NONE) { ci->i_inline_version != CEPH_INLINE_NONE) {
int dirty; int dirty;
......
...@@ -106,7 +106,7 @@ static uint16_t ceph_fscache_inode_get_aux(const void *cookie_netfs_data, ...@@ -106,7 +106,7 @@ static uint16_t ceph_fscache_inode_get_aux(const void *cookie_netfs_data,
memset(&aux, 0, sizeof(aux)); memset(&aux, 0, sizeof(aux));
aux.mtime = inode->i_mtime; aux.mtime = inode->i_mtime;
aux.size = inode->i_size; aux.size = i_size_read(inode);
memcpy(buffer, &aux, sizeof(aux)); memcpy(buffer, &aux, sizeof(aux));
...@@ -117,9 +117,7 @@ static void ceph_fscache_inode_get_attr(const void *cookie_netfs_data, ...@@ -117,9 +117,7 @@ static void ceph_fscache_inode_get_attr(const void *cookie_netfs_data,
uint64_t *size) uint64_t *size)
{ {
const struct ceph_inode_info* ci = cookie_netfs_data; const struct ceph_inode_info* ci = cookie_netfs_data;
const struct inode* inode = &ci->vfs_inode; *size = i_size_read(&ci->vfs_inode);
*size = inode->i_size;
} }
static enum fscache_checkaux ceph_fscache_inode_check_aux( static enum fscache_checkaux ceph_fscache_inode_check_aux(
...@@ -134,7 +132,7 @@ static enum fscache_checkaux ceph_fscache_inode_check_aux( ...@@ -134,7 +132,7 @@ static enum fscache_checkaux ceph_fscache_inode_check_aux(
memset(&aux, 0, sizeof(aux)); memset(&aux, 0, sizeof(aux));
aux.mtime = inode->i_mtime; aux.mtime = inode->i_mtime;
aux.size = inode->i_size; aux.size = i_size_read(inode);
if (memcmp(data, &aux, sizeof(aux)) != 0) if (memcmp(data, &aux, sizeof(aux)) != 0)
return FSCACHE_CHECKAUX_OBSOLETE; return FSCACHE_CHECKAUX_OBSOLETE;
......
...@@ -397,8 +397,9 @@ int ceph_release(struct inode *inode, struct file *file) ...@@ -397,8 +397,9 @@ int ceph_release(struct inode *inode, struct file *file)
} }
enum { enum {
CHECK_EOF = 1, HAVE_RETRIED = 1,
READ_INLINE = 2, CHECK_EOF = 2,
READ_INLINE = 3,
}; };
/* /*
...@@ -411,17 +412,15 @@ enum { ...@@ -411,17 +412,15 @@ enum {
static int striped_read(struct inode *inode, static int striped_read(struct inode *inode,
u64 off, u64 len, u64 off, u64 len,
struct page **pages, int num_pages, struct page **pages, int num_pages,
int *checkeof, bool o_direct, int *checkeof)
unsigned long buf_align)
{ {
struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
u64 pos, this_len, left; u64 pos, this_len, left;
int io_align, page_align; loff_t i_size;
int pages_left; int page_align, pages_left;
int read; int read, ret;
struct page **page_pos; struct page **page_pos;
int ret;
bool hit_stripe, was_short; bool hit_stripe, was_short;
/* /*
...@@ -432,13 +431,9 @@ static int striped_read(struct inode *inode, ...@@ -432,13 +431,9 @@ static int striped_read(struct inode *inode,
page_pos = pages; page_pos = pages;
pages_left = num_pages; pages_left = num_pages;
read = 0; read = 0;
io_align = off & ~PAGE_MASK;
more: more:
if (o_direct) page_align = pos & ~PAGE_MASK;
page_align = (pos - io_align + buf_align) & ~PAGE_MASK;
else
page_align = pos & ~PAGE_MASK;
this_len = left; this_len = left;
ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode), ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
&ci->i_layout, pos, &this_len, &ci->i_layout, pos, &this_len,
...@@ -452,13 +447,12 @@ static int striped_read(struct inode *inode, ...@@ -452,13 +447,12 @@ static int striped_read(struct inode *inode,
dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, left, read, dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, left, read,
ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : ""); ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : "");
i_size = i_size_read(inode);
if (ret >= 0) { if (ret >= 0) {
int didpages; int didpages;
if (was_short && (pos + ret < inode->i_size)) { if (was_short && (pos + ret < i_size)) {
int zlen = min(this_len - ret, int zlen = min(this_len - ret, i_size - pos - ret);
inode->i_size - pos - ret); int zoff = (off & ~PAGE_MASK) + read + ret;
int zoff = (o_direct ? buf_align : io_align) +
read + ret;
dout(" zero gap %llu to %llu\n", dout(" zero gap %llu to %llu\n",
pos + ret, pos + ret + zlen); pos + ret, pos + ret + zlen);
ceph_zero_page_vector_range(zoff, zlen, pages); ceph_zero_page_vector_range(zoff, zlen, pages);
...@@ -473,14 +467,14 @@ static int striped_read(struct inode *inode, ...@@ -473,14 +467,14 @@ static int striped_read(struct inode *inode,
pages_left -= didpages; pages_left -= didpages;
/* hit stripe and need continue*/ /* hit stripe and need continue*/
if (left && hit_stripe && pos < inode->i_size) if (left && hit_stripe && pos < i_size)
goto more; goto more;
} }
if (read > 0) { if (read > 0) {
ret = read; ret = read;
/* did we bounce off eof? */ /* did we bounce off eof? */
if (pos + left > inode->i_size) if (pos + left > i_size)
*checkeof = CHECK_EOF; *checkeof = CHECK_EOF;
} }
...@@ -521,54 +515,28 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i, ...@@ -521,54 +515,28 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
if (ret < 0) if (ret < 0)
return ret; return ret;
if (iocb->ki_flags & IOCB_DIRECT) { num_pages = calc_pages_for(off, len);
while (iov_iter_count(i)) { pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
size_t start; if (IS_ERR(pages))
ssize_t n; return PTR_ERR(pages);
ret = striped_read(inode, off, len, pages,
n = dio_get_pagev_size(i); num_pages, checkeof);
pages = dio_get_pages_alloc(i, n, &start, &num_pages); if (ret > 0) {
if (IS_ERR(pages)) int l, k = 0;
return PTR_ERR(pages); size_t left = ret;
ret = striped_read(inode, off, n, while (left) {
pages, num_pages, checkeof, size_t page_off = off & ~PAGE_MASK;
1, start); size_t copy = min_t(size_t, left,
PAGE_SIZE - page_off);
ceph_put_page_vector(pages, num_pages, true); l = copy_page_to_iter(pages[k++], page_off, copy, i);
off += l;
if (ret <= 0) left -= l;
break; if (l < copy)
off += ret;
iov_iter_advance(i, ret);
if (ret < n)
break; break;
} }
} else {
num_pages = calc_pages_for(off, len);
pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
if (IS_ERR(pages))
return PTR_ERR(pages);
ret = striped_read(inode, off, len, pages,
num_pages, checkeof, 0, 0);
if (ret > 0) {
int l, k = 0;
size_t left = ret;
while (left) {
size_t page_off = off & ~PAGE_MASK;
size_t copy = min_t(size_t,
PAGE_SIZE - page_off, left);
l = copy_page_to_iter(pages[k++], page_off,
copy, i);
off += l;
left -= l;
if (l < copy)
break;
}
}
ceph_release_page_vector(pages, num_pages);
} }
ceph_release_page_vector(pages, num_pages);
if (off > iocb->ki_pos) { if (off > iocb->ki_pos) {
ret = off - iocb->ki_pos; ret = off - iocb->ki_pos;
...@@ -579,6 +547,193 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i, ...@@ -579,6 +547,193 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
return ret; return ret;
} }
struct ceph_aio_request {
struct kiocb *iocb;
size_t total_len;
int write;
int error;
struct list_head osd_reqs;
unsigned num_reqs;
atomic_t pending_reqs;
struct timespec mtime;
struct ceph_cap_flush *prealloc_cf;
};
struct ceph_aio_work {
struct work_struct work;
struct ceph_osd_request *req;
};
static void ceph_aio_retry_work(struct work_struct *work);
static void ceph_aio_complete(struct inode *inode,
struct ceph_aio_request *aio_req)
{
struct ceph_inode_info *ci = ceph_inode(inode);
int ret;
if (!atomic_dec_and_test(&aio_req->pending_reqs))
return;
ret = aio_req->error;
if (!ret)
ret = aio_req->total_len;
dout("ceph_aio_complete %p rc %d\n", inode, ret);
if (ret >= 0 && aio_req->write) {
int dirty;
loff_t endoff = aio_req->iocb->ki_pos + aio_req->total_len;
if (endoff > i_size_read(inode)) {
if (ceph_inode_set_size(inode, endoff))
ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
}
spin_lock(&ci->i_ceph_lock);
ci->i_inline_version = CEPH_INLINE_NONE;
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
&aio_req->prealloc_cf);
spin_unlock(&ci->i_ceph_lock);
if (dirty)
__mark_inode_dirty(inode, dirty);
}
ceph_put_cap_refs(ci, (aio_req->write ? CEPH_CAP_FILE_WR :
CEPH_CAP_FILE_RD));
aio_req->iocb->ki_complete(aio_req->iocb, ret, 0);
ceph_free_cap_flush(aio_req->prealloc_cf);
kfree(aio_req);
}
static void ceph_aio_complete_req(struct ceph_osd_request *req,
struct ceph_msg *msg)
{
int rc = req->r_result;
struct inode *inode = req->r_inode;
struct ceph_aio_request *aio_req = req->r_priv;
struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
int num_pages = calc_pages_for((u64)osd_data->alignment,
osd_data->length);
dout("ceph_aio_complete_req %p rc %d bytes %llu\n",
inode, rc, osd_data->length);
if (rc == -EOLDSNAPC) {
struct ceph_aio_work *aio_work;
BUG_ON(!aio_req->write);
aio_work = kmalloc(sizeof(*aio_work), GFP_NOFS);
if (aio_work) {
INIT_WORK(&aio_work->work, ceph_aio_retry_work);
aio_work->req = req;
queue_work(ceph_inode_to_client(inode)->wb_wq,
&aio_work->work);
return;
}
rc = -ENOMEM;
} else if (!aio_req->write) {
if (rc == -ENOENT)
rc = 0;
if (rc >= 0 && osd_data->length > rc) {
int zoff = osd_data->alignment + rc;
int zlen = osd_data->length - rc;
/*
* If read is satisfied by single OSD request,
* it can pass EOF. Otherwise read is within
* i_size.
*/
if (aio_req->num_reqs == 1) {
loff_t i_size = i_size_read(inode);
loff_t endoff = aio_req->iocb->ki_pos + rc;
if (endoff < i_size)
zlen = min_t(size_t, zlen,
i_size - endoff);
aio_req->total_len = rc + zlen;
}
if (zlen > 0)
ceph_zero_page_vector_range(zoff, zlen,
osd_data->pages);
}
}
ceph_put_page_vector(osd_data->pages, num_pages, false);
ceph_osdc_put_request(req);
if (rc < 0)
cmpxchg(&aio_req->error, 0, rc);
ceph_aio_complete(inode, aio_req);
return;
}
static void ceph_aio_retry_work(struct work_struct *work)
{
struct ceph_aio_work *aio_work =
container_of(work, struct ceph_aio_work, work);
struct ceph_osd_request *orig_req = aio_work->req;
struct ceph_aio_request *aio_req = orig_req->r_priv;
struct inode *inode = orig_req->r_inode;
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_snap_context *snapc;
struct ceph_osd_request *req;
int ret;
spin_lock(&ci->i_ceph_lock);
if (__ceph_have_pending_cap_snap(ci)) {
struct ceph_cap_snap *capsnap =
list_last_entry(&ci->i_cap_snaps,
struct ceph_cap_snap,
ci_item);
snapc = ceph_get_snap_context(capsnap->context);
} else {
BUG_ON(!ci->i_head_snapc);
snapc = ceph_get_snap_context(ci->i_head_snapc);
}
spin_unlock(&ci->i_ceph_lock);
req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 2,
false, GFP_NOFS);
if (IS_ERR(req)) {
ret = PTR_ERR(req);
req = orig_req;
goto out;
}
req->r_flags = CEPH_OSD_FLAG_ORDERSNAP |
CEPH_OSD_FLAG_ONDISK |
CEPH_OSD_FLAG_WRITE;
req->r_base_oloc = orig_req->r_base_oloc;
req->r_base_oid = orig_req->r_base_oid;
req->r_ops[0] = orig_req->r_ops[0];
osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
ceph_osdc_build_request(req, req->r_ops[0].extent.offset,
snapc, CEPH_NOSNAP, &aio_req->mtime);
ceph_put_snap_context(snapc);
ceph_osdc_put_request(orig_req);
req->r_callback = ceph_aio_complete_req;
req->r_inode = inode;
req->r_priv = aio_req;
ret = ceph_osdc_start_request(req->r_osdc, req, false);
out:
if (ret < 0) {
BUG_ON(ret == -EOLDSNAPC);
req->r_result = ret;
ceph_aio_complete_req(req, NULL);
}
kfree(aio_work);
}
/* /*
* Write commit request unsafe callback, called to tell us when a * Write commit request unsafe callback, called to tell us when a
* request is unsafe (that is, in flight--has been handed to the * request is unsafe (that is, in flight--has been handed to the
...@@ -612,16 +767,10 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe) ...@@ -612,16 +767,10 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
} }
/*
* Synchronous write, straight from __user pointer or user pages.
*
* If write spans object boundary, just do multiple writes. (For a
* correct atomic write, we should e.g. take write locks on all
* objects, rollback on failure, etc.)
*/
static ssize_t static ssize_t
ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
struct ceph_snap_context *snapc) struct ceph_snap_context *snapc,
struct ceph_cap_flush **pcf)
{ {
struct file *file = iocb->ki_filp; struct file *file = iocb->ki_filp;
struct inode *inode = file_inode(file); struct inode *inode = file_inode(file);
...@@ -630,44 +779,52 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, ...@@ -630,44 +779,52 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
struct ceph_vino vino; struct ceph_vino vino;
struct ceph_osd_request *req; struct ceph_osd_request *req;
struct page **pages; struct page **pages;
int num_pages; struct ceph_aio_request *aio_req = NULL;
int written = 0; int num_pages = 0;
int flags; int flags;
int check_caps = 0;
int ret; int ret;
struct timespec mtime = CURRENT_TIME; struct timespec mtime = CURRENT_TIME;
size_t count = iov_iter_count(from); size_t count = iov_iter_count(iter);
loff_t pos = iocb->ki_pos;
bool write = iov_iter_rw(iter) == WRITE;
if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) if (write && ceph_snap(file_inode(file)) != CEPH_NOSNAP)
return -EROFS; return -EROFS;
dout("sync_direct_write on file %p %lld~%u\n", file, pos, dout("sync_direct_read_write (%s) on file %p %lld~%u\n",
(unsigned)count); (write ? "write" : "read"), file, pos, (unsigned)count);
ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count); ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
if (ret < 0) if (ret < 0)
return ret; return ret;
ret = invalidate_inode_pages2_range(inode->i_mapping, if (write) {
pos >> PAGE_CACHE_SHIFT, ret = invalidate_inode_pages2_range(inode->i_mapping,
(pos + count) >> PAGE_CACHE_SHIFT); pos >> PAGE_CACHE_SHIFT,
if (ret < 0) (pos + count) >> PAGE_CACHE_SHIFT);
dout("invalidate_inode_pages2_range returned %d\n", ret); if (ret < 0)
dout("invalidate_inode_pages2_range returned %d\n", ret);
flags = CEPH_OSD_FLAG_ORDERSNAP | flags = CEPH_OSD_FLAG_ORDERSNAP |
CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_ONDISK |
CEPH_OSD_FLAG_WRITE; CEPH_OSD_FLAG_WRITE;
} else {
flags = CEPH_OSD_FLAG_READ;
}
while (iov_iter_count(from) > 0) { while (iov_iter_count(iter) > 0) {
u64 len = dio_get_pagev_size(from); u64 size = dio_get_pagev_size(iter);
size_t start; size_t start = 0;
ssize_t n; ssize_t len;
vino = ceph_vino(inode); vino = ceph_vino(inode);
req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
vino, pos, &len, 0, vino, pos, &size, 0,
2,/*include a 'startsync' command*/ /*include a 'startsync' command*/
CEPH_OSD_OP_WRITE, flags, snapc, write ? 2 : 1,
write ? CEPH_OSD_OP_WRITE :
CEPH_OSD_OP_READ,
flags, snapc,
ci->i_truncate_seq, ci->i_truncate_seq,
ci->i_truncate_size, ci->i_truncate_size,
false); false);
...@@ -676,10 +833,8 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, ...@@ -676,10 +833,8 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
break; break;
} }
osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0); len = size;
pages = dio_get_pages_alloc(iter, len, &start, &num_pages);
n = len;
pages = dio_get_pages_alloc(from, len, &start, &num_pages);
if (IS_ERR(pages)) { if (IS_ERR(pages)) {
ceph_osdc_put_request(req); ceph_osdc_put_request(req);
ret = PTR_ERR(pages); ret = PTR_ERR(pages);
...@@ -687,47 +842,128 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, ...@@ -687,47 +842,128 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
} }
/* /*
* throw out any page cache pages in this range. this * To simplify error handling, allow AIO when IO within i_size
* may block. * or IO can be satisfied by single OSD request.
*/ */
truncate_inode_pages_range(inode->i_mapping, pos, if (pos == iocb->ki_pos && !is_sync_kiocb(iocb) &&
(pos+n) | (PAGE_CACHE_SIZE-1)); (len == count || pos + count <= i_size_read(inode))) {
osd_req_op_extent_osd_data_pages(req, 0, pages, n, start, aio_req = kzalloc(sizeof(*aio_req), GFP_KERNEL);
false, false); if (aio_req) {
aio_req->iocb = iocb;
aio_req->write = write;
INIT_LIST_HEAD(&aio_req->osd_reqs);
if (write) {
aio_req->mtime = mtime;
swap(aio_req->prealloc_cf, *pcf);
}
}
/* ignore error */
}
if (write) {
/*
* throw out any page cache pages in this range. this
* may block.
*/
truncate_inode_pages_range(inode->i_mapping, pos,
(pos+len) | (PAGE_CACHE_SIZE - 1));
osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
}
osd_req_op_extent_osd_data_pages(req, 0, pages, len, start,
false, false);
/* BUG_ON(vino.snap != CEPH_NOSNAP); */
ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); if (aio_req) {
aio_req->total_len += len;
aio_req->num_reqs++;
atomic_inc(&aio_req->pending_reqs);
req->r_callback = ceph_aio_complete_req;
req->r_inode = inode;
req->r_priv = aio_req;
list_add_tail(&req->r_unsafe_item, &aio_req->osd_reqs);
pos += len;
iov_iter_advance(iter, len);
continue;
}
ret = ceph_osdc_start_request(req->r_osdc, req, false);
if (!ret) if (!ret)
ret = ceph_osdc_wait_request(&fsc->client->osdc, req); ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
size = i_size_read(inode);
if (!write) {
if (ret == -ENOENT)
ret = 0;
if (ret >= 0 && ret < len && pos + ret < size) {
int zlen = min_t(size_t, len - ret,
size - pos - ret);
ceph_zero_page_vector_range(start + ret, zlen,
pages);
ret += zlen;
}
if (ret >= 0)
len = ret;
}
ceph_put_page_vector(pages, num_pages, false); ceph_put_page_vector(pages, num_pages, false);
ceph_osdc_put_request(req); ceph_osdc_put_request(req);
if (ret) if (ret < 0)
break; break;
pos += n;
written += n;
iov_iter_advance(from, n);
if (pos > i_size_read(inode)) { pos += len;
check_caps = ceph_inode_set_size(inode, pos); iov_iter_advance(iter, len);
if (check_caps)
if (!write && pos >= size)
break;
if (write && pos > size) {
if (ceph_inode_set_size(inode, pos))
ceph_check_caps(ceph_inode(inode), ceph_check_caps(ceph_inode(inode),
CHECK_CAPS_AUTHONLY, CHECK_CAPS_AUTHONLY,
NULL); NULL);
} }
} }
if (ret != -EOLDSNAPC && written > 0) { if (aio_req) {
if (aio_req->num_reqs == 0) {
kfree(aio_req);
return ret;
}
ceph_get_cap_refs(ci, write ? CEPH_CAP_FILE_WR :
CEPH_CAP_FILE_RD);
while (!list_empty(&aio_req->osd_reqs)) {
req = list_first_entry(&aio_req->osd_reqs,
struct ceph_osd_request,
r_unsafe_item);
list_del_init(&req->r_unsafe_item);
if (ret >= 0)
ret = ceph_osdc_start_request(req->r_osdc,
req, false);
if (ret < 0) {
BUG_ON(ret == -EOLDSNAPC);
req->r_result = ret;
ceph_aio_complete_req(req, NULL);
}
}
return -EIOCBQUEUED;
}
if (ret != -EOLDSNAPC && pos > iocb->ki_pos) {
ret = pos - iocb->ki_pos;
iocb->ki_pos = pos; iocb->ki_pos = pos;
ret = written;
} }
return ret; return ret;
} }
/* /*
* Synchronous write, straight from __user pointer or user pages. * Synchronous write, straight from __user pointer or user pages.
* *
...@@ -897,8 +1133,14 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to) ...@@ -897,8 +1133,14 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
ceph_cap_string(got)); ceph_cap_string(got));
if (ci->i_inline_version == CEPH_INLINE_NONE) { if (ci->i_inline_version == CEPH_INLINE_NONE) {
/* hmm, this isn't really async... */ if (!retry_op && (iocb->ki_flags & IOCB_DIRECT)) {
ret = ceph_sync_read(iocb, to, &retry_op); ret = ceph_direct_read_write(iocb, to,
NULL, NULL);
if (ret >= 0 && ret < len)
retry_op = CHECK_EOF;
} else {
ret = ceph_sync_read(iocb, to, &retry_op);
}
} else { } else {
retry_op = READ_INLINE; retry_op = READ_INLINE;
} }
...@@ -916,7 +1158,7 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to) ...@@ -916,7 +1158,7 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
pinned_page = NULL; pinned_page = NULL;
} }
ceph_put_cap_refs(ci, got); ceph_put_cap_refs(ci, got);
if (retry_op && ret >= 0) { if (retry_op > HAVE_RETRIED && ret >= 0) {
int statret; int statret;
struct page *page = NULL; struct page *page = NULL;
loff_t i_size; loff_t i_size;
...@@ -968,12 +1210,11 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to) ...@@ -968,12 +1210,11 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
if (retry_op == CHECK_EOF && iocb->ki_pos < i_size && if (retry_op == CHECK_EOF && iocb->ki_pos < i_size &&
ret < len) { ret < len) {
dout("sync_read hit hole, ppos %lld < size %lld" dout("sync_read hit hole, ppos %lld < size %lld"
", reading more\n", iocb->ki_pos, ", reading more\n", iocb->ki_pos, i_size);
inode->i_size);
read += ret; read += ret;
len -= ret; len -= ret;
retry_op = 0; retry_op = HAVE_RETRIED;
goto again; goto again;
} }
} }
...@@ -1052,7 +1293,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from) ...@@ -1052,7 +1293,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
} }
dout("aio_write %p %llx.%llx %llu~%zd getting caps. i_size %llu\n", dout("aio_write %p %llx.%llx %llu~%zd getting caps. i_size %llu\n",
inode, ceph_vinop(inode), pos, count, inode->i_size); inode, ceph_vinop(inode), pos, count, i_size_read(inode));
if (fi->fmode & CEPH_FILE_MODE_LAZY) if (fi->fmode & CEPH_FILE_MODE_LAZY)
want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO; want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
else else
...@@ -1088,8 +1329,8 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from) ...@@ -1088,8 +1329,8 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
/* we might need to revert back to that point */ /* we might need to revert back to that point */
data = *from; data = *from;
if (iocb->ki_flags & IOCB_DIRECT) if (iocb->ki_flags & IOCB_DIRECT)
written = ceph_sync_direct_write(iocb, &data, pos, written = ceph_direct_read_write(iocb, &data, snapc,
snapc); &prealloc_cf);
else else
written = ceph_sync_write(iocb, &data, pos, snapc); written = ceph_sync_write(iocb, &data, pos, snapc);
if (written == -EOLDSNAPC) { if (written == -EOLDSNAPC) {
...@@ -1104,7 +1345,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from) ...@@ -1104,7 +1345,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
iov_iter_advance(from, written); iov_iter_advance(from, written);
ceph_put_snap_context(snapc); ceph_put_snap_context(snapc);
} else { } else {
loff_t old_size = inode->i_size; loff_t old_size = i_size_read(inode);
/* /*
* No need to acquire the i_truncate_mutex. Because * No need to acquire the i_truncate_mutex. Because
* the MDS revokes Fwb caps before sending truncate * the MDS revokes Fwb caps before sending truncate
...@@ -1115,7 +1356,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from) ...@@ -1115,7 +1356,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
written = generic_perform_write(file, from, pos); written = generic_perform_write(file, from, pos);
if (likely(written >= 0)) if (likely(written >= 0))
iocb->ki_pos = pos + written; iocb->ki_pos = pos + written;
if (inode->i_size > old_size) if (i_size_read(inode) > old_size)
ceph_fscache_update_objectsize(inode); ceph_fscache_update_objectsize(inode);
inode_unlock(inode); inode_unlock(inode);
} }
...@@ -1160,6 +1401,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from) ...@@ -1160,6 +1401,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
static loff_t ceph_llseek(struct file *file, loff_t offset, int whence) static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
{ {
struct inode *inode = file->f_mapping->host; struct inode *inode = file->f_mapping->host;
loff_t i_size;
int ret; int ret;
inode_lock(inode); inode_lock(inode);
...@@ -1172,9 +1414,10 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence) ...@@ -1172,9 +1414,10 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
} }
} }
i_size = i_size_read(inode);
switch (whence) { switch (whence) {
case SEEK_END: case SEEK_END:
offset += inode->i_size; offset += i_size;
break; break;
case SEEK_CUR: case SEEK_CUR:
/* /*
...@@ -1190,17 +1433,17 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence) ...@@ -1190,17 +1433,17 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
offset += file->f_pos; offset += file->f_pos;
break; break;
case SEEK_DATA: case SEEK_DATA:
if (offset >= inode->i_size) { if (offset >= i_size) {
ret = -ENXIO; ret = -ENXIO;
goto out; goto out;
} }
break; break;
case SEEK_HOLE: case SEEK_HOLE:
if (offset >= inode->i_size) { if (offset >= i_size) {
ret = -ENXIO; ret = -ENXIO;
goto out; goto out;
} }
offset = inode->i_size; offset = i_size;
break; break;
} }
......
...@@ -548,7 +548,7 @@ int ceph_fill_file_size(struct inode *inode, int issued, ...@@ -548,7 +548,7 @@ int ceph_fill_file_size(struct inode *inode, int issued,
if (ceph_seq_cmp(truncate_seq, ci->i_truncate_seq) > 0 || if (ceph_seq_cmp(truncate_seq, ci->i_truncate_seq) > 0 ||
(truncate_seq == ci->i_truncate_seq && size > inode->i_size)) { (truncate_seq == ci->i_truncate_seq && size > inode->i_size)) {
dout("size %lld -> %llu\n", inode->i_size, size); dout("size %lld -> %llu\n", inode->i_size, size);
inode->i_size = size; i_size_write(inode, size);
inode->i_blocks = (size + (1<<9) - 1) >> 9; inode->i_blocks = (size + (1<<9) - 1) >> 9;
ci->i_reported_size = size; ci->i_reported_size = size;
if (truncate_seq != ci->i_truncate_seq) { if (truncate_seq != ci->i_truncate_seq) {
...@@ -808,7 +808,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page, ...@@ -808,7 +808,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
err = -EINVAL; err = -EINVAL;
if (WARN_ON(symlen != inode->i_size)) if (WARN_ON(symlen != i_size_read(inode)))
goto out; goto out;
err = -ENOMEM; err = -ENOMEM;
...@@ -1549,7 +1549,7 @@ int ceph_inode_set_size(struct inode *inode, loff_t size) ...@@ -1549,7 +1549,7 @@ int ceph_inode_set_size(struct inode *inode, loff_t size)
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
dout("set_size %p %llu -> %llu\n", inode, inode->i_size, size); dout("set_size %p %llu -> %llu\n", inode, inode->i_size, size);
inode->i_size = size; i_size_write(inode, size);
inode->i_blocks = (size + (1 << 9) - 1) >> 9; inode->i_blocks = (size + (1 << 9) - 1) >> 9;
/* tell the MDS if we are approaching max_size */ /* tell the MDS if we are approaching max_size */
...@@ -1911,7 +1911,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) ...@@ -1911,7 +1911,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
inode->i_size, attr->ia_size); inode->i_size, attr->ia_size);
if ((issued & CEPH_CAP_FILE_EXCL) && if ((issued & CEPH_CAP_FILE_EXCL) &&
attr->ia_size > inode->i_size) { attr->ia_size > inode->i_size) {
inode->i_size = attr->ia_size; i_size_write(inode, attr->ia_size);
inode->i_blocks = inode->i_blocks =
(attr->ia_size + (1 << 9) - 1) >> 9; (attr->ia_size + (1 << 9) - 1) >> 9;
inode->i_ctime = attr->ia_ctime; inode->i_ctime = attr->ia_ctime;
......
...@@ -40,46 +40,11 @@ static inline __u32 ceph_frag_mask_shift(__u32 f) ...@@ -40,46 +40,11 @@ static inline __u32 ceph_frag_mask_shift(__u32 f)
return 24 - ceph_frag_bits(f); return 24 - ceph_frag_bits(f);
} }
static inline int ceph_frag_contains_value(__u32 f, __u32 v) static inline bool ceph_frag_contains_value(__u32 f, __u32 v)
{ {
return (v & ceph_frag_mask(f)) == ceph_frag_value(f); return (v & ceph_frag_mask(f)) == ceph_frag_value(f);
} }
static inline int ceph_frag_contains_frag(__u32 f, __u32 sub)
{
/* is sub as specific as us, and contained by us? */
return ceph_frag_bits(sub) >= ceph_frag_bits(f) &&
(ceph_frag_value(sub) & ceph_frag_mask(f)) == ceph_frag_value(f);
}
static inline __u32 ceph_frag_parent(__u32 f)
{
return ceph_frag_make(ceph_frag_bits(f) - 1,
ceph_frag_value(f) & (ceph_frag_mask(f) << 1));
}
static inline int ceph_frag_is_left_child(__u32 f)
{
return ceph_frag_bits(f) > 0 &&
(ceph_frag_value(f) & (0x1000000 >> ceph_frag_bits(f))) == 0;
}
static inline int ceph_frag_is_right_child(__u32 f)
{
return ceph_frag_bits(f) > 0 &&
(ceph_frag_value(f) & (0x1000000 >> ceph_frag_bits(f))) == 1;
}
static inline __u32 ceph_frag_sibling(__u32 f)
{
return ceph_frag_make(ceph_frag_bits(f),
ceph_frag_value(f) ^ (0x1000000 >> ceph_frag_bits(f)));
}
static inline __u32 ceph_frag_left_child(__u32 f)
{
return ceph_frag_make(ceph_frag_bits(f)+1, ceph_frag_value(f));
}
static inline __u32 ceph_frag_right_child(__u32 f)
{
return ceph_frag_make(ceph_frag_bits(f)+1,
ceph_frag_value(f) | (0x1000000 >> (1+ceph_frag_bits(f))));
}
static inline __u32 ceph_frag_make_child(__u32 f, int by, int i) static inline __u32 ceph_frag_make_child(__u32 f, int by, int i)
{ {
int newbits = ceph_frag_bits(f) + by; int newbits = ceph_frag_bits(f) + by;
......
...@@ -220,6 +220,7 @@ struct ceph_connection { ...@@ -220,6 +220,7 @@ struct ceph_connection {
struct ceph_entity_addr actual_peer_addr; struct ceph_entity_addr actual_peer_addr;
/* message out temps */ /* message out temps */
struct ceph_msg_header out_hdr;
struct ceph_msg *out_msg; /* sending message (== tail of struct ceph_msg *out_msg; /* sending message (== tail of
out_sent) */ out_sent) */
bool out_msg_done; bool out_msg_done;
...@@ -229,7 +230,6 @@ struct ceph_connection { ...@@ -229,7 +230,6 @@ struct ceph_connection {
int out_kvec_left; /* kvec's left in out_kvec */ int out_kvec_left; /* kvec's left in out_kvec */
int out_skip; /* skip this many bytes */ int out_skip; /* skip this many bytes */
int out_kvec_bytes; /* total bytes left */ int out_kvec_bytes; /* total bytes left */
bool out_kvec_is_msg; /* kvec refers to out_msg */
int out_more; /* there is more data after the kvecs */ int out_more; /* there is more data after the kvecs */
__le64 out_temp_ack; /* for writing an ack */ __le64 out_temp_ack; /* for writing an ack */
struct ceph_timespec out_temp_keepalive2; /* for writing keepalive2 struct ceph_timespec out_temp_keepalive2; /* for writing keepalive2
......
...@@ -152,7 +152,6 @@ static int process_one_ticket(struct ceph_auth_client *ac, ...@@ -152,7 +152,6 @@ static int process_one_ticket(struct ceph_auth_client *ac,
void *ticket_buf = NULL; void *ticket_buf = NULL;
void *tp, *tpend; void *tp, *tpend;
void **ptp; void **ptp;
struct ceph_timespec new_validity;
struct ceph_crypto_key new_session_key; struct ceph_crypto_key new_session_key;
struct ceph_buffer *new_ticket_blob; struct ceph_buffer *new_ticket_blob;
unsigned long new_expires, new_renew_after; unsigned long new_expires, new_renew_after;
...@@ -193,8 +192,8 @@ static int process_one_ticket(struct ceph_auth_client *ac, ...@@ -193,8 +192,8 @@ static int process_one_ticket(struct ceph_auth_client *ac,
if (ret) if (ret)
goto out; goto out;
ceph_decode_copy(&dp, &new_validity, sizeof(new_validity)); ceph_decode_timespec(&validity, dp);
ceph_decode_timespec(&validity, &new_validity); dp += sizeof(struct ceph_timespec);
new_expires = get_seconds() + validity.tv_sec; new_expires = get_seconds() + validity.tv_sec;
new_renew_after = new_expires - (validity.tv_sec / 4); new_renew_after = new_expires - (validity.tv_sec / 4);
dout(" expires=%lu renew_after=%lu\n", new_expires, dout(" expires=%lu renew_after=%lu\n", new_expires,
...@@ -233,10 +232,10 @@ static int process_one_ticket(struct ceph_auth_client *ac, ...@@ -233,10 +232,10 @@ static int process_one_ticket(struct ceph_auth_client *ac,
ceph_buffer_put(th->ticket_blob); ceph_buffer_put(th->ticket_blob);
th->session_key = new_session_key; th->session_key = new_session_key;
th->ticket_blob = new_ticket_blob; th->ticket_blob = new_ticket_blob;
th->validity = new_validity;
th->secret_id = new_secret_id; th->secret_id = new_secret_id;
th->expires = new_expires; th->expires = new_expires;
th->renew_after = new_renew_after; th->renew_after = new_renew_after;
th->have_key = true;
dout(" got ticket service %d (%s) secret_id %lld len %d\n", dout(" got ticket service %d (%s) secret_id %lld len %d\n",
type, ceph_entity_type_name(type), th->secret_id, type, ceph_entity_type_name(type), th->secret_id,
(int)th->ticket_blob->vec.iov_len); (int)th->ticket_blob->vec.iov_len);
...@@ -384,6 +383,24 @@ static int ceph_x_encode_ticket(struct ceph_x_ticket_handler *th, ...@@ -384,6 +383,24 @@ static int ceph_x_encode_ticket(struct ceph_x_ticket_handler *th,
return -ERANGE; return -ERANGE;
} }
static bool need_key(struct ceph_x_ticket_handler *th)
{
if (!th->have_key)
return true;
return get_seconds() >= th->renew_after;
}
static bool have_key(struct ceph_x_ticket_handler *th)
{
if (th->have_key) {
if (get_seconds() >= th->expires)
th->have_key = false;
}
return th->have_key;
}
static void ceph_x_validate_tickets(struct ceph_auth_client *ac, int *pneed) static void ceph_x_validate_tickets(struct ceph_auth_client *ac, int *pneed)
{ {
int want = ac->want_keys; int want = ac->want_keys;
...@@ -402,20 +419,18 @@ static void ceph_x_validate_tickets(struct ceph_auth_client *ac, int *pneed) ...@@ -402,20 +419,18 @@ static void ceph_x_validate_tickets(struct ceph_auth_client *ac, int *pneed)
continue; continue;
th = get_ticket_handler(ac, service); th = get_ticket_handler(ac, service);
if (IS_ERR(th)) { if (IS_ERR(th)) {
*pneed |= service; *pneed |= service;
continue; continue;
} }
if (get_seconds() >= th->renew_after) if (need_key(th))
*pneed |= service; *pneed |= service;
if (get_seconds() >= th->expires) if (!have_key(th))
xi->have_keys &= ~service; xi->have_keys &= ~service;
} }
} }
static int ceph_x_build_request(struct ceph_auth_client *ac, static int ceph_x_build_request(struct ceph_auth_client *ac,
void *buf, void *end) void *buf, void *end)
{ {
...@@ -667,14 +682,26 @@ static void ceph_x_destroy(struct ceph_auth_client *ac) ...@@ -667,14 +682,26 @@ static void ceph_x_destroy(struct ceph_auth_client *ac)
ac->private = NULL; ac->private = NULL;
} }
static void ceph_x_invalidate_authorizer(struct ceph_auth_client *ac, static void invalidate_ticket(struct ceph_auth_client *ac, int peer_type)
int peer_type)
{ {
struct ceph_x_ticket_handler *th; struct ceph_x_ticket_handler *th;
th = get_ticket_handler(ac, peer_type); th = get_ticket_handler(ac, peer_type);
if (!IS_ERR(th)) if (!IS_ERR(th))
memset(&th->validity, 0, sizeof(th->validity)); th->have_key = false;
}
static void ceph_x_invalidate_authorizer(struct ceph_auth_client *ac,
int peer_type)
{
/*
* We are to invalidate a service ticket in the hopes of
* getting a new, hopefully more valid, one. But, we won't get
* it unless our AUTH ticket is good, so invalidate AUTH ticket
* as well, just in case.
*/
invalidate_ticket(ac, peer_type);
invalidate_ticket(ac, CEPH_ENTITY_TYPE_AUTH);
} }
static int calcu_signature(struct ceph_x_authorizer *au, static int calcu_signature(struct ceph_x_authorizer *au,
......
...@@ -16,7 +16,7 @@ struct ceph_x_ticket_handler { ...@@ -16,7 +16,7 @@ struct ceph_x_ticket_handler {
unsigned int service; unsigned int service;
struct ceph_crypto_key session_key; struct ceph_crypto_key session_key;
struct ceph_timespec validity; bool have_key;
u64 secret_id; u64 secret_id;
struct ceph_buffer *ticket_blob; struct ceph_buffer *ticket_blob;
......
...@@ -23,9 +23,6 @@ ...@@ -23,9 +23,6 @@
#include <linux/ceph/pagelist.h> #include <linux/ceph/pagelist.h>
#include <linux/export.h> #include <linux/export.h>
#define list_entry_next(pos, member) \
list_entry(pos->member.next, typeof(*pos), member)
/* /*
* Ceph uses the messenger to exchange ceph_msg messages with other * Ceph uses the messenger to exchange ceph_msg messages with other
* hosts in the system. The messenger provides ordered and reliable * hosts in the system. The messenger provides ordered and reliable
...@@ -672,6 +669,8 @@ static void reset_connection(struct ceph_connection *con) ...@@ -672,6 +669,8 @@ static void reset_connection(struct ceph_connection *con)
} }
con->in_seq = 0; con->in_seq = 0;
con->in_seq_acked = 0; con->in_seq_acked = 0;
con->out_skip = 0;
} }
/* /*
...@@ -771,6 +770,8 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt) ...@@ -771,6 +770,8 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
static void con_out_kvec_reset(struct ceph_connection *con) static void con_out_kvec_reset(struct ceph_connection *con)
{ {
BUG_ON(con->out_skip);
con->out_kvec_left = 0; con->out_kvec_left = 0;
con->out_kvec_bytes = 0; con->out_kvec_bytes = 0;
con->out_kvec_cur = &con->out_kvec[0]; con->out_kvec_cur = &con->out_kvec[0];
...@@ -779,9 +780,9 @@ static void con_out_kvec_reset(struct ceph_connection *con) ...@@ -779,9 +780,9 @@ static void con_out_kvec_reset(struct ceph_connection *con)
static void con_out_kvec_add(struct ceph_connection *con, static void con_out_kvec_add(struct ceph_connection *con,
size_t size, void *data) size_t size, void *data)
{ {
int index; int index = con->out_kvec_left;
index = con->out_kvec_left; BUG_ON(con->out_skip);
BUG_ON(index >= ARRAY_SIZE(con->out_kvec)); BUG_ON(index >= ARRAY_SIZE(con->out_kvec));
con->out_kvec[index].iov_len = size; con->out_kvec[index].iov_len = size;
...@@ -790,6 +791,27 @@ static void con_out_kvec_add(struct ceph_connection *con, ...@@ -790,6 +791,27 @@ static void con_out_kvec_add(struct ceph_connection *con,
con->out_kvec_bytes += size; con->out_kvec_bytes += size;
} }
/*
* Chop off a kvec from the end. Return residual number of bytes for
* that kvec, i.e. how many bytes would have been written if the kvec
* hadn't been nuked.
*/
static int con_out_kvec_skip(struct ceph_connection *con)
{
int off = con->out_kvec_cur - con->out_kvec;
int skip = 0;
if (con->out_kvec_bytes > 0) {
skip = con->out_kvec[off + con->out_kvec_left - 1].iov_len;
BUG_ON(con->out_kvec_bytes < skip);
BUG_ON(!con->out_kvec_left);
con->out_kvec_bytes -= skip;
con->out_kvec_left--;
}
return skip;
}
#ifdef CONFIG_BLOCK #ifdef CONFIG_BLOCK
/* /*
...@@ -1042,7 +1064,7 @@ static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor, ...@@ -1042,7 +1064,7 @@ static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
/* Move on to the next page */ /* Move on to the next page */
BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head)); BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
cursor->page = list_entry_next(cursor->page, lru); cursor->page = list_next_entry(cursor->page, lru);
cursor->last_piece = cursor->resid <= PAGE_SIZE; cursor->last_piece = cursor->resid <= PAGE_SIZE;
return true; return true;
...@@ -1166,7 +1188,7 @@ static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, ...@@ -1166,7 +1188,7 @@ static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
if (!cursor->resid && cursor->total_resid) { if (!cursor->resid && cursor->total_resid) {
WARN_ON(!cursor->last_piece); WARN_ON(!cursor->last_piece);
BUG_ON(list_is_last(&cursor->data->links, cursor->data_head)); BUG_ON(list_is_last(&cursor->data->links, cursor->data_head));
cursor->data = list_entry_next(cursor->data, links); cursor->data = list_next_entry(cursor->data, links);
__ceph_msg_data_cursor_init(cursor); __ceph_msg_data_cursor_init(cursor);
new_piece = true; new_piece = true;
} }
...@@ -1197,7 +1219,6 @@ static void prepare_write_message_footer(struct ceph_connection *con) ...@@ -1197,7 +1219,6 @@ static void prepare_write_message_footer(struct ceph_connection *con)
m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE; m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
dout("prepare_write_message_footer %p\n", con); dout("prepare_write_message_footer %p\n", con);
con->out_kvec_is_msg = true;
con->out_kvec[v].iov_base = &m->footer; con->out_kvec[v].iov_base = &m->footer;
if (con->peer_features & CEPH_FEATURE_MSG_AUTH) { if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
if (con->ops->sign_message) if (con->ops->sign_message)
...@@ -1225,7 +1246,6 @@ static void prepare_write_message(struct ceph_connection *con) ...@@ -1225,7 +1246,6 @@ static void prepare_write_message(struct ceph_connection *con)
u32 crc; u32 crc;
con_out_kvec_reset(con); con_out_kvec_reset(con);
con->out_kvec_is_msg = true;
con->out_msg_done = false; con->out_msg_done = false;
/* Sneak an ack in there first? If we can get it into the same /* Sneak an ack in there first? If we can get it into the same
...@@ -1265,18 +1285,19 @@ static void prepare_write_message(struct ceph_connection *con) ...@@ -1265,18 +1285,19 @@ static void prepare_write_message(struct ceph_connection *con)
/* tag + hdr + front + middle */ /* tag + hdr + front + middle */
con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
con_out_kvec_add(con, sizeof (m->hdr), &m->hdr); con_out_kvec_add(con, sizeof(con->out_hdr), &con->out_hdr);
con_out_kvec_add(con, m->front.iov_len, m->front.iov_base); con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
if (m->middle) if (m->middle)
con_out_kvec_add(con, m->middle->vec.iov_len, con_out_kvec_add(con, m->middle->vec.iov_len,
m->middle->vec.iov_base); m->middle->vec.iov_base);
/* fill in crc (except data pages), footer */ /* fill in hdr crc and finalize hdr */
crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc)); crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
con->out_msg->hdr.crc = cpu_to_le32(crc); con->out_msg->hdr.crc = cpu_to_le32(crc);
con->out_msg->footer.flags = 0; memcpy(&con->out_hdr, &con->out_msg->hdr, sizeof(con->out_hdr));
/* fill in front and middle crc, footer */
crc = crc32c(0, m->front.iov_base, m->front.iov_len); crc = crc32c(0, m->front.iov_base, m->front.iov_len);
con->out_msg->footer.front_crc = cpu_to_le32(crc); con->out_msg->footer.front_crc = cpu_to_le32(crc);
if (m->middle) { if (m->middle) {
...@@ -1288,6 +1309,7 @@ static void prepare_write_message(struct ceph_connection *con) ...@@ -1288,6 +1309,7 @@ static void prepare_write_message(struct ceph_connection *con)
dout("%s front_crc %u middle_crc %u\n", __func__, dout("%s front_crc %u middle_crc %u\n", __func__,
le32_to_cpu(con->out_msg->footer.front_crc), le32_to_cpu(con->out_msg->footer.front_crc),
le32_to_cpu(con->out_msg->footer.middle_crc)); le32_to_cpu(con->out_msg->footer.middle_crc));
con->out_msg->footer.flags = 0;
/* is there a data payload? */ /* is there a data payload? */
con->out_msg->footer.data_crc = 0; con->out_msg->footer.data_crc = 0;
...@@ -1492,7 +1514,6 @@ static int write_partial_kvec(struct ceph_connection *con) ...@@ -1492,7 +1514,6 @@ static int write_partial_kvec(struct ceph_connection *con)
} }
} }
con->out_kvec_left = 0; con->out_kvec_left = 0;
con->out_kvec_is_msg = false;
ret = 1; ret = 1;
out: out:
dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con, dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
...@@ -1584,6 +1605,7 @@ static int write_partial_skip(struct ceph_connection *con) ...@@ -1584,6 +1605,7 @@ static int write_partial_skip(struct ceph_connection *con)
{ {
int ret; int ret;
dout("%s %p %d left\n", __func__, con, con->out_skip);
while (con->out_skip > 0) { while (con->out_skip > 0) {
size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE); size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE);
...@@ -2506,13 +2528,13 @@ static int try_write(struct ceph_connection *con) ...@@ -2506,13 +2528,13 @@ static int try_write(struct ceph_connection *con)
more_kvec: more_kvec:
/* kvec data queued? */ /* kvec data queued? */
if (con->out_skip) { if (con->out_kvec_left) {
ret = write_partial_skip(con); ret = write_partial_kvec(con);
if (ret <= 0) if (ret <= 0)
goto out; goto out;
} }
if (con->out_kvec_left) { if (con->out_skip) {
ret = write_partial_kvec(con); ret = write_partial_skip(con);
if (ret <= 0) if (ret <= 0)
goto out; goto out;
} }
...@@ -2805,13 +2827,17 @@ static bool con_backoff(struct ceph_connection *con) ...@@ -2805,13 +2827,17 @@ static bool con_backoff(struct ceph_connection *con)
static void con_fault_finish(struct ceph_connection *con) static void con_fault_finish(struct ceph_connection *con)
{ {
dout("%s %p\n", __func__, con);
/* /*
* in case we faulted due to authentication, invalidate our * in case we faulted due to authentication, invalidate our
* current tickets so that we can get new ones. * current tickets so that we can get new ones.
*/ */
if (con->auth_retry && con->ops->invalidate_authorizer) { if (con->auth_retry) {
dout("calling invalidate_authorizer()\n"); dout("auth_retry %d, invalidating\n", con->auth_retry);
con->ops->invalidate_authorizer(con); if (con->ops->invalidate_authorizer)
con->ops->invalidate_authorizer(con);
con->auth_retry = 0;
} }
if (con->ops->fault) if (con->ops->fault)
...@@ -3050,16 +3076,31 @@ void ceph_msg_revoke(struct ceph_msg *msg) ...@@ -3050,16 +3076,31 @@ void ceph_msg_revoke(struct ceph_msg *msg)
ceph_msg_put(msg); ceph_msg_put(msg);
} }
if (con->out_msg == msg) { if (con->out_msg == msg) {
dout("%s %p msg %p - was sending\n", __func__, con, msg); BUG_ON(con->out_skip);
con->out_msg = NULL; /* footer */
if (con->out_kvec_is_msg) { if (con->out_msg_done) {
con->out_skip = con->out_kvec_bytes; con->out_skip += con_out_kvec_skip(con);
con->out_kvec_is_msg = false; } else {
BUG_ON(!msg->data_length);
if (con->peer_features & CEPH_FEATURE_MSG_AUTH)
con->out_skip += sizeof(msg->footer);
else
con->out_skip += sizeof(msg->old_footer);
} }
/* data, middle, front */
if (msg->data_length)
con->out_skip += msg->cursor.total_resid;
if (msg->middle)
con->out_skip += con_out_kvec_skip(con);
con->out_skip += con_out_kvec_skip(con);
dout("%s %p msg %p - was sending, will write %d skip %d\n",
__func__, con, msg, con->out_kvec_bytes, con->out_skip);
msg->hdr.seq = 0; msg->hdr.seq = 0;
con->out_msg = NULL;
ceph_msg_put(msg); ceph_msg_put(msg);
} }
mutex_unlock(&con->mutex); mutex_unlock(&con->mutex);
} }
...@@ -3361,9 +3402,7 @@ static void ceph_msg_free(struct ceph_msg *m) ...@@ -3361,9 +3402,7 @@ static void ceph_msg_free(struct ceph_msg *m)
static void ceph_msg_release(struct kref *kref) static void ceph_msg_release(struct kref *kref)
{ {
struct ceph_msg *m = container_of(kref, struct ceph_msg, kref); struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
LIST_HEAD(data); struct ceph_msg_data *data, *next;
struct list_head *links;
struct list_head *next;
dout("%s %p\n", __func__, m); dout("%s %p\n", __func__, m);
WARN_ON(!list_empty(&m->list_head)); WARN_ON(!list_empty(&m->list_head));
...@@ -3376,12 +3415,8 @@ static void ceph_msg_release(struct kref *kref) ...@@ -3376,12 +3415,8 @@ static void ceph_msg_release(struct kref *kref)
m->middle = NULL; m->middle = NULL;
} }
list_splice_init(&m->data, &data); list_for_each_entry_safe(data, next, &m->data, links) {
list_for_each_safe(links, next, &data) { list_del_init(&data->links);
struct ceph_msg_data *data;
data = list_entry(links, struct ceph_msg_data, links);
list_del_init(links);
ceph_msg_data_destroy(data); ceph_msg_data_destroy(data);
} }
m->data_length = 0; m->data_length = 0;
......
...@@ -364,10 +364,6 @@ static bool have_debugfs_info(struct ceph_mon_client *monc) ...@@ -364,10 +364,6 @@ static bool have_debugfs_info(struct ceph_mon_client *monc)
return monc->client->have_fsid && monc->auth->global_id > 0; return monc->client->have_fsid && monc->auth->global_id > 0;
} }
/*
* The monitor responds with mount ack indicate mount success. The
* included client ticket allows the client to talk to MDSs and OSDs.
*/
static void ceph_monc_handle_map(struct ceph_mon_client *monc, static void ceph_monc_handle_map(struct ceph_mon_client *monc,
struct ceph_msg *msg) struct ceph_msg *msg)
{ {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment