Commit a10c38a4 authored by Linus Torvalds

Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

Pull Ceph updates from Sage Weil:
 "This changeset has a few main parts:

   - Ilya has finished a huge refactoring effort to sync up the
     client-side logic in libceph with the user-space client code, which
     has evolved significantly over the last couple of years, with lots
     of additional behaviors (e.g., how requests are handled when the
     cluster is full and when it transitions from full to non-full).

     The structure of the code is now much more closely aligned with
     userspace, so it will be much easier to maintain going forward as
     behavior changes take place.  There are some locking improvements
     bundled in as well.

   - Zheng adds multi-filesystem support (multiple namespaces within the
     same Ceph cluster)

   - Zheng has changed the readdir offsets and directory enumeration so
     that dentry offsets are hash-based and therefore stable across
     directory fragmentation events on the MDS (see the sketch below).

   - Zheng has a smorgasbord of bug fixes across fs/ceph"
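
For context on the readdir item above: a minimal, self-contained sketch
of how a 64-bit readdir position can pack a 32-bit "high" part and a
32-bit per-entry offset.  The packing mirrors the old inline
ceph_make_fpos() helper that this series removes from fs/ceph/super.h
(fragment id in the high bits); the hash-ordered variant is an
assumption based on the new ceph_make_fpos(high, off, hash_order)
prototype and the CEPH_READDIR_HASH_ORDER reply flag, not the actual
kernel implementation.

/* Illustrative sketch only -- not code from this series. */
#include <stdint.h>

static inline int64_t make_fpos(uint32_t high, uint32_t off)
{
	/* old scheme: "high" is the directory fragment id */
	return ((int64_t)high << 32) | (int64_t)off;
}

static inline int64_t make_fpos_hashed(uint32_t name_hash, uint32_t off)
{
	/*
	 * Assumed new scheme: "high" is derived from the dentry name
	 * hash, which does not change when the MDS splits or merges
	 * directory fragments, so a partially completed readdir can
	 * resume at the same logical position.
	 */
	return make_fpos(name_hash, off);
}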

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (71 commits)
  ceph: fix wake_up_session_cb()
  ceph: don't use truncate_pagecache() to invalidate read cache
  ceph: SetPageError() for writeback pages if writepages fails
  ceph: handle interrupted ceph_writepage()
  ceph: make ceph_update_writeable_page() uninterruptible
  libceph: make ceph_osdc_wait_request() uninterruptible
  ceph: handle -EAGAIN returned by ceph_update_writeable_page()
  ceph: make fault/page_mkwrite return VM_FAULT_OOM for -ENOMEM
  ceph: block non-fatal signals for fault/page_mkwrite
  ceph: make logical calculation functions return bool
  ceph: tolerate bad i_size for symlink inode
  ceph: improve fragtree change detection
  ceph: keep leaf frag when updating fragtree
  ceph: fix dir_auth check in ceph_fill_dirfrag()
  ceph: don't assume frag tree splits in mds reply are sorted
  ceph: fix inode reference leak
  ceph: using hash value to compose dentry offset
  ceph: don't forbid marking directory complete after forward seek
  ceph: record 'offset' for each entry of readdir result
  ceph: define 'end/complete' in readdir reply as bit flags
  ...
parents ea8ea737 e5360309
......@@ -236,7 +236,7 @@ static void ceph_vfs_readpage_complete_unlock(struct page *page, void *data, int
unlock_page(page);
}
static inline int cache_valid(struct ceph_inode_info *ci)
static inline bool cache_valid(struct ceph_inode_info *ci)
{
return ((ceph_caps_issued(ci) & CEPH_CAP_FILE_CACHE) &&
(ci->i_fscache_gen == ci->i_rdcache_gen));
......
......@@ -1656,7 +1656,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
*/
if ((!is_delayed || mdsc->stopping) &&
!S_ISDIR(inode->i_mode) && /* ignore readdir cache */
ci->i_wrbuffer_ref == 0 && /* no dirty pages... */
!(ci->i_wb_ref || ci->i_wrbuffer_ref) && /* no dirty pages... */
inode->i_data.nrpages && /* have cached pages */
(revoking & (CEPH_CAP_FILE_CACHE|
CEPH_CAP_FILE_LAZYIO)) && /* or revoking cache */
......@@ -1698,8 +1698,8 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
revoking = cap->implemented & ~cap->issued;
dout(" mds%d cap %p used %s issued %s implemented %s revoking %s\n",
cap->mds, cap, ceph_cap_string(cap->issued),
ceph_cap_string(cap_used),
cap->mds, cap, ceph_cap_string(cap_used),
ceph_cap_string(cap->issued),
ceph_cap_string(cap->implemented),
ceph_cap_string(revoking));
......@@ -2317,7 +2317,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
/* make sure file is actually open */
file_wanted = __ceph_caps_file_wanted(ci);
if ((file_wanted & need) == 0) {
if ((file_wanted & need) != need) {
dout("try_get_cap_refs need %s file_wanted %s, EBADF\n",
ceph_cap_string(need), ceph_cap_string(file_wanted));
*err = -EBADF;
......@@ -2412,12 +2412,26 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
goto out_unlock;
}
if (!__ceph_is_any_caps(ci) &&
ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
dout("get_cap_refs %p forced umount\n", inode);
*err = -EIO;
ret = 1;
goto out_unlock;
if (ci->i_ceph_flags & CEPH_I_CAP_DROPPED) {
int mds_wanted;
if (ACCESS_ONCE(mdsc->fsc->mount_state) ==
CEPH_MOUNT_SHUTDOWN) {
dout("get_cap_refs %p forced umount\n", inode);
*err = -EIO;
ret = 1;
goto out_unlock;
}
mds_wanted = __ceph_caps_mds_wanted(ci);
if ((mds_wanted & need) != need) {
dout("get_cap_refs %p caps were dropped"
" (session killed?)\n", inode);
*err = -ESTALE;
ret = 1;
goto out_unlock;
}
if ((mds_wanted & file_wanted) ==
(file_wanted & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR)))
ci->i_ceph_flags &= ~CEPH_I_CAP_DROPPED;
}
dout("get_cap_refs %p have %s needed %s\n", inode,
......@@ -2487,7 +2501,7 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
if (err == -EAGAIN)
continue;
if (err < 0)
return err;
ret = err;
} else {
ret = wait_event_interruptible(ci->i_cap_wq,
try_get_cap_refs(ci, need, want, endoff,
......@@ -2496,8 +2510,15 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
continue;
if (err < 0)
ret = err;
if (ret < 0)
return ret;
}
if (ret < 0) {
if (err == -ESTALE) {
/* session was killed, try renew caps */
ret = ceph_renew_caps(&ci->vfs_inode);
if (ret == 0)
continue;
}
return ret;
}
if (ci->i_inline_version != CEPH_INLINE_NONE &&
......@@ -2807,7 +2828,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
if (!S_ISDIR(inode->i_mode) && /* don't invalidate readdir cache */
((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
(newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
!ci->i_wrbuffer_ref) {
!(ci->i_wrbuffer_ref || ci->i_wb_ref)) {
if (try_nonblocking_invalidate(inode)) {
/* there were locked pages.. invalidate later
in a separate thread. */
......@@ -3226,6 +3247,8 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
if (target < 0) {
__ceph_remove_cap(cap, false);
if (!ci->i_auth_cap)
ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
goto out_unlock;
}
......
......@@ -109,7 +109,7 @@ static int mdsc_show(struct seq_file *s, void *p)
path ? path : "");
spin_unlock(&req->r_old_dentry->d_lock);
kfree(path);
} else if (req->r_path2) {
} else if (req->r_path2 && req->r_op != CEPH_MDS_OP_SYMLINK) {
if (req->r_ino2.ino)
seq_printf(s, " #%llx/%s", req->r_ino2.ino,
req->r_path2);
......
......@@ -191,6 +191,59 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode)
return ret;
}
/*
* try renew caps after session gets killed.
*/
int ceph_renew_caps(struct inode *inode)
{
struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_mds_request *req;
int err, flags, wanted;
spin_lock(&ci->i_ceph_lock);
wanted = __ceph_caps_file_wanted(ci);
if (__ceph_is_any_real_caps(ci) &&
(!(wanted & CEPH_CAP_ANY_WR) == 0 || ci->i_auth_cap)) {
int issued = __ceph_caps_issued(ci, NULL);
spin_unlock(&ci->i_ceph_lock);
dout("renew caps %p want %s issued %s updating mds_wanted\n",
inode, ceph_cap_string(wanted), ceph_cap_string(issued));
ceph_check_caps(ci, 0, NULL);
return 0;
}
spin_unlock(&ci->i_ceph_lock);
flags = 0;
if ((wanted & CEPH_CAP_FILE_RD) && (wanted & CEPH_CAP_FILE_WR))
flags = O_RDWR;
else if (wanted & CEPH_CAP_FILE_RD)
flags = O_RDONLY;
else if (wanted & CEPH_CAP_FILE_WR)
flags = O_WRONLY;
#ifdef O_LAZY
if (wanted & CEPH_CAP_FILE_LAZYIO)
flags |= O_LAZY;
#endif
req = prepare_open_request(inode->i_sb, flags, 0);
if (IS_ERR(req)) {
err = PTR_ERR(req);
goto out;
}
req->r_inode = inode;
ihold(inode);
req->r_num_caps = 1;
req->r_fmode = -1;
err = ceph_mdsc_do_request(mdsc, NULL, req);
ceph_mdsc_put_request(req);
out:
dout("renew caps %p open result=%d\n", inode, err);
return err < 0 ? err : 0;
}
/*
* If we already have the requisite capabilities, we can satisfy
* the open request locally (no need to request new caps from the
......@@ -616,8 +669,7 @@ static void ceph_aio_complete(struct inode *inode,
kfree(aio_req);
}
static void ceph_aio_complete_req(struct ceph_osd_request *req,
struct ceph_msg *msg)
static void ceph_aio_complete_req(struct ceph_osd_request *req)
{
int rc = req->r_result;
struct inode *inode = req->r_inode;
......@@ -714,14 +766,21 @@ static void ceph_aio_retry_work(struct work_struct *work)
req->r_flags = CEPH_OSD_FLAG_ORDERSNAP |
CEPH_OSD_FLAG_ONDISK |
CEPH_OSD_FLAG_WRITE;
req->r_base_oloc = orig_req->r_base_oloc;
req->r_base_oid = orig_req->r_base_oid;
ceph_oloc_copy(&req->r_base_oloc, &orig_req->r_base_oloc);
ceph_oid_copy(&req->r_base_oid, &orig_req->r_base_oid);
ret = ceph_osdc_alloc_messages(req, GFP_NOFS);
if (ret) {
ceph_osdc_put_request(req);
req = orig_req;
goto out;
}
req->r_ops[0] = orig_req->r_ops[0];
osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
ceph_osdc_build_request(req, req->r_ops[0].extent.offset,
snapc, CEPH_NOSNAP, &aio_req->mtime);
req->r_mtime = aio_req->mtime;
req->r_data_offset = req->r_ops[0].extent.offset;
ceph_osdc_put_request(orig_req);
......@@ -733,7 +792,7 @@ static void ceph_aio_retry_work(struct work_struct *work)
out:
if (ret < 0) {
req->r_result = ret;
ceph_aio_complete_req(req, NULL);
ceph_aio_complete_req(req);
}
ceph_put_snap_context(snapc);
......@@ -764,6 +823,8 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
list_add_tail(&req->r_unsafe_item,
&ci->i_unsafe_writes);
spin_unlock(&ci->i_unsafe_lock);
complete_all(&req->r_completion);
} else {
spin_lock(&ci->i_unsafe_lock);
list_del_init(&req->r_unsafe_item);
......@@ -875,14 +936,12 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
(pos+len) | (PAGE_SIZE - 1));
osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
req->r_mtime = mtime;
}
osd_req_op_extent_osd_data_pages(req, 0, pages, len, start,
false, false);
ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
if (aio_req) {
aio_req->total_len += len;
aio_req->num_reqs++;
......@@ -956,7 +1015,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
req, false);
if (ret < 0) {
req->r_result = ret;
ceph_aio_complete_req(req, NULL);
ceph_aio_complete_req(req);
}
}
return -EIOCBQUEUED;
......@@ -1067,9 +1126,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
false, true);
/* BUG_ON(vino.snap != CEPH_NOSNAP); */
ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
req->r_mtime = mtime;
ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
if (!ret)
ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
......@@ -1524,9 +1581,7 @@ static int ceph_zero_partial_object(struct inode *inode,
goto out;
}
ceph_osdc_build_request(req, offset, NULL, ceph_vino(inode).snap,
&inode->i_mtime);
req->r_mtime = inode->i_mtime;
ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
if (!ret) {
ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
......
......@@ -193,12 +193,12 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
if (copy_from_user(&dl, arg, sizeof(dl)))
return -EFAULT;
down_read(&osdc->map_sem);
down_read(&osdc->lock);
r = ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, len,
&dl.object_no, &dl.object_offset,
&olen);
if (r < 0) {
up_read(&osdc->map_sem);
up_read(&osdc->lock);
return -EIO;
}
dl.file_offset -= dl.object_offset;
......@@ -213,15 +213,15 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
ceph_ino(inode), dl.object_no);
oloc.pool = ceph_file_layout_pg_pool(ci->i_layout);
ceph_oid_set_name(&oid, dl.object_name);
ceph_oid_printf(&oid, "%s", dl.object_name);
r = ceph_oloc_oid_to_pg(osdc->osdmap, &oloc, &oid, &pgid);
r = ceph_object_locator_to_pg(osdc->osdmap, &oid, &oloc, &pgid);
if (r < 0) {
up_read(&osdc->map_sem);
up_read(&osdc->lock);
return r;
}
dl.osd = ceph_calc_pg_primary(osdc->osdmap, pgid);
dl.osd = ceph_pg_to_acting_primary(osdc->osdmap, &pgid);
if (dl.osd >= 0) {
struct ceph_entity_addr *a =
ceph_osd_addr(osdc->osdmap, dl.osd);
......@@ -230,7 +230,7 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
} else {
memset(&dl.osd_addr, 0, sizeof(dl.osd_addr));
}
up_read(&osdc->map_sem);
up_read(&osdc->lock);
/* send result back to user */
if (copy_to_user(arg, &dl, sizeof(dl)))
......
......@@ -181,17 +181,18 @@ static int parse_reply_info_dir(void **p, void *end,
ceph_decode_need(p, end, sizeof(num) + 2, bad);
num = ceph_decode_32(p);
info->dir_end = ceph_decode_8(p);
info->dir_complete = ceph_decode_8(p);
{
u16 flags = ceph_decode_16(p);
info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
}
if (num == 0)
goto done;
BUG_ON(!info->dir_in);
info->dir_dname = (void *)(info->dir_in + num);
info->dir_dname_len = (void *)(info->dir_dname + num);
info->dir_dlease = (void *)(info->dir_dname_len + num);
if ((unsigned long)(info->dir_dlease + num) >
(unsigned long)info->dir_in + info->dir_buf_size) {
BUG_ON(!info->dir_entries);
if ((unsigned long)(info->dir_entries + num) >
(unsigned long)info->dir_entries + info->dir_buf_size) {
pr_err("dir contents are larger than expected\n");
WARN_ON(1);
goto bad;
......@@ -199,21 +200,23 @@ static int parse_reply_info_dir(void **p, void *end,
info->dir_nr = num;
while (num) {
struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
/* dentry */
ceph_decode_need(p, end, sizeof(u32)*2, bad);
info->dir_dname_len[i] = ceph_decode_32(p);
ceph_decode_need(p, end, info->dir_dname_len[i], bad);
info->dir_dname[i] = *p;
*p += info->dir_dname_len[i];
dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i],
info->dir_dname[i]);
info->dir_dlease[i] = *p;
rde->name_len = ceph_decode_32(p);
ceph_decode_need(p, end, rde->name_len, bad);
rde->name = *p;
*p += rde->name_len;
dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);
rde->lease = *p;
*p += sizeof(struct ceph_mds_reply_lease);
/* inode */
err = parse_reply_info_in(p, end, &info->dir_in[i], features);
err = parse_reply_info_in(p, end, &rde->inode, features);
if (err < 0)
goto out_bad;
/* ceph_readdir_prepopulate() will update it */
rde->offset = 0;
i++;
num--;
}
......@@ -345,9 +348,9 @@ static int parse_reply_info(struct ceph_msg *msg,
static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
if (!info->dir_in)
if (!info->dir_entries)
return;
free_pages((unsigned long)info->dir_in, get_order(info->dir_buf_size));
free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
}
......@@ -567,51 +570,23 @@ void ceph_mdsc_release_request(struct kref *kref)
kfree(req);
}
DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
/*
* lookup session, bump ref if found.
*
* called under mdsc->mutex.
*/
static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc,
u64 tid)
static struct ceph_mds_request *
lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
{
struct ceph_mds_request *req;
struct rb_node *n = mdsc->request_tree.rb_node;
while (n) {
req = rb_entry(n, struct ceph_mds_request, r_node);
if (tid < req->r_tid)
n = n->rb_left;
else if (tid > req->r_tid)
n = n->rb_right;
else {
ceph_mdsc_get_request(req);
return req;
}
}
return NULL;
}
static void __insert_request(struct ceph_mds_client *mdsc,
struct ceph_mds_request *new)
{
struct rb_node **p = &mdsc->request_tree.rb_node;
struct rb_node *parent = NULL;
struct ceph_mds_request *req = NULL;
req = lookup_request(&mdsc->request_tree, tid);
if (req)
ceph_mdsc_get_request(req);
while (*p) {
parent = *p;
req = rb_entry(parent, struct ceph_mds_request, r_node);
if (new->r_tid < req->r_tid)
p = &(*p)->rb_left;
else if (new->r_tid > req->r_tid)
p = &(*p)->rb_right;
else
BUG();
}
rb_link_node(&new->r_node, parent, p);
rb_insert_color(&new->r_node, &mdsc->request_tree);
return req;
}
/*
......@@ -630,7 +605,7 @@ static void __register_request(struct ceph_mds_client *mdsc,
req->r_num_caps);
dout("__register_request %p tid %lld\n", req, req->r_tid);
ceph_mdsc_get_request(req);
__insert_request(mdsc, req);
insert_request(&mdsc->request_tree, req);
req->r_uid = current_fsuid();
req->r_gid = current_fsgid();
......@@ -663,8 +638,7 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
}
}
rb_erase(&req->r_node, &mdsc->request_tree);
RB_CLEAR_NODE(&req->r_node);
erase_request(&mdsc->request_tree, req);
if (req->r_unsafe_dir && req->r_got_unsafe) {
struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
......@@ -868,12 +842,14 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6
int metadata_bytes = 0;
int metadata_key_count = 0;
struct ceph_options *opt = mdsc->fsc->client->options;
struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
void *p;
const char* metadata[][2] = {
{"hostname", utsname()->nodename},
{"kernel_version", utsname()->release},
{"entity_id", opt->name ? opt->name : ""},
{"entity_id", opt->name ? : ""},
{"root", fsopt->server_path ? : "/"},
{NULL, NULL}
};
......@@ -1149,9 +1125,11 @@ static int iterate_session_caps(struct ceph_mds_session *session,
static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
void *arg)
{
struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
struct ceph_inode_info *ci = ceph_inode(inode);
LIST_HEAD(to_remove);
int drop = 0;
bool drop = false;
bool invalidate = false;
dout("removing cap %p, ci is %p, inode is %p\n",
cap, ci, &ci->vfs_inode);
......@@ -1159,8 +1137,13 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
__ceph_remove_cap(cap, false);
if (!ci->i_auth_cap) {
struct ceph_cap_flush *cf;
struct ceph_mds_client *mdsc =
ceph_sb_to_client(inode->i_sb)->mdsc;
struct ceph_mds_client *mdsc = fsc->mdsc;
ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
if (ci->i_wrbuffer_ref > 0 &&
ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
invalidate = true;
while (true) {
struct rb_node *n = rb_first(&ci->i_cap_flush_tree);
......@@ -1183,7 +1166,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
inode, ceph_ino(inode));
ci->i_dirty_caps = 0;
list_del_init(&ci->i_dirty_item);
drop = 1;
drop = true;
}
if (!list_empty(&ci->i_flushing_item)) {
pr_warn_ratelimited(
......@@ -1193,7 +1176,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
ci->i_flushing_caps = 0;
list_del_init(&ci->i_flushing_item);
mdsc->num_cap_flushing--;
drop = 1;
drop = true;
}
spin_unlock(&mdsc->cap_dirty_lock);
......@@ -1210,7 +1193,11 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
list_del(&cf->list);
ceph_free_cap_flush(cf);
}
while (drop--)
wake_up_all(&ci->i_cap_wq);
if (invalidate)
ceph_queue_invalidate(inode);
if (drop)
iput(inode);
return 0;
}
......@@ -1220,12 +1207,13 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
*/
static void remove_session_caps(struct ceph_mds_session *session)
{
struct ceph_fs_client *fsc = session->s_mdsc->fsc;
struct super_block *sb = fsc->sb;
dout("remove_session_caps on %p\n", session);
iterate_session_caps(session, remove_session_caps_cb, NULL);
iterate_session_caps(session, remove_session_caps_cb, fsc);
spin_lock(&session->s_cap_lock);
if (session->s_nr_caps > 0) {
struct super_block *sb = session->s_mdsc->fsc->sb;
struct inode *inode;
struct ceph_cap *cap, *prev = NULL;
struct ceph_vino vino;
......@@ -1270,13 +1258,13 @@ static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
{
struct ceph_inode_info *ci = ceph_inode(inode);
wake_up_all(&ci->i_cap_wq);
if (arg) {
spin_lock(&ci->i_ceph_lock);
ci->i_wanted_max_size = 0;
ci->i_requested_max_size = 0;
spin_unlock(&ci->i_ceph_lock);
}
wake_up_all(&ci->i_cap_wq);
return 0;
}
......@@ -1671,8 +1659,7 @@ int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
struct ceph_inode_info *ci = ceph_inode(dir);
struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
size_t size = sizeof(*rinfo->dir_in) + sizeof(*rinfo->dir_dname_len) +
sizeof(*rinfo->dir_dname) + sizeof(*rinfo->dir_dlease);
size_t size = sizeof(struct ceph_mds_reply_dir_entry);
int order, num_entries;
spin_lock(&ci->i_ceph_lock);
......@@ -1683,14 +1670,14 @@ int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
order = get_order(size * num_entries);
while (order >= 0) {
rinfo->dir_in = (void*)__get_free_pages(GFP_KERNEL |
__GFP_NOWARN,
order);
if (rinfo->dir_in)
rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
__GFP_NOWARN,
order);
if (rinfo->dir_entries)
break;
order--;
}
if (!rinfo->dir_in)
if (!rinfo->dir_entries)
return -ENOMEM;
num_entries = (PAGE_SIZE << order) / size;
......@@ -1722,6 +1709,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
INIT_LIST_HEAD(&req->r_unsafe_target_item);
req->r_fmode = -1;
kref_init(&req->r_kref);
RB_CLEAR_NODE(&req->r_node);
INIT_LIST_HEAD(&req->r_wait);
init_completion(&req->r_completion);
init_completion(&req->r_safe_completion);
......@@ -2414,7 +2402,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
/* get request, session */
tid = le64_to_cpu(msg->hdr.tid);
mutex_lock(&mdsc->mutex);
req = __lookup_request(mdsc, tid);
req = lookup_get_request(mdsc, tid);
if (!req) {
dout("handle_reply on unknown tid %llu\n", tid);
mutex_unlock(&mdsc->mutex);
......@@ -2604,7 +2592,7 @@ static void handle_forward(struct ceph_mds_client *mdsc,
fwd_seq = ceph_decode_32(&p);
mutex_lock(&mdsc->mutex);
req = __lookup_request(mdsc, tid);
req = lookup_get_request(mdsc, tid);
if (!req) {
dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
goto out; /* dup reply? */
......
......@@ -47,6 +47,14 @@ struct ceph_mds_reply_info_in {
u32 pool_ns_len;
};
struct ceph_mds_reply_dir_entry {
char *name;
u32 name_len;
struct ceph_mds_reply_lease *lease;
struct ceph_mds_reply_info_in inode;
loff_t offset;
};
/*
* parsed info about an mds reply, including information about
* either: 1) the target inode and/or its parent directory and dentry,
......@@ -73,11 +81,10 @@ struct ceph_mds_reply_info_parsed {
struct ceph_mds_reply_dirfrag *dir_dir;
size_t dir_buf_size;
int dir_nr;
char **dir_dname;
u32 *dir_dname_len;
struct ceph_mds_reply_lease **dir_dlease;
struct ceph_mds_reply_info_in *dir_in;
u8 dir_complete, dir_end;
bool dir_complete;
bool dir_end;
bool hash_order;
struct ceph_mds_reply_dir_entry *dir_entries;
};
/* for create results */
......
......@@ -54,16 +54,21 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
const void *start = *p;
int i, j, n;
int err = -EINVAL;
u16 version;
u8 mdsmap_v, mdsmap_cv;
m = kzalloc(sizeof(*m), GFP_NOFS);
if (m == NULL)
return ERR_PTR(-ENOMEM);
ceph_decode_16_safe(p, end, version, bad);
if (version > 3) {
pr_warn("got mdsmap version %d > 3, failing", version);
goto bad;
ceph_decode_need(p, end, 1 + 1, bad);
mdsmap_v = ceph_decode_8(p);
mdsmap_cv = ceph_decode_8(p);
if (mdsmap_v >= 4) {
u32 mdsmap_len;
ceph_decode_32_safe(p, end, mdsmap_len, bad);
if (end < *p + mdsmap_len)
goto bad;
end = *p + mdsmap_len;
}
ceph_decode_need(p, end, 8*sizeof(u32) + sizeof(u64), bad);
......@@ -87,16 +92,29 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
u32 namelen;
s32 mds, inc, state;
u64 state_seq;
u8 infoversion;
u8 info_v;
void *info_end = NULL;
struct ceph_entity_addr addr;
u32 num_export_targets;
void *pexport_targets = NULL;
struct ceph_timespec laggy_since;
struct ceph_mds_info *info;
ceph_decode_need(p, end, sizeof(u64)*2 + 1 + sizeof(u32), bad);
ceph_decode_need(p, end, sizeof(u64) + 1, bad);
global_id = ceph_decode_64(p);
infoversion = ceph_decode_8(p);
info_v= ceph_decode_8(p);
if (info_v >= 4) {
u32 info_len;
u8 info_cv;
ceph_decode_need(p, end, 1 + sizeof(u32), bad);
info_cv = ceph_decode_8(p);
info_len = ceph_decode_32(p);
info_end = *p + info_len;
if (info_end > end)
goto bad;
}
ceph_decode_need(p, end, sizeof(u64) + sizeof(u32), bad);
*p += sizeof(u64);
namelen = ceph_decode_32(p); /* skip mds name */
*p += namelen;
......@@ -115,7 +133,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
*p += sizeof(u32);
ceph_decode_32_safe(p, end, namelen, bad);
*p += namelen;
if (infoversion >= 2) {
if (info_v >= 2) {
ceph_decode_32_safe(p, end, num_export_targets, bad);
pexport_targets = *p;
*p += num_export_targets * sizeof(u32);
......@@ -123,6 +141,12 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
num_export_targets = 0;
}
if (info_end && *p != info_end) {
if (*p > info_end)
goto bad;
*p = info_end;
}
dout("mdsmap_decode %d/%d %lld mds%d.%d %s %s\n",
i+1, n, global_id, mds, inc,
ceph_pr_addr(&addr.in_addr),
......@@ -163,6 +187,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
m->m_cas_pg_pool = ceph_decode_64(p);
/* ok, we don't care about the rest. */
*p = end;
dout("mdsmap_decode success epoch %u\n", m->m_epoch);
return m;
......
......@@ -108,6 +108,7 @@ static int ceph_sync_fs(struct super_block *sb, int wait)
* mount options
*/
enum {
Opt_mds_namespace,
Opt_wsize,
Opt_rsize,
Opt_rasize,
......@@ -143,6 +144,7 @@ enum {
};
static match_table_t fsopt_tokens = {
{Opt_mds_namespace, "mds_namespace=%d"},
{Opt_wsize, "wsize=%d"},
{Opt_rsize, "rsize=%d"},
{Opt_rasize, "rasize=%d"},
......@@ -212,6 +214,9 @@ static int parse_fsopt_token(char *c, void *private)
break;
/* misc */
case Opt_mds_namespace:
fsopt->mds_namespace = intval;
break;
case Opt_wsize:
fsopt->wsize = intval;
break;
......@@ -297,6 +302,7 @@ static void destroy_mount_options(struct ceph_mount_options *args)
{
dout("destroy_mount_options %p\n", args);
kfree(args->snapdir_name);
kfree(args->server_path);
kfree(args);
}
......@@ -328,14 +334,17 @@ static int compare_mount_options(struct ceph_mount_options *new_fsopt,
if (ret)
return ret;
ret = strcmp_null(fsopt1->server_path, fsopt2->server_path);
if (ret)
return ret;
return ceph_compare_options(new_opt, fsc->client);
}
static int parse_mount_options(struct ceph_mount_options **pfsopt,
struct ceph_options **popt,
int flags, char *options,
const char *dev_name,
const char **path)
const char *dev_name)
{
struct ceph_mount_options *fsopt;
const char *dev_name_end;
......@@ -367,6 +376,7 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,
fsopt->max_readdir = CEPH_MAX_READDIR_DEFAULT;
fsopt->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT;
fsopt->congestion_kb = default_congestion_kb();
fsopt->mds_namespace = CEPH_FS_CLUSTER_ID_NONE;
/*
* Distinguish the server list from the path in "dev_name".
......@@ -380,12 +390,13 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,
*/
dev_name_end = strchr(dev_name, '/');
if (dev_name_end) {
/* skip over leading '/' for path */
*path = dev_name_end + 1;
fsopt->server_path = kstrdup(dev_name_end, GFP_KERNEL);
if (!fsopt->server_path) {
err = -ENOMEM;
goto out;
}
} else {
/* path is empty */
dev_name_end = dev_name + strlen(dev_name);
*path = dev_name_end;
}
err = -EINVAL;
dev_name_end--; /* back up to ':' separator */
......@@ -395,7 +406,8 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,
goto out;
}
dout("device name '%.*s'\n", (int)(dev_name_end - dev_name), dev_name);
dout("server path '%s'\n", *path);
if (fsopt->server_path)
dout("server path '%s'\n", fsopt->server_path);
*popt = ceph_parse_options(options, dev_name, dev_name_end,
parse_fsopt_token, (void *)fsopt);
......@@ -457,6 +469,8 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
seq_puts(m, ",noacl");
#endif
if (fsopt->mds_namespace != CEPH_FS_CLUSTER_ID_NONE)
seq_printf(m, ",mds_namespace=%d", fsopt->mds_namespace);
if (fsopt->wsize)
seq_printf(m, ",wsize=%d", fsopt->wsize);
if (fsopt->rsize != CEPH_RSIZE_DEFAULT)
......@@ -511,9 +525,8 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
{
struct ceph_fs_client *fsc;
const u64 supported_features =
CEPH_FEATURE_FLOCK |
CEPH_FEATURE_DIRLAYOUTHASH |
CEPH_FEATURE_MDS_INLINE_DATA;
CEPH_FEATURE_FLOCK | CEPH_FEATURE_DIRLAYOUTHASH |
CEPH_FEATURE_MDSENC | CEPH_FEATURE_MDS_INLINE_DATA;
const u64 required_features = 0;
int page_count;
size_t size;
......@@ -530,6 +543,7 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
goto fail;
}
fsc->client->extra_mon_dispatch = extra_mon_dispatch;
fsc->client->monc.fs_cluster_id = fsopt->mds_namespace;
ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 0, true);
fsc->mount_options = fsopt;
......@@ -785,8 +799,7 @@ static struct dentry *open_root_dentry(struct ceph_fs_client *fsc,
/*
* mount: join the ceph cluster, and open root directory.
*/
static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc,
const char *path)
static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc)
{
int err;
unsigned long started = jiffies; /* note the start time */
......@@ -815,11 +828,12 @@ static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc,
goto fail;
}
if (path[0] == 0) {
if (!fsc->mount_options->server_path) {
root = fsc->sb->s_root;
dget(root);
} else {
dout("mount opening base mountpoint\n");
const char *path = fsc->mount_options->server_path + 1;
dout("mount opening path %s\n", path);
root = open_root_dentry(fsc, path, started);
if (IS_ERR(root)) {
err = PTR_ERR(root);
......@@ -935,7 +949,6 @@ static struct dentry *ceph_mount(struct file_system_type *fs_type,
struct dentry *res;
int err;
int (*compare_super)(struct super_block *, void *) = ceph_compare_super;
const char *path = NULL;
struct ceph_mount_options *fsopt = NULL;
struct ceph_options *opt = NULL;
......@@ -944,7 +957,7 @@ static struct dentry *ceph_mount(struct file_system_type *fs_type,
#ifdef CONFIG_CEPH_FS_POSIX_ACL
flags |= MS_POSIXACL;
#endif
err = parse_mount_options(&fsopt, &opt, flags, data, dev_name, &path);
err = parse_mount_options(&fsopt, &opt, flags, data, dev_name);
if (err < 0) {
res = ERR_PTR(err);
goto out_final;
......@@ -987,7 +1000,7 @@ static struct dentry *ceph_mount(struct file_system_type *fs_type,
}
}
res = ceph_real_mount(fsc, path);
res = ceph_real_mount(fsc);
if (IS_ERR(res))
goto out_splat;
dout("root %p inode %p ino %llx.%llx\n", res,
......
......@@ -62,6 +62,7 @@ struct ceph_mount_options {
int cap_release_safety;
int max_readdir; /* max readdir result (entires) */
int max_readdir_bytes; /* max readdir result (bytes) */
int mds_namespace;
/*
* everything above this point can be memcmp'd; everything below
......@@ -69,6 +70,7 @@ struct ceph_mount_options {
*/
char *snapdir_name; /* default ".snap" */
char *server_path; /* default "/" */
};
struct ceph_fs_client {
......@@ -295,6 +297,7 @@ struct ceph_inode_info {
u64 i_files, i_subdirs;
struct rb_root i_fragtree;
int i_fragtree_nsplits;
struct mutex i_fragtree_mutex;
struct ceph_inode_xattrs_info i_xattrs;
......@@ -469,6 +472,7 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
#define CEPH_I_POOL_RD (1 << 5) /* can read from pool */
#define CEPH_I_POOL_WR (1 << 6) /* can write to pool */
#define CEPH_I_SEC_INITED (1 << 7) /* security initialized */
#define CEPH_I_CAP_DROPPED (1 << 8) /* caps were forcibly dropped */
static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
long long release_count,
......@@ -537,11 +541,6 @@ static inline struct ceph_dentry_info *ceph_dentry(struct dentry *dentry)
return (struct ceph_dentry_info *)dentry->d_fsdata;
}
static inline loff_t ceph_make_fpos(unsigned frag, unsigned off)
{
return ((loff_t)frag << 32) | (loff_t)off;
}
/*
* caps helpers
*/
......@@ -632,7 +631,6 @@ struct ceph_file_info {
struct ceph_mds_request *last_readdir;
/* readdir: position within a frag */
unsigned offset; /* offset of last chunk, adjusted for . and .. */
unsigned next_offset; /* offset of next chunk (last_name's + 1) */
char *last_name; /* last entry in previous chunk */
long long dir_release_count;
......@@ -927,6 +925,7 @@ extern void ceph_pool_perm_destroy(struct ceph_mds_client* mdsc);
/* file.c */
extern const struct file_operations ceph_file_fops;
extern int ceph_renew_caps(struct inode *inode);
extern int ceph_open(struct inode *inode, struct file *file);
extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
struct file *file, unsigned flags, umode_t mode,
......@@ -942,6 +941,7 @@ extern const struct inode_operations ceph_snapdir_iops;
extern const struct dentry_operations ceph_dentry_ops, ceph_snap_dentry_ops,
ceph_snapdir_dentry_ops;
extern loff_t ceph_make_fpos(unsigned high, unsigned off, bool hash_order);
extern int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry);
extern int ceph_handle_snapdir(struct ceph_mds_request *req,
struct dentry *dentry, int err);
......
......@@ -77,7 +77,7 @@ static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val,
char buf[128];
dout("ceph_vxattrcb_layout %p\n", &ci->vfs_inode);
down_read(&osdc->map_sem);
down_read(&osdc->lock);
pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool);
if (pool_name) {
size_t len = strlen(pool_name);
......@@ -109,7 +109,7 @@ static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val,
ret = -ERANGE;
}
}
up_read(&osdc->map_sem);
up_read(&osdc->lock);
return ret;
}
......@@ -143,13 +143,13 @@ static size_t ceph_vxattrcb_layout_pool(struct ceph_inode_info *ci,
s64 pool = ceph_file_layout_pg_pool(ci->i_layout);
const char *pool_name;
down_read(&osdc->map_sem);
down_read(&osdc->lock);
pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool);
if (pool_name)
ret = snprintf(val, size, "%s", pool_name);
else
ret = snprintf(val, size, "%lld", (unsigned long long)pool);
up_read(&osdc->map_sem);
up_read(&osdc->lock);
return ret;
}
......@@ -862,6 +862,7 @@ static int ceph_sync_setxattr(struct inode *inode, const char *name,
struct ceph_mds_request *req;
struct ceph_mds_client *mdsc = fsc->mdsc;
struct ceph_pagelist *pagelist = NULL;
int op = CEPH_MDS_OP_SETXATTR;
int err;
if (size > 0) {
......@@ -875,20 +876,21 @@ static int ceph_sync_setxattr(struct inode *inode, const char *name,
if (err)
goto out;
} else if (!value) {
flags |= CEPH_XATTR_REMOVE;
if (flags & CEPH_XATTR_REPLACE)
op = CEPH_MDS_OP_RMXATTR;
else
flags |= CEPH_XATTR_REMOVE;
}
dout("setxattr value=%.*s\n", (int)size, value);
/* do request */
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETXATTR,
USE_AUTH_MDS);
req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
if (IS_ERR(req)) {
err = PTR_ERR(req);
goto out;
}
req->r_args.setxattr.flags = cpu_to_le32(flags);
req->r_path2 = kstrdup(name, GFP_NOFS);
if (!req->r_path2) {
ceph_mdsc_put_request(req);
......@@ -896,8 +898,11 @@ static int ceph_sync_setxattr(struct inode *inode, const char *name,
goto out;
}
req->r_pagelist = pagelist;
pagelist = NULL;
if (op == CEPH_MDS_OP_SETXATTR) {
req->r_args.setxattr.flags = cpu_to_le32(flags);
req->r_pagelist = pagelist;
pagelist = NULL;
}
req->r_inode = inode;
ihold(inode);
......
......@@ -51,11 +51,11 @@ static inline __u32 ceph_frag_make_child(__u32 f, int by, int i)
return ceph_frag_make(newbits,
ceph_frag_value(f) | (i << (24 - newbits)));
}
static inline int ceph_frag_is_leftmost(__u32 f)
static inline bool ceph_frag_is_leftmost(__u32 f)
{
return ceph_frag_value(f) == 0;
}
static inline int ceph_frag_is_rightmost(__u32 f)
static inline bool ceph_frag_is_rightmost(__u32 f)
{
return ceph_frag_value(f) == ceph_frag_mask(f);
}
......
......@@ -153,8 +153,9 @@ struct ceph_dir_layout {
/* watch-notify operations */
enum {
WATCH_NOTIFY = 1, /* notifying watcher */
WATCH_NOTIFY_COMPLETE = 2, /* notifier notified when done */
CEPH_WATCH_EVENT_NOTIFY = 1, /* notifying watcher */
CEPH_WATCH_EVENT_NOTIFY_COMPLETE = 2, /* notifier notified when done */
CEPH_WATCH_EVENT_DISCONNECT = 3, /* we were disconnected */
};
......@@ -207,6 +208,8 @@ struct ceph_mon_subscribe_ack {
struct ceph_fsid fsid;
} __attribute__ ((packed));
#define CEPH_FS_CLUSTER_ID_NONE -1
/*
* mdsmap flags
*/
......@@ -344,6 +347,18 @@ extern const char *ceph_mds_op_name(int op);
#define CEPH_XATTR_REPLACE (1 << 1)
#define CEPH_XATTR_REMOVE (1 << 31)
/*
* readdir request flags;
*/
#define CEPH_READDIR_REPLY_BITFLAGS (1<<0)
/*
* readdir reply flags.
*/
#define CEPH_READDIR_FRAG_END (1<<0)
#define CEPH_READDIR_FRAG_COMPLETE (1<<8)
#define CEPH_READDIR_HASH_ORDER (1<<9)
union ceph_mds_request_args {
struct {
__le32 mask; /* CEPH_CAP_* */
......@@ -361,6 +376,7 @@ union ceph_mds_request_args {
__le32 frag; /* which dir fragment */
__le32 max_entries; /* how many dentries to grab */
__le32 max_bytes;
__le16 flags;
} __attribute__ ((packed)) readdir;
struct {
__le32 mode;
......
......@@ -47,7 +47,7 @@ static inline void ceph_decode_copy(void **p, void *pv, size_t n)
/*
* bounds check input.
*/
static inline int ceph_has_room(void **p, void *end, size_t n)
static inline bool ceph_has_room(void **p, void *end, size_t n)
{
return end >= *p && n <= end - *p;
}
......
......@@ -180,6 +180,63 @@ static inline int calc_pages_for(u64 off, u64 len)
(off >> PAGE_SHIFT);
}
/*
* These are not meant to be generic - an integer key is assumed.
*/
#define DEFINE_RB_INSDEL_FUNCS(name, type, keyfld, nodefld) \
static void insert_##name(struct rb_root *root, type *t) \
{ \
struct rb_node **n = &root->rb_node; \
struct rb_node *parent = NULL; \
\
BUG_ON(!RB_EMPTY_NODE(&t->nodefld)); \
\
while (*n) { \
type *cur = rb_entry(*n, type, nodefld); \
\
parent = *n; \
if (t->keyfld < cur->keyfld) \
n = &(*n)->rb_left; \
else if (t->keyfld > cur->keyfld) \
n = &(*n)->rb_right; \
else \
BUG(); \
} \
\
rb_link_node(&t->nodefld, parent, n); \
rb_insert_color(&t->nodefld, root); \
} \
static void erase_##name(struct rb_root *root, type *t) \
{ \
BUG_ON(RB_EMPTY_NODE(&t->nodefld)); \
rb_erase(&t->nodefld, root); \
RB_CLEAR_NODE(&t->nodefld); \
}
#define DEFINE_RB_LOOKUP_FUNC(name, type, keyfld, nodefld) \
static type *lookup_##name(struct rb_root *root, \
typeof(((type *)0)->keyfld) key) \
{ \
struct rb_node *n = root->rb_node; \
\
while (n) { \
type *cur = rb_entry(n, type, nodefld); \
\
if (key < cur->keyfld) \
n = n->rb_left; \
else if (key > cur->keyfld) \
n = n->rb_right; \
else \
return cur; \
} \
\
return NULL; \
}
#define DEFINE_RB_FUNCS(name, type, keyfld, nodefld) \
DEFINE_RB_INSDEL_FUNCS(name, type, keyfld, nodefld) \
DEFINE_RB_LOOKUP_FUNC(name, type, keyfld, nodefld)
extern struct kmem_cache *ceph_inode_cachep;
extern struct kmem_cache *ceph_cap_cachep;
extern struct kmem_cache *ceph_cap_flush_cachep;
......
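
The DEFINE_RB_FUNCS machinery above replaces several open-coded
red-black tree helpers elsewhere in this series.  Below is a
hypothetical, kernel-style usage sketch: the "demo" type and functions
are invented for illustration; only the macro itself (and its
requirement of an integer key) comes from the header hunk above.

/* Hypothetical example -- not part of the patch. */
#include <linux/rbtree.h>
#include <linux/types.h>
/* DEFINE_RB_FUNCS is provided by the ceph header modified above */

struct demo_item {
	u64 id;			/* integer key, as required */
	struct rb_node node;
};

/* Expands to static insert_demo(), erase_demo() and lookup_demo(). */
DEFINE_RB_FUNCS(demo, struct demo_item, id, node)

static void demo_usage(void)
{
	struct rb_root tree = RB_ROOT;
	struct demo_item item = { .id = 42 };
	struct demo_item *found;

	RB_CLEAR_NODE(&item.node);	/* insert_*() BUG()s on a non-empty node */
	insert_demo(&tree, &item);
	found = lookup_demo(&tree, 42);	/* returns &item */
	if (found)
		erase_demo(&tree, found);	/* rb_erase() + RB_CLEAR_NODE() */
}

This is the pattern used earlier in the diff, where
DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
replaces the open-coded __lookup_request()/__insert_request() helpers
in fs/ceph/mds_client.c.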
......@@ -39,20 +39,31 @@ struct ceph_mon_request {
ceph_monc_request_func_t do_request;
};
typedef void (*ceph_monc_callback_t)(struct ceph_mon_generic_request *);
/*
* ceph_mon_generic_request is being used for the statfs and
* mon_get_version requests which are being done a bit differently
* because we need to get data back to the caller
*/
struct ceph_mon_generic_request {
struct ceph_mon_client *monc;
struct kref kref;
u64 tid;
struct rb_node node;
int result;
void *buf;
struct completion completion;
ceph_monc_callback_t complete_cb;
u64 private_data; /* r_tid/linger_id */
struct ceph_msg *request; /* original request */
struct ceph_msg *reply; /* and reply */
union {
struct ceph_statfs *st;
u64 newest;
} u;
};
struct ceph_mon_client {
......@@ -77,7 +88,6 @@ struct ceph_mon_client {
/* pending generic requests */
struct rb_root generic_request_tree;
int num_generic_requests;
u64 last_tid;
/* subs, indexed with CEPH_SUB_* */
......@@ -86,6 +96,7 @@ struct ceph_mon_client {
bool want;
u32 have; /* epoch */
} subs[3];
int fs_cluster_id; /* "mdsmap.<id>" sub */
#ifdef CONFIG_DEBUG_FS
struct dentry *debugfs_file;
......@@ -116,16 +127,18 @@ extern const char *ceph_sub_str[];
bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
bool continuous);
void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch);
void ceph_monc_renew_subs(struct ceph_mon_client *monc);
extern void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc);
extern int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
unsigned long timeout);
extern int ceph_monc_do_statfs(struct ceph_mon_client *monc,
struct ceph_statfs *buf);
extern int ceph_monc_do_get_version(struct ceph_mon_client *monc,
const char *what, u64 *newest);
int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
u64 *newest);
int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
ceph_monc_callback_t cb, u64 private_data);
extern int ceph_monc_open_session(struct ceph_mon_client *monc);
......
......@@ -24,21 +24,29 @@ struct ceph_pg {
uint32_t seed;
};
#define CEPH_POOL_FLAG_HASHPSPOOL 1
int ceph_pg_compare(const struct ceph_pg *lhs, const struct ceph_pg *rhs);
#define CEPH_POOL_FLAG_HASHPSPOOL (1ULL << 0) /* hash pg seed and pool id
together */
#define CEPH_POOL_FLAG_FULL (1ULL << 1) /* pool is full */
struct ceph_pg_pool_info {
struct rb_node node;
s64 id;
u8 type;
u8 type; /* CEPH_POOL_TYPE_* */
u8 size;
u8 min_size;
u8 crush_ruleset;
u8 object_hash;
u32 last_force_request_resend;
u32 pg_num, pgp_num;
int pg_num_mask, pgp_num_mask;
s64 read_tier;
s64 write_tier; /* wins for read+write ops */
u64 flags;
u64 flags; /* CEPH_POOL_FLAG_* */
char *name;
bool was_full; /* for handle_one_map() */
};
static inline bool ceph_can_shift_osds(struct ceph_pg_pool_info *pool)
......@@ -57,6 +65,22 @@ struct ceph_object_locator {
s64 pool;
};
static inline void ceph_oloc_init(struct ceph_object_locator *oloc)
{
oloc->pool = -1;
}
static inline bool ceph_oloc_empty(const struct ceph_object_locator *oloc)
{
return oloc->pool == -1;
}
static inline void ceph_oloc_copy(struct ceph_object_locator *dest,
const struct ceph_object_locator *src)
{
dest->pool = src->pool;
}
/*
* Maximum supported by kernel client object name length
*
......@@ -64,11 +88,47 @@ struct ceph_object_locator {
*/
#define CEPH_MAX_OID_NAME_LEN 100
/*
* 51-char inline_name is long enough for all cephfs and all but one
* rbd requests: <imgname> in "<imgname>.rbd"/"rbd_id.<imgname>" can be
* arbitrarily long (~PAGE_SIZE). It's done once during rbd map; all
* other rbd requests fit into inline_name.
*
* Makes ceph_object_id 64 bytes on 64-bit.
*/
#define CEPH_OID_INLINE_LEN 52
/*
* Both inline and external buffers have space for a NUL-terminator,
* which is carried around. It's not required though - RADOS object
* names don't have to be NUL-terminated and may contain NULs.
*/
struct ceph_object_id {
char name[CEPH_MAX_OID_NAME_LEN];
char *name;
char inline_name[CEPH_OID_INLINE_LEN];
int name_len;
};
static inline void ceph_oid_init(struct ceph_object_id *oid)
{
oid->name = oid->inline_name;
oid->name_len = 0;
}
static inline bool ceph_oid_empty(const struct ceph_object_id *oid)
{
return oid->name == oid->inline_name && !oid->name_len;
}
void ceph_oid_copy(struct ceph_object_id *dest,
const struct ceph_object_id *src);
__printf(2, 3)
void ceph_oid_printf(struct ceph_object_id *oid, const char *fmt, ...);
__printf(3, 4)
int ceph_oid_aprintf(struct ceph_object_id *oid, gfp_t gfp,
const char *fmt, ...);
void ceph_oid_destroy(struct ceph_object_id *oid);
struct ceph_pg_mapping {
struct rb_node node;
struct ceph_pg pgid;
......@@ -87,7 +147,6 @@ struct ceph_pg_mapping {
struct ceph_osdmap {
struct ceph_fsid fsid;
u32 epoch;
u32 mkfs_epoch;
struct ceph_timespec created, modified;
u32 flags; /* CEPH_OSDMAP_* */
......@@ -113,43 +172,19 @@ struct ceph_osdmap {
int crush_scratch_ary[CEPH_PG_MAX_SIZE * 3];
};
static inline void ceph_oid_set_name(struct ceph_object_id *oid,
const char *name)
{
int len;
len = strlen(name);
if (len > sizeof(oid->name)) {
WARN(1, "ceph_oid_set_name '%s' len %d vs %zu, truncating\n",
name, len, sizeof(oid->name));
len = sizeof(oid->name);
}
memcpy(oid->name, name, len);
oid->name_len = len;
}
static inline void ceph_oid_copy(struct ceph_object_id *dest,
struct ceph_object_id *src)
{
BUG_ON(src->name_len > sizeof(dest->name));
memcpy(dest->name, src->name, src->name_len);
dest->name_len = src->name_len;
}
static inline int ceph_osd_exists(struct ceph_osdmap *map, int osd)
static inline bool ceph_osd_exists(struct ceph_osdmap *map, int osd)
{
return osd >= 0 && osd < map->max_osd &&
(map->osd_state[osd] & CEPH_OSD_EXISTS);
}
static inline int ceph_osd_is_up(struct ceph_osdmap *map, int osd)
static inline bool ceph_osd_is_up(struct ceph_osdmap *map, int osd)
{
return ceph_osd_exists(map, osd) &&
(map->osd_state[osd] & CEPH_OSD_UP);
}
static inline int ceph_osd_is_down(struct ceph_osdmap *map, int osd)
static inline bool ceph_osd_is_down(struct ceph_osdmap *map, int osd)
{
return !ceph_osd_is_up(map, osd);
}
......@@ -192,28 +227,59 @@ static inline int ceph_decode_pgid(void **p, void *end, struct ceph_pg *pgid)
return 0;
}
struct ceph_osdmap *ceph_osdmap_alloc(void);
extern struct ceph_osdmap *ceph_osdmap_decode(void **p, void *end);
extern struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
struct ceph_osdmap *map,
struct ceph_messenger *msgr);
struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
struct ceph_osdmap *map);
extern void ceph_osdmap_destroy(struct ceph_osdmap *map);
struct ceph_osds {
int osds[CEPH_PG_MAX_SIZE];
int size;
int primary; /* id, NOT index */
};
static inline void ceph_osds_init(struct ceph_osds *set)
{
set->size = 0;
set->primary = -1;
}
void ceph_osds_copy(struct ceph_osds *dest, const struct ceph_osds *src);
bool ceph_is_new_interval(const struct ceph_osds *old_acting,
const struct ceph_osds *new_acting,
const struct ceph_osds *old_up,
const struct ceph_osds *new_up,
int old_size,
int new_size,
int old_min_size,
int new_min_size,
u32 old_pg_num,
u32 new_pg_num,
bool old_sort_bitwise,
bool new_sort_bitwise,
const struct ceph_pg *pgid);
bool ceph_osds_changed(const struct ceph_osds *old_acting,
const struct ceph_osds *new_acting,
bool any_change);
/* calculate mapping of a file extent to an object */
extern int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
u64 off, u64 len,
u64 *bno, u64 *oxoff, u64 *oxlen);
/* calculate mapping of object to a placement group */
extern int ceph_oloc_oid_to_pg(struct ceph_osdmap *osdmap,
struct ceph_object_locator *oloc,
struct ceph_object_id *oid,
struct ceph_pg *pg_out);
extern int ceph_calc_pg_acting(struct ceph_osdmap *osdmap,
struct ceph_pg pgid,
int *osds, int *primary);
extern int ceph_calc_pg_primary(struct ceph_osdmap *osdmap,
struct ceph_pg pgid);
int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
struct ceph_object_id *oid,
struct ceph_object_locator *oloc,
struct ceph_pg *raw_pgid);
void ceph_pg_to_up_acting_osds(struct ceph_osdmap *osdmap,
const struct ceph_pg *raw_pgid,
struct ceph_osds *up,
struct ceph_osds *acting);
int ceph_pg_to_acting_primary(struct ceph_osdmap *osdmap,
const struct ceph_pg *raw_pgid);
extern struct ceph_pg_pool_info *ceph_pg_pool_by_id(struct ceph_osdmap *map,
u64 id);
......
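
To make the new inline/external object name handling above concrete,
here is a short usage sketch.  The struct ceph_object_id layout and the
ceph_oid_init()/ceph_oid_printf()/ceph_oid_empty()/ceph_oid_destroy()
helpers come from the header hunk above (ceph_oid_printf() is also
visible in the fs/ceph/ioctl.c hunk earlier); the surrounding function,
the example name and the exact header path are assumptions made for
illustration only.

/* Hypothetical usage sketch -- not part of the patch. */
#include <linux/bug.h>
#include <linux/ceph/osdmap.h>	/* assumed location of the header hunk above */

static void oid_example(void)
{
	struct ceph_object_id oid;

	ceph_oid_init(&oid);	/* name points at the 52-byte inline buffer */
	ceph_oid_printf(&oid, "rbd_id.%s", "myimage");	/* short names stay inline */
	WARN_ON(ceph_oid_empty(&oid));
	ceph_oid_destroy(&oid);	/* presumably frees a heap-allocated name, if any */
}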
......@@ -114,8 +114,8 @@ struct ceph_object_layout {
* compound epoch+version, used by storage layer to serialize mutations
*/
struct ceph_eversion {
__le32 epoch;
__le64 version;
__le32 epoch;
} __attribute__ ((packed));
/*
......@@ -153,6 +153,11 @@ extern const char *ceph_osd_state_name(int s);
#define CEPH_OSDMAP_NOIN (1<<8) /* block osd auto mark-in */
#define CEPH_OSDMAP_NOBACKFILL (1<<9) /* block osd backfill */
#define CEPH_OSDMAP_NORECOVER (1<<10) /* block osd recovery and backfill */
#define CEPH_OSDMAP_NOSCRUB (1<<11) /* block periodic scrub */
#define CEPH_OSDMAP_NODEEP_SCRUB (1<<12) /* block periodic deep-scrub */
#define CEPH_OSDMAP_NOTIERAGENT (1<<13) /* disable tiering agent */
#define CEPH_OSDMAP_NOREBALANCE (1<<14) /* block osd backfill unless pg is degraded */
#define CEPH_OSDMAP_SORTBITWISE (1<<15) /* use bitwise hobject_t sort */
/*
* The error code to return when an OSD can't handle a write
......@@ -389,6 +394,13 @@ enum {
CEPH_OSD_FLAG_SKIPRWLOCKS = 0x10000, /* skip rw locks */
CEPH_OSD_FLAG_IGNORE_OVERLAY = 0x20000, /* ignore pool overlay */
CEPH_OSD_FLAG_FLUSH = 0x40000, /* this is part of flush */
CEPH_OSD_FLAG_MAP_SNAP_CLONE = 0x80000, /* map snap direct to clone id */
CEPH_OSD_FLAG_ENFORCE_SNAPC = 0x100000, /* use snapc provided even if
pool uses pool snaps */
CEPH_OSD_FLAG_REDIRECTED = 0x200000, /* op has been redirected */
CEPH_OSD_FLAG_KNOWN_REDIR = 0x400000, /* redirect bit is authoritative */
CEPH_OSD_FLAG_FULL_TRY = 0x800000, /* try op despite full flag */
CEPH_OSD_FLAG_FULL_FORCE = 0x1000000, /* force op despite full flag */
};
enum {
......@@ -415,7 +427,17 @@ enum {
CEPH_OSD_CMPXATTR_MODE_U64 = 2
};
#define RADOS_NOTIFY_VER 1
enum {
CEPH_OSD_WATCH_OP_UNWATCH = 0,
CEPH_OSD_WATCH_OP_LEGACY_WATCH = 1,
/* note: use only ODD ids to prevent pre-giant code from
interpreting the op as UNWATCH */
CEPH_OSD_WATCH_OP_WATCH = 3,
CEPH_OSD_WATCH_OP_RECONNECT = 5,
CEPH_OSD_WATCH_OP_PING = 7,
};
const char *ceph_osd_watch_op_name(int o);
/*
* an individual object operation. each may be accompanied by some data
......@@ -450,9 +472,13 @@ struct ceph_osd_op {
} __attribute__ ((packed)) snap;
struct {
__le64 cookie;
__le64 ver;
__u8 flag; /* 0 = unwatch, 1 = watch */
__le64 ver; /* no longer used */
__u8 op; /* CEPH_OSD_WATCH_OP_* */
__le32 gen; /* registration generation */
} __attribute__ ((packed)) watch;
struct {
__le64 cookie;
} __attribute__ ((packed)) notify;
struct {
__le64 offset, length;
__le64 src_offset;
......
......@@ -651,7 +651,7 @@ EXPORT_SYMBOL(ceph_destroy_client);
/*
* true if we have the mon map (and have thus joined the cluster)
*/
static int have_mon_and_osd_map(struct ceph_client *client)
static bool have_mon_and_osd_map(struct ceph_client *client)
{
return client->monc.monmap && client->monc.monmap->epoch &&
client->osdc.osdmap && client->osdc.osdmap->epoch;
......
......@@ -27,6 +27,22 @@ __CEPH_FORALL_OSD_OPS(GENERATE_CASE)
}
}
const char *ceph_osd_watch_op_name(int o)
{
switch (o) {
case CEPH_OSD_WATCH_OP_UNWATCH:
return "unwatch";
case CEPH_OSD_WATCH_OP_WATCH:
return "watch";
case CEPH_OSD_WATCH_OP_RECONNECT:
return "reconnect";
case CEPH_OSD_WATCH_OP_PING:
return "ping";
default:
return "???";
}
}
const char *ceph_osd_state_name(int s)
{
switch (s) {
......
......@@ -54,24 +54,25 @@ static int osdmap_show(struct seq_file *s, void *p)
{
int i;
struct ceph_client *client = s->private;
struct ceph_osdmap *map = client->osdc.osdmap;
struct ceph_osd_client *osdc = &client->osdc;
struct ceph_osdmap *map = osdc->osdmap;
struct rb_node *n;
if (map == NULL)
return 0;
seq_printf(s, "epoch %d\n", map->epoch);
seq_printf(s, "flags%s%s\n",
(map->flags & CEPH_OSDMAP_NEARFULL) ? " NEARFULL" : "",
(map->flags & CEPH_OSDMAP_FULL) ? " FULL" : "");
down_read(&osdc->lock);
seq_printf(s, "epoch %d flags 0x%x\n", map->epoch, map->flags);
for (n = rb_first(&map->pg_pools); n; n = rb_next(n)) {
struct ceph_pg_pool_info *pool =
struct ceph_pg_pool_info *pi =
rb_entry(n, struct ceph_pg_pool_info, node);
seq_printf(s, "pool %lld pg_num %u (%d) read_tier %lld write_tier %lld\n",
pool->id, pool->pg_num, pool->pg_num_mask,
pool->read_tier, pool->write_tier);
seq_printf(s, "pool %lld '%s' type %d size %d min_size %d pg_num %u pg_num_mask %d flags 0x%llx lfor %u read_tier %lld write_tier %lld\n",
pi->id, pi->name, pi->type, pi->size, pi->min_size,
pi->pg_num, pi->pg_num_mask, pi->flags,
pi->last_force_request_resend, pi->read_tier,
pi->write_tier);
}
for (i = 0; i < map->max_osd; i++) {
struct ceph_entity_addr *addr = &map->osd_addr[i];
......@@ -103,6 +104,7 @@ static int osdmap_show(struct seq_file *s, void *p)
pg->pgid.seed, pg->primary_temp.osd);
}
up_read(&osdc->lock);
return 0;
}
......@@ -126,6 +128,7 @@ static int monc_show(struct seq_file *s, void *p)
CEPH_SUBSCRIBE_ONETIME ? "" : "+"));
seq_putc(s, '\n');
}
seq_printf(s, "fs_cluster_id %d\n", monc->fs_cluster_id);
for (rp = rb_first(&monc->generic_request_tree); rp; rp = rb_next(rp)) {
__u16 op;
......@@ -143,43 +146,113 @@ static int monc_show(struct seq_file *s, void *p)
return 0;
}
static int osdc_show(struct seq_file *s, void *pp)
static void dump_target(struct seq_file *s, struct ceph_osd_request_target *t)
{
struct ceph_client *client = s->private;
struct ceph_osd_client *osdc = &client->osdc;
struct rb_node *p;
int i;
mutex_lock(&osdc->request_mutex);
for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
struct ceph_osd_request *req;
unsigned int i;
int opcode;
seq_printf(s, "osd%d\t%llu.%x\t[", t->osd, t->pgid.pool, t->pgid.seed);
for (i = 0; i < t->up.size; i++)
seq_printf(s, "%s%d", (!i ? "" : ","), t->up.osds[i]);
seq_printf(s, "]/%d\t[", t->up.primary);
for (i = 0; i < t->acting.size; i++)
seq_printf(s, "%s%d", (!i ? "" : ","), t->acting.osds[i]);
seq_printf(s, "]/%d\t%*pE\t0x%x", t->acting.primary,
t->target_oid.name_len, t->target_oid.name, t->flags);
if (t->paused)
seq_puts(s, "\tP");
}
req = rb_entry(p, struct ceph_osd_request, r_node);
static void dump_request(struct seq_file *s, struct ceph_osd_request *req)
{
int i;
seq_printf(s, "%lld\tosd%d\t%lld.%x\t", req->r_tid,
req->r_osd ? req->r_osd->o_osd : -1,
req->r_pgid.pool, req->r_pgid.seed);
seq_printf(s, "%llu\t", req->r_tid);
dump_target(s, &req->r_t);
seq_printf(s, "%.*s", req->r_base_oid.name_len,
req->r_base_oid.name);
seq_printf(s, "\t%d\t%u'%llu", req->r_attempts,
le32_to_cpu(req->r_replay_version.epoch),
le64_to_cpu(req->r_replay_version.version));
if (req->r_reassert_version.epoch)
seq_printf(s, "\t%u'%llu",
(unsigned int)le32_to_cpu(req->r_reassert_version.epoch),
le64_to_cpu(req->r_reassert_version.version));
else
seq_printf(s, "\t");
for (i = 0; i < req->r_num_ops; i++) {
struct ceph_osd_req_op *op = &req->r_ops[i];
seq_printf(s, "%s%s", (i == 0 ? "\t" : ","),
ceph_osd_op_name(op->op));
if (op->op == CEPH_OSD_OP_WATCH)
seq_printf(s, "-%s",
ceph_osd_watch_op_name(op->watch.op));
}
seq_putc(s, '\n');
}
static void dump_requests(struct seq_file *s, struct ceph_osd *osd)
{
struct rb_node *n;
mutex_lock(&osd->lock);
for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
struct ceph_osd_request *req =
rb_entry(n, struct ceph_osd_request, r_node);
dump_request(s, req);
}
mutex_unlock(&osd->lock);
}
for (i = 0; i < req->r_num_ops; i++) {
opcode = req->r_ops[i].op;
seq_printf(s, "%s%s", (i == 0 ? "\t" : ","),
ceph_osd_op_name(opcode));
}
static void dump_linger_request(struct seq_file *s,
struct ceph_osd_linger_request *lreq)
{
seq_printf(s, "%llu\t", lreq->linger_id);
dump_target(s, &lreq->t);
seq_printf(s, "\t%u\t%s%s/%d\n", lreq->register_gen,
lreq->is_watch ? "W" : "N", lreq->committed ? "C" : "",
lreq->last_error);
}
static void dump_linger_requests(struct seq_file *s, struct ceph_osd *osd)
{
struct rb_node *n;
mutex_lock(&osd->lock);
for (n = rb_first(&osd->o_linger_requests); n; n = rb_next(n)) {
struct ceph_osd_linger_request *lreq =
rb_entry(n, struct ceph_osd_linger_request, node);
dump_linger_request(s, lreq);
}
mutex_unlock(&osd->lock);
}
seq_printf(s, "\n");
static int osdc_show(struct seq_file *s, void *pp)
{
struct ceph_client *client = s->private;
struct ceph_osd_client *osdc = &client->osdc;
struct rb_node *n;
down_read(&osdc->lock);
seq_printf(s, "REQUESTS %d homeless %d\n",
atomic_read(&osdc->num_requests),
atomic_read(&osdc->num_homeless));
for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
dump_requests(s, osd);
}
mutex_unlock(&osdc->request_mutex);
dump_requests(s, &osdc->homeless_osd);
seq_puts(s, "LINGER REQUESTS\n");
for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
dump_linger_requests(s, osd);
}
dump_linger_requests(s, &osdc->homeless_osd);
up_read(&osdc->lock);
return 0;
}
......