Commit 9e95dae7 authored by Linus Torvalds

Merge tag 'ceph-for-4.16-rc1' of git://github.com/ceph/ceph-client

Pull ceph updates from Ilya Dryomov:
 "Things have been very quiet on the rbd side, as work continues on the
  big ticket items slated for the next merge window.

  On the CephFS side we have a large number of cap handling
  improvements, a fix for our long-standing abuse of ->journal_info in
  ceph_readpages() and yet another dentry pointer management patch"

* tag 'ceph-for-4.16-rc1' of git://github.com/ceph/ceph-client:
  ceph: improving efficiency of syncfs
  libceph: check kstrndup() return value
  ceph: try to allocate enough memory for reserved caps
  ceph: fix race of queuing delayed caps
  ceph: delete unreachable code in ceph_check_caps()
  ceph: limit rate of cap import/export error messages
  ceph: fix incorrect snaprealm when adding caps
  ceph: fix un-balanced fsc->writeback_count update
  ceph: track read contexts in ceph_file_info
  ceph: avoid dereferencing invalid pointer during cached readdir
  ceph: use atomic_t for ceph_inode_info::i_shared_gen
  ceph: cleanup traceless reply handling for rename
  ceph: voluntarily drop Fx cap for readdir request
  ceph: properly drop caps for setattr request
  ceph: voluntarily drop Lx cap for link/rename requests
  ceph: voluntarily drop Ax cap for requests that create new inode
  rbd: whitelist RBD_FEATURE_OPERATIONS feature bit
  rbd: don't NULL out ->obj_request in rbd_img_obj_parent_read_full()
  rbd: use kmem_cache_zalloc() in rbd_img_request_create()
  rbd: obj_request->completion is unused
parents a8c6db00 16515a6d
...@@ -124,11 +124,13 @@ static int atomic_dec_return_safe(atomic_t *v) ...@@ -124,11 +124,13 @@ static int atomic_dec_return_safe(atomic_t *v)
#define RBD_FEATURE_STRIPINGV2 (1ULL<<1) #define RBD_FEATURE_STRIPINGV2 (1ULL<<1)
#define RBD_FEATURE_EXCLUSIVE_LOCK (1ULL<<2) #define RBD_FEATURE_EXCLUSIVE_LOCK (1ULL<<2)
#define RBD_FEATURE_DATA_POOL (1ULL<<7) #define RBD_FEATURE_DATA_POOL (1ULL<<7)
#define RBD_FEATURE_OPERATIONS (1ULL<<8)
#define RBD_FEATURES_ALL (RBD_FEATURE_LAYERING | \ #define RBD_FEATURES_ALL (RBD_FEATURE_LAYERING | \
RBD_FEATURE_STRIPINGV2 | \ RBD_FEATURE_STRIPINGV2 | \
RBD_FEATURE_EXCLUSIVE_LOCK | \ RBD_FEATURE_EXCLUSIVE_LOCK | \
RBD_FEATURE_DATA_POOL) RBD_FEATURE_DATA_POOL | \
RBD_FEATURE_OPERATIONS)
/* Features supported by this (client software) implementation. */ /* Features supported by this (client software) implementation. */
...@@ -281,7 +283,6 @@ struct rbd_obj_request { ...@@ -281,7 +283,6 @@ struct rbd_obj_request {
int result; int result;
rbd_obj_callback_t callback; rbd_obj_callback_t callback;
struct completion completion;
struct kref kref; struct kref kref;
}; };
...@@ -1734,10 +1735,7 @@ static void rbd_obj_request_complete(struct rbd_obj_request *obj_request) ...@@ -1734,10 +1735,7 @@ static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{ {
dout("%s: obj %p cb %p\n", __func__, obj_request, dout("%s: obj %p cb %p\n", __func__, obj_request,
obj_request->callback); obj_request->callback);
if (obj_request->callback) obj_request->callback(obj_request);
obj_request->callback(obj_request);
else
complete_all(&obj_request->completion);
} }
static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err) static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err)
...@@ -2013,7 +2011,6 @@ rbd_obj_request_create(enum obj_request_type type) ...@@ -2013,7 +2011,6 @@ rbd_obj_request_create(enum obj_request_type type)
obj_request->which = BAD_WHICH; obj_request->which = BAD_WHICH;
obj_request->type = type; obj_request->type = type;
INIT_LIST_HEAD(&obj_request->links); INIT_LIST_HEAD(&obj_request->links);
init_completion(&obj_request->completion);
kref_init(&obj_request->kref); kref_init(&obj_request->kref);
dout("%s %p\n", __func__, obj_request); dout("%s %p\n", __func__, obj_request);
...@@ -2129,15 +2126,13 @@ static struct rbd_img_request *rbd_img_request_create( ...@@ -2129,15 +2126,13 @@ static struct rbd_img_request *rbd_img_request_create(
{ {
struct rbd_img_request *img_request; struct rbd_img_request *img_request;
img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO); img_request = kmem_cache_zalloc(rbd_img_request_cache, GFP_NOIO);
if (!img_request) if (!img_request)
return NULL; return NULL;
img_request->rq = NULL;
img_request->rbd_dev = rbd_dev; img_request->rbd_dev = rbd_dev;
img_request->offset = offset; img_request->offset = offset;
img_request->length = length; img_request->length = length;
img_request->flags = 0;
if (op_type == OBJ_OP_DISCARD) { if (op_type == OBJ_OP_DISCARD) {
img_request_discard_set(img_request); img_request_discard_set(img_request);
img_request->snapc = snapc; img_request->snapc = snapc;
...@@ -2149,11 +2144,8 @@ static struct rbd_img_request *rbd_img_request_create( ...@@ -2149,11 +2144,8 @@ static struct rbd_img_request *rbd_img_request_create(
} }
if (rbd_dev_parent_get(rbd_dev)) if (rbd_dev_parent_get(rbd_dev))
img_request_layered_set(img_request); img_request_layered_set(img_request);
spin_lock_init(&img_request->completion_lock); spin_lock_init(&img_request->completion_lock);
img_request->next_completion = 0;
img_request->callback = NULL;
img_request->result = 0;
img_request->obj_request_count = 0;
INIT_LIST_HEAD(&img_request->obj_requests); INIT_LIST_HEAD(&img_request->obj_requests);
kref_init(&img_request->kref); kref_init(&img_request->kref);
...@@ -2692,8 +2684,6 @@ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request) ...@@ -2692,8 +2684,6 @@ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
parent_request->copyup_pages = NULL; parent_request->copyup_pages = NULL;
parent_request->copyup_page_count = 0; parent_request->copyup_page_count = 0;
parent_request->obj_request = NULL;
rbd_obj_request_put(obj_request);
out_err: out_err:
if (pages) if (pages)
ceph_release_page_vector(pages, page_count); ceph_release_page_vector(pages, page_count);
......
...@@ -299,7 +299,8 @@ static void finish_read(struct ceph_osd_request *req) ...@@ -299,7 +299,8 @@ static void finish_read(struct ceph_osd_request *req)
* start an async read(ahead) operation. return nr_pages we submitted * start an async read(ahead) operation. return nr_pages we submitted
* a read for on success, or negative error code. * a read for on success, or negative error code.
*/ */
static int start_read(struct inode *inode, struct list_head *page_list, int max) static int start_read(struct inode *inode, struct ceph_rw_context *rw_ctx,
struct list_head *page_list, int max)
{ {
struct ceph_osd_client *osdc = struct ceph_osd_client *osdc =
&ceph_inode_to_client(inode)->client->osdc; &ceph_inode_to_client(inode)->client->osdc;
...@@ -316,7 +317,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) ...@@ -316,7 +317,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
int got = 0; int got = 0;
int ret = 0; int ret = 0;
if (!current->journal_info) { if (!rw_ctx) {
/* caller of readpages does not hold buffer and read caps /* caller of readpages does not hold buffer and read caps
* (fadvise, madvise and readahead cases) */ * (fadvise, madvise and readahead cases) */
int want = CEPH_CAP_FILE_CACHE; int want = CEPH_CAP_FILE_CACHE;
...@@ -437,6 +438,8 @@ static int ceph_readpages(struct file *file, struct address_space *mapping, ...@@ -437,6 +438,8 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
{ {
struct inode *inode = file_inode(file); struct inode *inode = file_inode(file);
struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_file_info *ci = file->private_data;
struct ceph_rw_context *rw_ctx;
int rc = 0; int rc = 0;
int max = 0; int max = 0;
...@@ -449,11 +452,12 @@ static int ceph_readpages(struct file *file, struct address_space *mapping, ...@@ -449,11 +452,12 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
if (rc == 0) if (rc == 0)
goto out; goto out;
rw_ctx = ceph_find_rw_context(ci);
max = fsc->mount_options->rsize >> PAGE_SHIFT; max = fsc->mount_options->rsize >> PAGE_SHIFT;
dout("readpages %p file %p nr_pages %d max %d\n", dout("readpages %p file %p ctx %p nr_pages %d max %d\n",
inode, file, nr_pages, max); inode, file, rw_ctx, nr_pages, max);
while (!list_empty(page_list)) { while (!list_empty(page_list)) {
rc = start_read(inode, page_list, max); rc = start_read(inode, rw_ctx, page_list, max);
if (rc < 0) if (rc < 0)
goto out; goto out;
} }
...@@ -574,7 +578,6 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) ...@@ -574,7 +578,6 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
struct ceph_fs_client *fsc; struct ceph_fs_client *fsc;
struct ceph_snap_context *snapc, *oldest; struct ceph_snap_context *snapc, *oldest;
loff_t page_off = page_offset(page); loff_t page_off = page_offset(page);
long writeback_stat;
int err, len = PAGE_SIZE; int err, len = PAGE_SIZE;
struct ceph_writeback_ctl ceph_wbc; struct ceph_writeback_ctl ceph_wbc;
...@@ -615,8 +618,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) ...@@ -615,8 +618,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
dout("writepage %p page %p index %lu on %llu~%u snapc %p seq %lld\n", dout("writepage %p page %p index %lu on %llu~%u snapc %p seq %lld\n",
inode, page, page->index, page_off, len, snapc, snapc->seq); inode, page, page->index, page_off, len, snapc, snapc->seq);
writeback_stat = atomic_long_inc_return(&fsc->writeback_count); if (atomic_long_inc_return(&fsc->writeback_count) >
if (writeback_stat >
CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb)) CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb))
set_bdi_congested(inode_to_bdi(inode), BLK_RW_ASYNC); set_bdi_congested(inode_to_bdi(inode), BLK_RW_ASYNC);
...@@ -651,6 +653,11 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) ...@@ -651,6 +653,11 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
end_page_writeback(page); end_page_writeback(page);
ceph_put_wrbuffer_cap_refs(ci, 1, snapc); ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
ceph_put_snap_context(snapc); /* page's reference */ ceph_put_snap_context(snapc); /* page's reference */
if (atomic_long_dec_return(&fsc->writeback_count) <
CONGESTION_OFF_THRESH(fsc->mount_options->congestion_kb))
clear_bdi_congested(inode_to_bdi(inode), BLK_RW_ASYNC);
return err; return err;
} }
...@@ -1450,9 +1457,10 @@ static int ceph_filemap_fault(struct vm_fault *vmf) ...@@ -1450,9 +1457,10 @@ static int ceph_filemap_fault(struct vm_fault *vmf)
if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) || if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
ci->i_inline_version == CEPH_INLINE_NONE) { ci->i_inline_version == CEPH_INLINE_NONE) {
current->journal_info = vma->vm_file; CEPH_DEFINE_RW_CONTEXT(rw_ctx, got);
ceph_add_rw_context(fi, &rw_ctx);
ret = filemap_fault(vmf); ret = filemap_fault(vmf);
current->journal_info = NULL; ceph_del_rw_context(fi, &rw_ctx);
} else } else
ret = -EAGAIN; ret = -EAGAIN;
......
...@@ -154,13 +154,19 @@ void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta) ...@@ -154,13 +154,19 @@ void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta)
spin_unlock(&mdsc->caps_list_lock); spin_unlock(&mdsc->caps_list_lock);
} }
void ceph_reserve_caps(struct ceph_mds_client *mdsc, /*
* Called under mdsc->mutex.
*/
int ceph_reserve_caps(struct ceph_mds_client *mdsc,
struct ceph_cap_reservation *ctx, int need) struct ceph_cap_reservation *ctx, int need)
{ {
int i; int i, j;
struct ceph_cap *cap; struct ceph_cap *cap;
int have; int have;
int alloc = 0; int alloc = 0;
int max_caps;
bool trimmed = false;
struct ceph_mds_session *s;
LIST_HEAD(newcaps); LIST_HEAD(newcaps);
dout("reserve caps ctx=%p need=%d\n", ctx, need); dout("reserve caps ctx=%p need=%d\n", ctx, need);
...@@ -179,16 +185,37 @@ void ceph_reserve_caps(struct ceph_mds_client *mdsc, ...@@ -179,16 +185,37 @@ void ceph_reserve_caps(struct ceph_mds_client *mdsc,
spin_unlock(&mdsc->caps_list_lock); spin_unlock(&mdsc->caps_list_lock);
for (i = have; i < need; i++) { for (i = have; i < need; i++) {
retry:
cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS); cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
if (!cap) if (!cap) {
break; if (!trimmed) {
for (j = 0; j < mdsc->max_sessions; j++) {
s = __ceph_lookup_mds_session(mdsc, j);
if (!s)
continue;
mutex_unlock(&mdsc->mutex);
mutex_lock(&s->s_mutex);
max_caps = s->s_nr_caps - (need - i);
ceph_trim_caps(mdsc, s, max_caps);
mutex_unlock(&s->s_mutex);
ceph_put_mds_session(s);
mutex_lock(&mdsc->mutex);
}
trimmed = true;
goto retry;
} else {
pr_warn("reserve caps ctx=%p ENOMEM "
"need=%d got=%d\n",
ctx, need, have + alloc);
goto out_nomem;
}
}
list_add(&cap->caps_item, &newcaps); list_add(&cap->caps_item, &newcaps);
alloc++; alloc++;
} }
/* we didn't manage to reserve as much as we needed */ BUG_ON(have + alloc != need);
if (have + alloc != need)
pr_warn("reserve caps ctx=%p ENOMEM need=%d got=%d\n",
ctx, need, have + alloc);
spin_lock(&mdsc->caps_list_lock); spin_lock(&mdsc->caps_list_lock);
mdsc->caps_total_count += alloc; mdsc->caps_total_count += alloc;
...@@ -204,6 +231,24 @@ void ceph_reserve_caps(struct ceph_mds_client *mdsc, ...@@ -204,6 +231,24 @@ void ceph_reserve_caps(struct ceph_mds_client *mdsc,
dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n", dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n",
ctx, mdsc->caps_total_count, mdsc->caps_use_count, ctx, mdsc->caps_total_count, mdsc->caps_use_count,
mdsc->caps_reserve_count, mdsc->caps_avail_count); mdsc->caps_reserve_count, mdsc->caps_avail_count);
return 0;
out_nomem:
while (!list_empty(&newcaps)) {
cap = list_first_entry(&newcaps,
struct ceph_cap, caps_item);
list_del(&cap->caps_item);
kmem_cache_free(ceph_cap_cachep, cap);
}
spin_lock(&mdsc->caps_list_lock);
mdsc->caps_avail_count += have;
mdsc->caps_reserve_count -= have;
BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
mdsc->caps_reserve_count +
mdsc->caps_avail_count);
spin_unlock(&mdsc->caps_list_lock);
return -ENOMEM;
} }
int ceph_unreserve_caps(struct ceph_mds_client *mdsc, int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
...@@ -498,7 +543,7 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap, ...@@ -498,7 +543,7 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
*/ */
if ((issued & CEPH_CAP_FILE_SHARED) != (had & CEPH_CAP_FILE_SHARED)) { if ((issued & CEPH_CAP_FILE_SHARED) != (had & CEPH_CAP_FILE_SHARED)) {
if (issued & CEPH_CAP_FILE_SHARED) if (issued & CEPH_CAP_FILE_SHARED)
ci->i_shared_gen++; atomic_inc(&ci->i_shared_gen);
if (S_ISDIR(ci->vfs_inode.i_mode)) { if (S_ISDIR(ci->vfs_inode.i_mode)) {
dout(" marking %p NOT complete\n", &ci->vfs_inode); dout(" marking %p NOT complete\n", &ci->vfs_inode);
__ceph_dir_clear_complete(ci); __ceph_dir_clear_complete(ci);
...@@ -577,18 +622,30 @@ void ceph_add_cap(struct inode *inode, ...@@ -577,18 +622,30 @@ void ceph_add_cap(struct inode *inode,
} }
} }
if (!ci->i_snap_realm) { if (!ci->i_snap_realm ||
((flags & CEPH_CAP_FLAG_AUTH) &&
realmino != (u64)-1 && ci->i_snap_realm->ino != realmino)) {
/* /*
* add this inode to the appropriate snap realm * add this inode to the appropriate snap realm
*/ */
struct ceph_snap_realm *realm = ceph_lookup_snap_realm(mdsc, struct ceph_snap_realm *realm = ceph_lookup_snap_realm(mdsc,
realmino); realmino);
if (realm) { if (realm) {
struct ceph_snap_realm *oldrealm = ci->i_snap_realm;
if (oldrealm) {
spin_lock(&oldrealm->inodes_with_caps_lock);
list_del_init(&ci->i_snap_realm_item);
spin_unlock(&oldrealm->inodes_with_caps_lock);
}
spin_lock(&realm->inodes_with_caps_lock); spin_lock(&realm->inodes_with_caps_lock);
ci->i_snap_realm = realm; ci->i_snap_realm = realm;
list_add(&ci->i_snap_realm_item, list_add(&ci->i_snap_realm_item,
&realm->inodes_with_caps); &realm->inodes_with_caps);
spin_unlock(&realm->inodes_with_caps_lock); spin_unlock(&realm->inodes_with_caps_lock);
if (oldrealm)
ceph_put_snap_realm(mdsc, oldrealm);
} else { } else {
pr_err("ceph_add_cap: couldn't find snap realm %llx\n", pr_err("ceph_add_cap: couldn't find snap realm %llx\n",
realmino); realmino);
...@@ -890,6 +947,11 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check) ...@@ -890,6 +947,11 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check)
/* /*
* called under i_ceph_lock * called under i_ceph_lock
*/ */
/*
 * Report whether the lowest and highest nodes of ci->i_caps are the
 * same node, i.e. the cap tree does not hold two distinct entries.
 *
 * NOTE(review): an empty tree presumably also compares equal here
 * (both lookups yielding no node) -- confirm callers establish that
 * a cap exists before relying on "exactly one".
 */
static int __ceph_is_single_caps(struct ceph_inode_info *ci)
{
	struct rb_node *lowest = rb_first(&ci->i_caps);
	struct rb_node *highest = rb_last(&ci->i_caps);

	return lowest == highest;
}
static int __ceph_is_any_caps(struct ceph_inode_info *ci) static int __ceph_is_any_caps(struct ceph_inode_info *ci)
{ {
return !RB_EMPTY_ROOT(&ci->i_caps); return !RB_EMPTY_ROOT(&ci->i_caps);
...@@ -1703,21 +1765,24 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, ...@@ -1703,21 +1765,24 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
int mds = -1; /* keep track of how far we've gone through i_caps list int mds = -1; /* keep track of how far we've gone through i_caps list
to avoid an infinite loop on retry */ to avoid an infinite loop on retry */
struct rb_node *p; struct rb_node *p;
int delayed = 0, sent = 0, num; int delayed = 0, sent = 0;
bool is_delayed = flags & CHECK_CAPS_NODELAY; bool no_delay = flags & CHECK_CAPS_NODELAY;
bool queue_invalidate = false; bool queue_invalidate = false;
bool force_requeue = false;
bool tried_invalidate = false; bool tried_invalidate = false;
/* if we are unmounting, flush any unused caps immediately. */ /* if we are unmounting, flush any unused caps immediately. */
if (mdsc->stopping) if (mdsc->stopping)
is_delayed = true; no_delay = true;
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
if (ci->i_ceph_flags & CEPH_I_FLUSH) if (ci->i_ceph_flags & CEPH_I_FLUSH)
flags |= CHECK_CAPS_FLUSH; flags |= CHECK_CAPS_FLUSH;
if (!(flags & CHECK_CAPS_AUTHONLY) ||
(ci->i_auth_cap && __ceph_is_single_caps(ci)))
__cap_delay_cancel(mdsc, ci);
goto retry_locked; goto retry_locked;
retry: retry:
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
...@@ -1772,7 +1837,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, ...@@ -1772,7 +1837,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
* have cached pages, but don't want them, then try to invalidate. * have cached pages, but don't want them, then try to invalidate.
* If we fail, it's because pages are locked.... try again later. * If we fail, it's because pages are locked.... try again later.
*/ */
if ((!is_delayed || mdsc->stopping) && if ((!no_delay || mdsc->stopping) &&
!S_ISDIR(inode->i_mode) && /* ignore readdir cache */ !S_ISDIR(inode->i_mode) && /* ignore readdir cache */
!(ci->i_wb_ref || ci->i_wrbuffer_ref) && /* no dirty pages... */ !(ci->i_wb_ref || ci->i_wrbuffer_ref) && /* no dirty pages... */
inode->i_data.nrpages && /* have cached pages */ inode->i_data.nrpages && /* have cached pages */
...@@ -1781,27 +1846,16 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, ...@@ -1781,27 +1846,16 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
!tried_invalidate) { !tried_invalidate) {
dout("check_caps trying to invalidate on %p\n", inode); dout("check_caps trying to invalidate on %p\n", inode);
if (try_nonblocking_invalidate(inode) < 0) { if (try_nonblocking_invalidate(inode) < 0) {
if (revoking & (CEPH_CAP_FILE_CACHE| dout("check_caps queuing invalidate\n");
CEPH_CAP_FILE_LAZYIO)) { queue_invalidate = true;
dout("check_caps queuing invalidate\n"); ci->i_rdcache_revoking = ci->i_rdcache_gen;
queue_invalidate = true;
ci->i_rdcache_revoking = ci->i_rdcache_gen;
} else {
dout("check_caps failed to invalidate pages\n");
/* we failed to invalidate pages. check these
caps again later. */
force_requeue = true;
__cap_set_timeouts(mdsc, ci);
}
} }
tried_invalidate = true; tried_invalidate = true;
goto retry_locked; goto retry_locked;
} }
num = 0;
for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
cap = rb_entry(p, struct ceph_cap, ci_node); cap = rb_entry(p, struct ceph_cap, ci_node);
num++;
/* avoid looping forever */ /* avoid looping forever */
if (mds >= cap->mds || if (mds >= cap->mds ||
...@@ -1864,7 +1918,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, ...@@ -1864,7 +1918,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
cap->mds_wanted == want) cap->mds_wanted == want)
continue; /* nope, all good */ continue; /* nope, all good */
if (is_delayed) if (no_delay)
goto ack; goto ack;
/* delay? */ /* delay? */
...@@ -1955,15 +2009,8 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, ...@@ -1955,15 +2009,8 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
goto retry; /* retake i_ceph_lock and restart our cap scan. */ goto retry; /* retake i_ceph_lock and restart our cap scan. */
} }
/* /* Reschedule delayed caps release if we delayed anything */
* Reschedule delayed caps release if we delayed anything, if (delayed)
* otherwise cancel.
*/
if (delayed && is_delayed)
force_requeue = true; /* __send_cap delayed release; requeue */
if (!delayed && !is_delayed)
__cap_delay_cancel(mdsc, ci);
else if (!is_delayed || force_requeue)
__cap_delay_requeue(mdsc, ci); __cap_delay_requeue(mdsc, ci);
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
...@@ -2160,7 +2207,7 @@ int ceph_write_inode(struct inode *inode, struct writeback_control *wbc) ...@@ -2160,7 +2207,7 @@ int ceph_write_inode(struct inode *inode, struct writeback_control *wbc)
u64 flush_tid; u64 flush_tid;
int err = 0; int err = 0;
int dirty; int dirty;
int wait = wbc->sync_mode == WB_SYNC_ALL; int wait = (wbc->sync_mode == WB_SYNC_ALL && !wbc->for_sync);
dout("write_inode %p wait=%d\n", inode, wait); dout("write_inode %p wait=%d\n", inode, wait);
if (wait) { if (wait) {
...@@ -3426,7 +3473,14 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, ...@@ -3426,7 +3473,14 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
*/ */
issued = cap->issued; issued = cap->issued;
WARN_ON(issued != cap->implemented); if (issued != cap->implemented)
pr_err_ratelimited("handle_cap_export: issued != implemented: "
"ino (%llx.%llx) mds%d seq %d mseq %d "
"issued %s implemented %s\n",
ceph_vinop(inode), mds, cap->seq, cap->mseq,
ceph_cap_string(issued),
ceph_cap_string(cap->implemented));
tcap = __get_cap_for_mds(ci, target); tcap = __get_cap_for_mds(ci, target);
if (tcap) { if (tcap) {
...@@ -3572,12 +3626,13 @@ static void handle_cap_import(struct ceph_mds_client *mdsc, ...@@ -3572,12 +3626,13 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
if ((ph->flags & CEPH_CAP_FLAG_AUTH) && if ((ph->flags & CEPH_CAP_FLAG_AUTH) &&
(ocap->seq != le32_to_cpu(ph->seq) || (ocap->seq != le32_to_cpu(ph->seq) ||
ocap->mseq != le32_to_cpu(ph->mseq))) { ocap->mseq != le32_to_cpu(ph->mseq))) {
pr_err("handle_cap_import: mismatched seq/mseq: " pr_err_ratelimited("handle_cap_import: "
"ino (%llx.%llx) mds%d seq %d mseq %d " "mismatched seq/mseq: ino (%llx.%llx) "
"importer mds%d has peer seq %d mseq %d\n", "mds%d seq %d mseq %d importer mds%d "
ceph_vinop(inode), peer, ocap->seq, "has peer seq %d mseq %d\n",
ocap->mseq, mds, le32_to_cpu(ph->seq), ceph_vinop(inode), peer, ocap->seq,
le32_to_cpu(ph->mseq)); ocap->mseq, mds, le32_to_cpu(ph->seq),
le32_to_cpu(ph->mseq));
} }
__ceph_remove_cap(ocap, (ph->flags & CEPH_CAP_FLAG_RELEASE)); __ceph_remove_cap(ocap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
} }
...@@ -3939,11 +3994,20 @@ int ceph_encode_inode_release(void **p, struct inode *inode, ...@@ -3939,11 +3994,20 @@ int ceph_encode_inode_release(void **p, struct inode *inode,
cap = __get_cap_for_mds(ci, mds); cap = __get_cap_for_mds(ci, mds);
if (cap && __cap_is_valid(cap)) { if (cap && __cap_is_valid(cap)) {
if (force || unless &= cap->issued;
((cap->issued & drop) && if (unless) {
(cap->issued & unless) == 0)) { if (unless & CEPH_CAP_AUTH_EXCL)
if ((cap->issued & drop) && drop &= ~CEPH_CAP_AUTH_SHARED;
(cap->issued & unless) == 0) { if (unless & CEPH_CAP_LINK_EXCL)
drop &= ~CEPH_CAP_LINK_SHARED;
if (unless & CEPH_CAP_XATTR_EXCL)
drop &= ~CEPH_CAP_XATTR_SHARED;
if (unless & CEPH_CAP_FILE_EXCL)
drop &= ~CEPH_CAP_FILE_SHARED;
}
if (force || (cap->issued & drop)) {
if (cap->issued & drop) {
int wanted = __ceph_caps_wanted(ci); int wanted = __ceph_caps_wanted(ci);
if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0) if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0)
wanted |= cap->mds_wanted; wanted |= cap->mds_wanted;
...@@ -3975,7 +4039,7 @@ int ceph_encode_inode_release(void **p, struct inode *inode, ...@@ -3975,7 +4039,7 @@ int ceph_encode_inode_release(void **p, struct inode *inode,
*p += sizeof(*rel); *p += sizeof(*rel);
ret = 1; ret = 1;
} else { } else {
dout("encode_inode_release %p cap %p %s\n", dout("encode_inode_release %p cap %p %s (noop)\n",
inode, cap, ceph_cap_string(cap->issued)); inode, cap, ceph_cap_string(cap->issued));
} }
} }
......
...@@ -173,7 +173,7 @@ __dcache_find_get_entry(struct dentry *parent, u64 idx, ...@@ -173,7 +173,7 @@ __dcache_find_get_entry(struct dentry *parent, u64 idx,
* the MDS if/when the directory is modified). * the MDS if/when the directory is modified).
*/ */
static int __dcache_readdir(struct file *file, struct dir_context *ctx, static int __dcache_readdir(struct file *file, struct dir_context *ctx,
u32 shared_gen) int shared_gen)
{ {
struct ceph_file_info *fi = file->private_data; struct ceph_file_info *fi = file->private_data;
struct dentry *parent = file->f_path.dentry; struct dentry *parent = file->f_path.dentry;
...@@ -184,7 +184,7 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx, ...@@ -184,7 +184,7 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx,
u64 idx = 0; u64 idx = 0;
int err = 0; int err = 0;
dout("__dcache_readdir %p v%u at %llx\n", dir, shared_gen, ctx->pos); dout("__dcache_readdir %p v%u at %llx\n", dir, (unsigned)shared_gen, ctx->pos);
/* search start position */ /* search start position */
if (ctx->pos > 2) { if (ctx->pos > 2) {
...@@ -231,11 +231,17 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx, ...@@ -231,11 +231,17 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx,
goto out; goto out;
} }
di = ceph_dentry(dentry);
spin_lock(&dentry->d_lock); spin_lock(&dentry->d_lock);
if (di->lease_shared_gen == shared_gen && di = ceph_dentry(dentry);
d_really_is_positive(dentry) && if (d_unhashed(dentry) ||
fpos_cmp(ctx->pos, di->offset) <= 0) { d_really_is_negative(dentry) ||
di->lease_shared_gen != shared_gen) {
spin_unlock(&dentry->d_lock);
dput(dentry);
err = -EAGAIN;
goto out;
}
if (fpos_cmp(ctx->pos, di->offset) <= 0) {
emit_dentry = true; emit_dentry = true;
} }
spin_unlock(&dentry->d_lock); spin_unlock(&dentry->d_lock);
...@@ -333,7 +339,7 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) ...@@ -333,7 +339,7 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
ceph_snap(inode) != CEPH_SNAPDIR && ceph_snap(inode) != CEPH_SNAPDIR &&
__ceph_dir_is_complete_ordered(ci) && __ceph_dir_is_complete_ordered(ci) &&
__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) { __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) {
u32 shared_gen = ci->i_shared_gen; int shared_gen = atomic_read(&ci->i_shared_gen);
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
err = __dcache_readdir(file, ctx, shared_gen); err = __dcache_readdir(file, ctx, shared_gen);
if (err != -EAGAIN) if (err != -EAGAIN)
...@@ -381,6 +387,7 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) ...@@ -381,6 +387,7 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
if (op == CEPH_MDS_OP_READDIR) { if (op == CEPH_MDS_OP_READDIR) {
req->r_direct_hash = ceph_frag_value(frag); req->r_direct_hash = ceph_frag_value(frag);
__set_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags); __set_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
req->r_inode_drop = CEPH_CAP_FILE_EXCL;
} }
if (fi->last_name) { if (fi->last_name) {
req->r_path2 = kstrdup(fi->last_name, GFP_KERNEL); req->r_path2 = kstrdup(fi->last_name, GFP_KERNEL);
...@@ -750,7 +757,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry, ...@@ -750,7 +757,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
dout(" dir %p complete, -ENOENT\n", dir); dout(" dir %p complete, -ENOENT\n", dir);
d_add(dentry, NULL); d_add(dentry, NULL);
di->lease_shared_gen = ci->i_shared_gen; di->lease_shared_gen = atomic_read(&ci->i_shared_gen);
return NULL; return NULL;
} }
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
...@@ -835,7 +842,7 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry, ...@@ -835,7 +842,7 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry,
set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
req->r_args.mknod.mode = cpu_to_le32(mode); req->r_args.mknod.mode = cpu_to_le32(mode);
req->r_args.mknod.rdev = cpu_to_le32(rdev); req->r_args.mknod.rdev = cpu_to_le32(rdev);
req->r_dentry_drop = CEPH_CAP_FILE_SHARED; req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
req->r_dentry_unless = CEPH_CAP_FILE_EXCL; req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
if (acls.pagelist) { if (acls.pagelist) {
req->r_pagelist = acls.pagelist; req->r_pagelist = acls.pagelist;
...@@ -887,7 +894,7 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry, ...@@ -887,7 +894,7 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry,
set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
req->r_dentry = dget(dentry); req->r_dentry = dget(dentry);
req->r_num_caps = 2; req->r_num_caps = 2;
req->r_dentry_drop = CEPH_CAP_FILE_SHARED; req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
req->r_dentry_unless = CEPH_CAP_FILE_EXCL; req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
err = ceph_mdsc_do_request(mdsc, dir, req); err = ceph_mdsc_do_request(mdsc, dir, req);
if (!err && !req->r_reply_info.head->is_dentry) if (!err && !req->r_reply_info.head->is_dentry)
...@@ -936,7 +943,7 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode) ...@@ -936,7 +943,7 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
req->r_parent = dir; req->r_parent = dir;
set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
req->r_args.mkdir.mode = cpu_to_le32(mode); req->r_args.mkdir.mode = cpu_to_le32(mode);
req->r_dentry_drop = CEPH_CAP_FILE_SHARED; req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
req->r_dentry_unless = CEPH_CAP_FILE_EXCL; req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
if (acls.pagelist) { if (acls.pagelist) {
req->r_pagelist = acls.pagelist; req->r_pagelist = acls.pagelist;
...@@ -983,7 +990,7 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir, ...@@ -983,7 +990,7 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
req->r_dentry_drop = CEPH_CAP_FILE_SHARED; req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
req->r_dentry_unless = CEPH_CAP_FILE_EXCL; req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
/* release LINK_SHARED on source inode (mds will lock it) */ /* release LINK_SHARED on source inode (mds will lock it) */
req->r_old_inode_drop = CEPH_CAP_LINK_SHARED; req->r_old_inode_drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL;
err = ceph_mdsc_do_request(mdsc, dir, req); err = ceph_mdsc_do_request(mdsc, dir, req);
if (err) { if (err) {
d_drop(dentry); d_drop(dentry);
...@@ -1096,7 +1103,7 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry, ...@@ -1096,7 +1103,7 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
req->r_dentry_drop = CEPH_CAP_FILE_SHARED; req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
req->r_dentry_unless = CEPH_CAP_FILE_EXCL; req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
/* release LINK_RDCACHE on source inode (mds will lock it) */ /* release LINK_RDCACHE on source inode (mds will lock it) */
req->r_old_inode_drop = CEPH_CAP_LINK_SHARED; req->r_old_inode_drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL;
if (d_really_is_positive(new_dentry)) if (d_really_is_positive(new_dentry))
req->r_inode_drop = drop_caps_for_unlink(d_inode(new_dentry)); req->r_inode_drop = drop_caps_for_unlink(d_inode(new_dentry));
err = ceph_mdsc_do_request(mdsc, old_dir, req); err = ceph_mdsc_do_request(mdsc, old_dir, req);
...@@ -1106,16 +1113,7 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry, ...@@ -1106,16 +1113,7 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
* do_request, above). If there is no trace, we need * do_request, above). If there is no trace, we need
* to do it here. * to do it here.
*/ */
/* d_move screws up sibling dentries' offsets */
ceph_dir_clear_complete(old_dir);
ceph_dir_clear_complete(new_dir);
d_move(old_dentry, new_dentry); d_move(old_dentry, new_dentry);
/* ensure target dentry is invalidated, despite
rehashing bug in vfs_rename_dir */
ceph_invalidate_dentry_lease(new_dentry);
} }
ceph_mdsc_put_request(req); ceph_mdsc_put_request(req);
return err; return err;
...@@ -1199,12 +1197,12 @@ static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry) ...@@ -1199,12 +1197,12 @@ static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry)
int valid = 0; int valid = 0;
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
if (ci->i_shared_gen == di->lease_shared_gen) if (atomic_read(&ci->i_shared_gen) == di->lease_shared_gen)
valid = __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1); valid = __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1);
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
dout("dir_lease_is_valid dir %p v%u dentry %p v%u = %d\n", dout("dir_lease_is_valid dir %p v%u dentry %p v%u = %d\n",
dir, (unsigned)ci->i_shared_gen, dentry, dir, (unsigned)atomic_read(&ci->i_shared_gen),
(unsigned)di->lease_shared_gen, valid); dentry, (unsigned)di->lease_shared_gen, valid);
return valid; return valid;
} }
...@@ -1332,24 +1330,37 @@ static void ceph_d_release(struct dentry *dentry) ...@@ -1332,24 +1330,37 @@ static void ceph_d_release(struct dentry *dentry)
*/ */
static void ceph_d_prune(struct dentry *dentry) static void ceph_d_prune(struct dentry *dentry)
{ {
dout("ceph_d_prune %p\n", dentry); struct ceph_inode_info *dir_ci;
struct ceph_dentry_info *di;
dout("ceph_d_prune %pd %p\n", dentry, dentry);
/* do we have a valid parent? */ /* do we have a valid parent? */
if (IS_ROOT(dentry)) if (IS_ROOT(dentry))
return; return;
/* if we are not hashed, we don't affect dir's completeness */ /* we hold d_lock, so d_parent is stable */
if (d_unhashed(dentry)) dir_ci = ceph_inode(d_inode(dentry->d_parent));
if (dir_ci->i_vino.snap == CEPH_SNAPDIR)
return; return;
if (ceph_snap(d_inode(dentry->d_parent)) == CEPH_SNAPDIR) /* who calls d_delete() should also disable dcache readdir */
if (d_really_is_negative(dentry))
return; return;
/* /* d_fsdata does not get cleared until d_release */
* we hold d_lock, so d_parent is stable, and d_fsdata is never if (!d_unhashed(dentry)) {
* cleared until d_release __ceph_dir_clear_complete(dir_ci);
*/ return;
ceph_dir_clear_complete(d_inode(dentry->d_parent)); }
/* Disable dcache readdir just in case that someone called d_drop()
* or d_invalidate(), but MDS didn't revoke CEPH_CAP_FILE_SHARED
* properly (dcache readdir is still enabled) */
di = ceph_dentry(dentry);
if (di->offset > 0 &&
di->lease_shared_gen == atomic_read(&dir_ci->i_shared_gen))
__ceph_dir_clear_ordered(dir_ci);
} }
/* /*
......
...@@ -181,6 +181,10 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode) ...@@ -181,6 +181,10 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode)
return -ENOMEM; return -ENOMEM;
} }
cf->fmode = fmode; cf->fmode = fmode;
spin_lock_init(&cf->rw_contexts_lock);
INIT_LIST_HEAD(&cf->rw_contexts);
cf->next_offset = 2; cf->next_offset = 2;
cf->readdir_cache_idx = -1; cf->readdir_cache_idx = -1;
file->private_data = cf; file->private_data = cf;
...@@ -396,7 +400,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, ...@@ -396,7 +400,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
req->r_dentry = dget(dentry); req->r_dentry = dget(dentry);
req->r_num_caps = 2; req->r_num_caps = 2;
if (flags & O_CREAT) { if (flags & O_CREAT) {
req->r_dentry_drop = CEPH_CAP_FILE_SHARED; req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
req->r_dentry_unless = CEPH_CAP_FILE_EXCL; req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
if (acls.pagelist) { if (acls.pagelist) {
req->r_pagelist = acls.pagelist; req->r_pagelist = acls.pagelist;
...@@ -464,6 +468,7 @@ int ceph_release(struct inode *inode, struct file *file) ...@@ -464,6 +468,7 @@ int ceph_release(struct inode *inode, struct file *file)
ceph_mdsc_put_request(cf->last_readdir); ceph_mdsc_put_request(cf->last_readdir);
kfree(cf->last_name); kfree(cf->last_name);
kfree(cf->dir_info); kfree(cf->dir_info);
WARN_ON(!list_empty(&cf->rw_contexts));
kmem_cache_free(ceph_file_cachep, cf); kmem_cache_free(ceph_file_cachep, cf);
/* wake up anyone waiting for caps on this inode */ /* wake up anyone waiting for caps on this inode */
...@@ -1199,12 +1204,13 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to) ...@@ -1199,12 +1204,13 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
retry_op = READ_INLINE; retry_op = READ_INLINE;
} }
} else { } else {
CEPH_DEFINE_RW_CONTEXT(rw_ctx, got);
dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n", dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
ceph_cap_string(got)); ceph_cap_string(got));
current->journal_info = filp; ceph_add_rw_context(fi, &rw_ctx);
ret = generic_file_read_iter(iocb, to); ret = generic_file_read_iter(iocb, to);
current->journal_info = NULL; ceph_del_rw_context(fi, &rw_ctx);
} }
dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n", dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret); inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
......
...@@ -494,7 +494,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb) ...@@ -494,7 +494,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
ci->i_wrbuffer_ref = 0; ci->i_wrbuffer_ref = 0;
ci->i_wrbuffer_ref_head = 0; ci->i_wrbuffer_ref_head = 0;
atomic_set(&ci->i_filelock_ref, 0); atomic_set(&ci->i_filelock_ref, 0);
ci->i_shared_gen = 0; atomic_set(&ci->i_shared_gen, 0);
ci->i_rdcache_gen = 0; ci->i_rdcache_gen = 0;
ci->i_rdcache_revoking = 0; ci->i_rdcache_revoking = 0;
...@@ -1041,7 +1041,7 @@ static void update_dentry_lease(struct dentry *dentry, ...@@ -1041,7 +1041,7 @@ static void update_dentry_lease(struct dentry *dentry,
if (ceph_snap(dir) != CEPH_NOSNAP) if (ceph_snap(dir) != CEPH_NOSNAP)
goto out_unlock; goto out_unlock;
di->lease_shared_gen = ceph_inode(dir)->i_shared_gen; di->lease_shared_gen = atomic_read(&ceph_inode(dir)->i_shared_gen);
if (duration == 0) if (duration == 0)
goto out_unlock; goto out_unlock;
...@@ -1080,6 +1080,27 @@ static struct dentry *splice_dentry(struct dentry *dn, struct inode *in) ...@@ -1080,6 +1080,27 @@ static struct dentry *splice_dentry(struct dentry *dn, struct inode *in)
BUG_ON(d_inode(dn)); BUG_ON(d_inode(dn));
if (S_ISDIR(in->i_mode)) {
/* If inode is directory, d_splice_alias() below will remove
* 'realdn' from its origin parent. We need to ensure that
* origin parent's readdir cache will not reference 'realdn'
*/
realdn = d_find_any_alias(in);
if (realdn) {
struct ceph_dentry_info *di = ceph_dentry(realdn);
spin_lock(&realdn->d_lock);
realdn->d_op->d_prune(realdn);
di->time = jiffies;
di->lease_shared_gen = 0;
di->offset = 0;
spin_unlock(&realdn->d_lock);
dput(realdn);
}
}
/* dn must be unhashed */ /* dn must be unhashed */
if (!d_unhashed(dn)) if (!d_unhashed(dn))
d_drop(dn); d_drop(dn);
...@@ -1295,8 +1316,8 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req) ...@@ -1295,8 +1316,8 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req)
if (!rinfo->head->is_target) { if (!rinfo->head->is_target) {
dout("fill_trace null dentry\n"); dout("fill_trace null dentry\n");
if (d_really_is_positive(dn)) { if (d_really_is_positive(dn)) {
ceph_dir_clear_ordered(dir);
dout("d_delete %p\n", dn); dout("d_delete %p\n", dn);
ceph_dir_clear_ordered(dir);
d_delete(dn); d_delete(dn);
} else if (have_lease) { } else if (have_lease) {
if (d_unhashed(dn)) if (d_unhashed(dn))
...@@ -1323,7 +1344,6 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req) ...@@ -1323,7 +1344,6 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req)
dout(" %p links to %p %llx.%llx, not %llx.%llx\n", dout(" %p links to %p %llx.%llx, not %llx.%llx\n",
dn, d_inode(dn), ceph_vinop(d_inode(dn)), dn, d_inode(dn), ceph_vinop(d_inode(dn)),
ceph_vinop(in)); ceph_vinop(in));
ceph_dir_clear_ordered(dir);
d_invalidate(dn); d_invalidate(dn);
have_lease = false; have_lease = false;
} }
...@@ -1573,9 +1593,19 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req, ...@@ -1573,9 +1593,19 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
} else if (d_really_is_positive(dn) && } else if (d_really_is_positive(dn) &&
(ceph_ino(d_inode(dn)) != tvino.ino || (ceph_ino(d_inode(dn)) != tvino.ino ||
ceph_snap(d_inode(dn)) != tvino.snap)) { ceph_snap(d_inode(dn)) != tvino.snap)) {
struct ceph_dentry_info *di = ceph_dentry(dn);
dout(" dn %p points to wrong inode %p\n", dout(" dn %p points to wrong inode %p\n",
dn, d_inode(dn)); dn, d_inode(dn));
__ceph_dir_clear_ordered(ci);
spin_lock(&dn->d_lock);
if (di->offset > 0 &&
di->lease_shared_gen ==
atomic_read(&ci->i_shared_gen)) {
__ceph_dir_clear_ordered(ci);
di->offset = 0;
}
spin_unlock(&dn->d_lock);
d_delete(dn); d_delete(dn);
dput(dn); dput(dn);
goto retry_lookup; goto retry_lookup;
...@@ -1600,9 +1630,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req, ...@@ -1600,9 +1630,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
&req->r_caps_reservation); &req->r_caps_reservation);
if (ret < 0) { if (ret < 0) {
pr_err("fill_inode badness on %p\n", in); pr_err("fill_inode badness on %p\n", in);
if (d_really_is_positive(dn)) if (d_really_is_negative(dn))
__ceph_dir_clear_ordered(ci);
else
iput(in); iput(in);
d_drop(dn); d_drop(dn);
err = ret; err = ret;
...@@ -2000,8 +2028,8 @@ int __ceph_setattr(struct inode *inode, struct iattr *attr) ...@@ -2000,8 +2028,8 @@ int __ceph_setattr(struct inode *inode, struct iattr *attr)
ceph_encode_timespec(&req->r_args.setattr.atime, ceph_encode_timespec(&req->r_args.setattr.atime,
&attr->ia_atime); &attr->ia_atime);
mask |= CEPH_SETATTR_ATIME; mask |= CEPH_SETATTR_ATIME;
release |= CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_RD | release |= CEPH_CAP_FILE_SHARED |
CEPH_CAP_FILE_WR; CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR;
} }
} }
if (ia_valid & ATTR_MTIME) { if (ia_valid & ATTR_MTIME) {
...@@ -2022,8 +2050,8 @@ int __ceph_setattr(struct inode *inode, struct iattr *attr) ...@@ -2022,8 +2050,8 @@ int __ceph_setattr(struct inode *inode, struct iattr *attr)
ceph_encode_timespec(&req->r_args.setattr.mtime, ceph_encode_timespec(&req->r_args.setattr.mtime,
&attr->ia_mtime); &attr->ia_mtime);
mask |= CEPH_SETATTR_MTIME; mask |= CEPH_SETATTR_MTIME;
release |= CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_RD | release |= CEPH_CAP_FILE_SHARED |
CEPH_CAP_FILE_WR; CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR;
} }
} }
if (ia_valid & ATTR_SIZE) { if (ia_valid & ATTR_SIZE) {
...@@ -2041,8 +2069,8 @@ int __ceph_setattr(struct inode *inode, struct iattr *attr) ...@@ -2041,8 +2069,8 @@ int __ceph_setattr(struct inode *inode, struct iattr *attr)
req->r_args.setattr.old_size = req->r_args.setattr.old_size =
cpu_to_le64(inode->i_size); cpu_to_le64(inode->i_size);
mask |= CEPH_SETATTR_SIZE; mask |= CEPH_SETATTR_SIZE;
release |= CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_RD | release |= CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL |
CEPH_CAP_FILE_WR; CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR;
} }
} }
......
...@@ -604,10 +604,20 @@ static void __register_request(struct ceph_mds_client *mdsc, ...@@ -604,10 +604,20 @@ static void __register_request(struct ceph_mds_client *mdsc,
struct ceph_mds_request *req, struct ceph_mds_request *req,
struct inode *dir) struct inode *dir)
{ {
int ret = 0;
req->r_tid = ++mdsc->last_tid; req->r_tid = ++mdsc->last_tid;
if (req->r_num_caps) if (req->r_num_caps) {
ceph_reserve_caps(mdsc, &req->r_caps_reservation, ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
req->r_num_caps); req->r_num_caps);
if (ret < 0) {
pr_err("__register_request %p "
"failed to reserve caps: %d\n", req, ret);
/* set req->r_err to fail early from __do_request */
req->r_err = ret;
return;
}
}
dout("__register_request %p tid %lld\n", req, req->r_tid); dout("__register_request %p tid %lld\n", req, req->r_tid);
ceph_mdsc_get_request(req); ceph_mdsc_get_request(req);
insert_request(&mdsc->request_tree, req); insert_request(&mdsc->request_tree, req);
...@@ -1545,9 +1555,9 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) ...@@ -1545,9 +1555,9 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
/* /*
* Trim session cap count down to some max number. * Trim session cap count down to some max number.
*/ */
static int trim_caps(struct ceph_mds_client *mdsc, int ceph_trim_caps(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session, struct ceph_mds_session *session,
int max_caps) int max_caps)
{ {
int trim_caps = session->s_nr_caps - max_caps; int trim_caps = session->s_nr_caps - max_caps;
...@@ -2438,11 +2448,14 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, ...@@ -2438,11 +2448,14 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
*/ */
void ceph_invalidate_dir_request(struct ceph_mds_request *req) void ceph_invalidate_dir_request(struct ceph_mds_request *req)
{ {
struct inode *inode = req->r_parent; struct inode *dir = req->r_parent;
struct inode *old_dir = req->r_old_dentry_dir;
dout("invalidate_dir_request %p (complete, lease(s))\n", inode); dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir);
ceph_dir_clear_complete(inode); ceph_dir_clear_complete(dir);
if (old_dir)
ceph_dir_clear_complete(old_dir);
if (req->r_dentry) if (req->r_dentry)
ceph_invalidate_dentry_lease(req->r_dentry); ceph_invalidate_dentry_lease(req->r_dentry);
if (req->r_old_dentry) if (req->r_old_dentry)
...@@ -2773,7 +2786,7 @@ static void handle_session(struct ceph_mds_session *session, ...@@ -2773,7 +2786,7 @@ static void handle_session(struct ceph_mds_session *session,
break; break;
case CEPH_SESSION_RECALL_STATE: case CEPH_SESSION_RECALL_STATE:
trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
break; break;
case CEPH_SESSION_FLUSHMSG: case CEPH_SESSION_FLUSHMSG:
......
...@@ -444,4 +444,7 @@ ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target); ...@@ -444,4 +444,7 @@ ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target);
extern void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, extern void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session); struct ceph_mds_session *session);
extern int ceph_trim_caps(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session,
int max_caps);
#endif #endif
...@@ -922,13 +922,17 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc, ...@@ -922,13 +922,17 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
/* /*
* Move the inode to the new realm * Move the inode to the new realm
*/ */
spin_lock(&realm->inodes_with_caps_lock); oldrealm = ci->i_snap_realm;
spin_lock(&oldrealm->inodes_with_caps_lock);
list_del_init(&ci->i_snap_realm_item); list_del_init(&ci->i_snap_realm_item);
spin_unlock(&oldrealm->inodes_with_caps_lock);
spin_lock(&realm->inodes_with_caps_lock);
list_add(&ci->i_snap_realm_item, list_add(&ci->i_snap_realm_item,
&realm->inodes_with_caps); &realm->inodes_with_caps);
oldrealm = ci->i_snap_realm;
ci->i_snap_realm = realm; ci->i_snap_realm = realm;
spin_unlock(&realm->inodes_with_caps_lock); spin_unlock(&realm->inodes_with_caps_lock);
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
ceph_get_snap_realm(mdsc, realm); ceph_get_snap_realm(mdsc, realm);
......
...@@ -256,7 +256,8 @@ struct ceph_inode_xattr { ...@@ -256,7 +256,8 @@ struct ceph_inode_xattr {
*/ */
struct ceph_dentry_info { struct ceph_dentry_info {
struct ceph_mds_session *lease_session; struct ceph_mds_session *lease_session;
u32 lease_gen, lease_shared_gen; int lease_shared_gen;
u32 lease_gen;
u32 lease_seq; u32 lease_seq;
unsigned long lease_renew_after, lease_renew_from; unsigned long lease_renew_after, lease_renew_from;
struct list_head lru; struct list_head lru;
...@@ -353,7 +354,7 @@ struct ceph_inode_info { ...@@ -353,7 +354,7 @@ struct ceph_inode_info {
int i_rd_ref, i_rdcache_ref, i_wr_ref, i_wb_ref; int i_rd_ref, i_rdcache_ref, i_wr_ref, i_wb_ref;
int i_wrbuffer_ref, i_wrbuffer_ref_head; int i_wrbuffer_ref, i_wrbuffer_ref_head;
atomic_t i_filelock_ref; atomic_t i_filelock_ref;
u32 i_shared_gen; /* increment each time we get FILE_SHARED */ atomic_t i_shared_gen; /* increment each time we get FILE_SHARED */
u32 i_rdcache_gen; /* incremented each time we get FILE_CACHE. */ u32 i_rdcache_gen; /* incremented each time we get FILE_CACHE. */
u32 i_rdcache_revoking; /* RDCACHE gen to async invalidate, if any */ u32 i_rdcache_revoking; /* RDCACHE gen to async invalidate, if any */
...@@ -648,7 +649,7 @@ extern int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check); ...@@ -648,7 +649,7 @@ extern int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check);
extern void ceph_caps_init(struct ceph_mds_client *mdsc); extern void ceph_caps_init(struct ceph_mds_client *mdsc);
extern void ceph_caps_finalize(struct ceph_mds_client *mdsc); extern void ceph_caps_finalize(struct ceph_mds_client *mdsc);
extern void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta); extern void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta);
extern void ceph_reserve_caps(struct ceph_mds_client *mdsc, extern int ceph_reserve_caps(struct ceph_mds_client *mdsc,
struct ceph_cap_reservation *ctx, int need); struct ceph_cap_reservation *ctx, int need);
extern int ceph_unreserve_caps(struct ceph_mds_client *mdsc, extern int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
struct ceph_cap_reservation *ctx); struct ceph_cap_reservation *ctx);
...@@ -668,6 +669,9 @@ struct ceph_file_info { ...@@ -668,6 +669,9 @@ struct ceph_file_info {
short fmode; /* initialized on open */ short fmode; /* initialized on open */
short flags; /* CEPH_F_* */ short flags; /* CEPH_F_* */
spinlock_t rw_contexts_lock;
struct list_head rw_contexts;
/* readdir: position within the dir */ /* readdir: position within the dir */
u32 frag; u32 frag;
struct ceph_mds_request *last_readdir; struct ceph_mds_request *last_readdir;
...@@ -684,6 +688,49 @@ struct ceph_file_info { ...@@ -684,6 +688,49 @@ struct ceph_file_info {
int dir_info_len; int dir_info_len;
}; };
/*
 * Per-call context that can be attached to an open file's
 * ceph_file_info.  Contexts are linked onto ceph_file_info::rw_contexts
 * by ceph_add_rw_context() and found again, by owning task, with
 * ceph_find_rw_context().
 */
struct ceph_rw_context {
struct list_head list; /* entry on ceph_file_info::rw_contexts */
struct task_struct *thread; /* owning task; lookup compares this against current */
int caps; /* value supplied to CEPH_DEFINE_RW_CONTEXT() (the read path passes the caps it got) */
};
/*
 * Declare a struct ceph_rw_context variable called _name whose owner is
 * the current task and whose caps field is _caps.  The list member is
 * not named in the initializer, so it starts out zeroed; it only
 * becomes a valid list entry once the context is handed to
 * ceph_add_rw_context().
 */
#define CEPH_DEFINE_RW_CONTEXT(_name, _caps) \
struct ceph_rw_context _name = { \
.thread = current, \
.caps = _caps, \
}
/*
 * Link @ctx onto @cf's rw_contexts list.  The insertion is performed
 * while holding cf->rw_contexts_lock; list_add() places the new entry
 * at the head of the list.
 */
static inline void ceph_add_rw_context(struct ceph_file_info *cf,
struct ceph_rw_context *ctx)
{
spin_lock(&cf->rw_contexts_lock);
list_add(&ctx->list, &cf->rw_contexts);
spin_unlock(&cf->rw_contexts_lock);
}
/*
 * Unlink @ctx from the rw_contexts list, holding cf->rw_contexts_lock
 * around the removal.
 * NOTE(review): presumably @ctx must previously have been added to this
 * same @cf with ceph_add_rw_context() -- confirm against callers.
 */
static inline void ceph_del_rw_context(struct ceph_file_info *cf,
struct ceph_rw_context *ctx)
{
spin_lock(&cf->rw_contexts_lock);
list_del(&ctx->list);
spin_unlock(&cf->rw_contexts_lock);
}
/*
 * Look up the rw context on @cf that belongs to the calling task.
 *
 * Walks cf->rw_contexts under cf->rw_contexts_lock and stops at the
 * first entry whose ->thread is the current task.  Returns that entry,
 * or NULL when the current task has no context on this file.
 */
static inline struct ceph_rw_context*
ceph_find_rw_context(struct ceph_file_info *cf)
{
	struct ceph_rw_context *match = NULL;
	struct ceph_rw_context *cur;

	spin_lock(&cf->rw_contexts_lock);
	list_for_each_entry(cur, &cf->rw_contexts, list) {
		/* skip contexts owned by other tasks */
		if (cur->thread != current)
			continue;
		match = cur;
		break;
	}
	spin_unlock(&cf->rw_contexts_lock);

	return match;
}
struct ceph_readdir_cache_control { struct ceph_readdir_cache_control {
struct page *page; struct page *page;
struct dentry **dentries; struct dentry **dentries;
......
...@@ -421,6 +421,10 @@ ceph_parse_options(char *options, const char *dev_name, ...@@ -421,6 +421,10 @@ ceph_parse_options(char *options, const char *dev_name,
opt->name = kstrndup(argstr[0].from, opt->name = kstrndup(argstr[0].from,
argstr[0].to-argstr[0].from, argstr[0].to-argstr[0].from,
GFP_KERNEL); GFP_KERNEL);
if (!opt->name) {
err = -ENOMEM;
goto out;
}
break; break;
case Opt_secret: case Opt_secret:
opt->key = kzalloc(sizeof(*opt->key), GFP_KERNEL); opt->key = kzalloc(sizeof(*opt->key), GFP_KERNEL);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment