Commit 72b5ac54 authored by Linus Torvalds's avatar Linus Torvalds

Merge tag 'ceph-for-4.8-rc1' of git://github.com/ceph/ceph-client

Pull Ceph updates from Ilya Dryomov:
 "The highlights are:

   - RADOS namespace support in libceph and CephFS (Zheng Yan and
     myself).  The stopgaps added in 4.5 to deny access to inodes in
     namespaces are removed and CEPH_FEATURE_FS_FILE_LAYOUT_V2 feature
     bit is now fully supported

   - A large rework of the MDS cap flushing code (Zheng Yan)

   - Handle some of ->d_revalidate() in RCU mode (Jeff Layton).  We were
     overly pessimistic before, bailing at the first sight of LOOKUP_RCU

  On top of that we've got a few CephFS bug fixes, a couple of cleanups
  and Arnd's workaround for a weird genksyms issue"

* tag 'ceph-for-4.8-rc1' of git://github.com/ceph/ceph-client: (34 commits)
  ceph: fix symbol versioning for ceph_monc_do_statfs
  ceph: Correctly return NXIO errors from ceph_llseek
  ceph: Mark the file cache as unreclaimable
  ceph: optimize cap flush waiting
  ceph: cleanup ceph_flush_snaps()
  ceph: kick cap flushes before sending other cap message
  ceph: introduce an inode flag to indicates if snapflush is needed
  ceph: avoid sending duplicated cap flush message
  ceph: unify cap flush and snapcap flush
  ceph: use list instead of rbtree to track cap flushes
  ceph: update types of some local varibles
  ceph: include 'follows' of pending snapflush in cap reconnect message
  ceph: update cap reconnect message to version 3
  ceph: mount non-default filesystem by name
  libceph: fsmap.user subscription support
  ceph: handle LOOKUP_RCU in ceph_d_revalidate
  ceph: allow dentry_lease_is_valid to work under RCU walk
  ceph: clear d_fsinfo pointer under d_lock
  ceph: remove ceph_mdsc_lease_release
  ceph: don't use ->d_time
  ...
parents c7fac299 a0f2b652
......@@ -1937,7 +1937,7 @@ static struct ceph_osd_request *rbd_osd_req_create(
osd_req->r_callback = rbd_osd_req_callback;
osd_req->r_priv = obj_request;
osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
osd_req->r_base_oloc.pool = rbd_dev->layout.pool_id;
if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s",
obj_request->object_name))
goto fail;
......@@ -1991,7 +1991,7 @@ rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
osd_req->r_callback = rbd_osd_req_callback;
osd_req->r_priv = obj_request;
osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
osd_req->r_base_oloc.pool = rbd_dev->layout.pool_id;
if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s",
obj_request->object_name))
goto fail;
......@@ -3995,10 +3995,11 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
/* Initialize the layout used for all rbd requests */
rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
rbd_dev->layout.stripe_unit = 1 << RBD_MAX_OBJ_ORDER;
rbd_dev->layout.stripe_count = 1;
rbd_dev->layout.object_size = 1 << RBD_MAX_OBJ_ORDER;
rbd_dev->layout.pool_id = spec->pool_id;
RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
/*
* If this is a mapping rbd_dev (as opposed to a parent one),
......@@ -5187,7 +5188,7 @@ static int rbd_dev_header_name(struct rbd_device *rbd_dev)
rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
rbd_dev->header_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
rbd_dev->header_oloc.pool = rbd_dev->layout.pool_id;
if (rbd_dev->image_format == 1)
ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
spec->image_name, RBD_SUFFIX);
......
......@@ -1730,7 +1730,8 @@ enum {
POOL_WRITE = 2,
};
static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
static int __ceph_pool_perm_get(struct ceph_inode_info *ci,
s64 pool, struct ceph_string *pool_ns)
{
struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode);
struct ceph_mds_client *mdsc = fsc->mdsc;
......@@ -1738,6 +1739,7 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
struct rb_node **p, *parent;
struct ceph_pool_perm *perm;
struct page **pages;
size_t pool_ns_len;
int err = 0, err2 = 0, have = 0;
down_read(&mdsc->pool_perm_rwsem);
......@@ -1749,17 +1751,31 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
else if (pool > perm->pool)
p = &(*p)->rb_right;
else {
have = perm->perm;
break;
int ret = ceph_compare_string(pool_ns,
perm->pool_ns,
perm->pool_ns_len);
if (ret < 0)
p = &(*p)->rb_left;
else if (ret > 0)
p = &(*p)->rb_right;
else {
have = perm->perm;
break;
}
}
}
up_read(&mdsc->pool_perm_rwsem);
if (*p)
goto out;
dout("__ceph_pool_perm_get pool %u no perm cached\n", pool);
if (pool_ns)
dout("__ceph_pool_perm_get pool %lld ns %.*s no perm cached\n",
pool, (int)pool_ns->len, pool_ns->str);
else
dout("__ceph_pool_perm_get pool %lld no perm cached\n", pool);
down_write(&mdsc->pool_perm_rwsem);
p = &mdsc->pool_perm_tree.rb_node;
parent = NULL;
while (*p) {
parent = *p;
......@@ -1769,8 +1785,17 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
else if (pool > perm->pool)
p = &(*p)->rb_right;
else {
have = perm->perm;
break;
int ret = ceph_compare_string(pool_ns,
perm->pool_ns,
perm->pool_ns_len);
if (ret < 0)
p = &(*p)->rb_left;
else if (ret > 0)
p = &(*p)->rb_right;
else {
have = perm->perm;
break;
}
}
}
if (*p) {
......@@ -1788,6 +1813,8 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
rd_req->r_flags = CEPH_OSD_FLAG_READ;
osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0);
rd_req->r_base_oloc.pool = pool;
if (pool_ns)
rd_req->r_base_oloc.pool_ns = ceph_get_string(pool_ns);
ceph_oid_printf(&rd_req->r_base_oid, "%llx.00000000", ci->i_vino.ino);
err = ceph_osdc_alloc_messages(rd_req, GFP_NOFS);
......@@ -1841,7 +1868,8 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
goto out_unlock;
}
perm = kmalloc(sizeof(*perm), GFP_NOFS);
pool_ns_len = pool_ns ? pool_ns->len : 0;
perm = kmalloc(sizeof(*perm) + pool_ns_len + 1, GFP_NOFS);
if (!perm) {
err = -ENOMEM;
goto out_unlock;
......@@ -1849,6 +1877,11 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
perm->pool = pool;
perm->perm = have;
perm->pool_ns_len = pool_ns_len;
if (pool_ns_len > 0)
memcpy(perm->pool_ns, pool_ns->str, pool_ns_len);
perm->pool_ns[pool_ns_len] = 0;
rb_link_node(&perm->node, parent, p);
rb_insert_color(&perm->node, &mdsc->pool_perm_tree);
err = 0;
......@@ -1860,43 +1893,46 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
out:
if (!err)
err = have;
dout("__ceph_pool_perm_get pool %u result = %d\n", pool, err);
if (pool_ns)
dout("__ceph_pool_perm_get pool %lld ns %.*s result = %d\n",
pool, (int)pool_ns->len, pool_ns->str, err);
else
dout("__ceph_pool_perm_get pool %lld result = %d\n", pool, err);
return err;
}
int ceph_pool_perm_check(struct ceph_inode_info *ci, int need)
{
u32 pool;
s64 pool;
struct ceph_string *pool_ns;
int ret, flags;
/* does not support pool namespace yet */
if (ci->i_pool_ns_len)
return -EIO;
if (ceph_test_mount_opt(ceph_inode_to_client(&ci->vfs_inode),
NOPOOLPERM))
return 0;
spin_lock(&ci->i_ceph_lock);
flags = ci->i_ceph_flags;
pool = ceph_file_layout_pg_pool(ci->i_layout);
pool = ci->i_layout.pool_id;
spin_unlock(&ci->i_ceph_lock);
check:
if (flags & CEPH_I_POOL_PERM) {
if ((need & CEPH_CAP_FILE_RD) && !(flags & CEPH_I_POOL_RD)) {
dout("ceph_pool_perm_check pool %u no read perm\n",
dout("ceph_pool_perm_check pool %lld no read perm\n",
pool);
return -EPERM;
}
if ((need & CEPH_CAP_FILE_WR) && !(flags & CEPH_I_POOL_WR)) {
dout("ceph_pool_perm_check pool %u no write perm\n",
dout("ceph_pool_perm_check pool %lld no write perm\n",
pool);
return -EPERM;
}
return 0;
}
ret = __ceph_pool_perm_get(ci, pool);
pool_ns = ceph_try_get_string(ci->i_layout.pool_ns);
ret = __ceph_pool_perm_get(ci, pool, pool_ns);
ceph_put_string(pool_ns);
if (ret < 0)
return ret;
......@@ -1907,10 +1943,11 @@ int ceph_pool_perm_check(struct ceph_inode_info *ci, int need)
flags |= CEPH_I_POOL_WR;
spin_lock(&ci->i_ceph_lock);
if (pool == ceph_file_layout_pg_pool(ci->i_layout)) {
ci->i_ceph_flags = flags;
if (pool == ci->i_layout.pool_id &&
pool_ns == rcu_dereference_raw(ci->i_layout.pool_ns)) {
ci->i_ceph_flags |= flags;
} else {
pool = ceph_file_layout_pg_pool(ci->i_layout);
pool = ci->i_layout.pool_id;
flags = ci->i_ceph_flags;
}
spin_unlock(&ci->i_ceph_lock);
......
......@@ -71,7 +71,7 @@ int ceph_fscache_register_fs(struct ceph_fs_client* fsc)
&ceph_fscache_fsid_object_def,
fsc, true);
if (!fsc->fscache)
pr_err("Unable to resgister fsid: %p fscache cookie", fsc);
pr_err("Unable to register fsid: %p fscache cookie\n", fsc);
return 0;
}
......
This diff is collapsed.
......@@ -59,7 +59,7 @@ int ceph_init_dentry(struct dentry *dentry)
di->dentry = dentry;
di->lease_session = NULL;
dentry->d_time = jiffies;
di->time = jiffies;
/* avoid reordering d_fsdata setup so that the check above is safe */
smp_mb();
dentry->d_fsdata = di;
......@@ -1124,7 +1124,7 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
void ceph_invalidate_dentry_lease(struct dentry *dentry)
{
spin_lock(&dentry->d_lock);
dentry->d_time = jiffies;
ceph_dentry(dentry)->time = jiffies;
ceph_dentry(dentry)->lease_shared_gen = 0;
spin_unlock(&dentry->d_lock);
}
......@@ -1133,7 +1133,8 @@ void ceph_invalidate_dentry_lease(struct dentry *dentry)
* Check if dentry lease is valid. If not, delete the lease. Try to
* renew if the least is more than half up.
*/
static int dentry_lease_is_valid(struct dentry *dentry)
static int dentry_lease_is_valid(struct dentry *dentry, unsigned int flags,
struct inode *dir)
{
struct ceph_dentry_info *di;
struct ceph_mds_session *s;
......@@ -1141,12 +1142,11 @@ static int dentry_lease_is_valid(struct dentry *dentry)
u32 gen;
unsigned long ttl;
struct ceph_mds_session *session = NULL;
struct inode *dir = NULL;
u32 seq = 0;
spin_lock(&dentry->d_lock);
di = ceph_dentry(dentry);
if (di->lease_session) {
if (di && di->lease_session) {
s = di->lease_session;
spin_lock(&s->s_gen_ttl_lock);
gen = s->s_cap_gen;
......@@ -1154,17 +1154,24 @@ static int dentry_lease_is_valid(struct dentry *dentry)
spin_unlock(&s->s_gen_ttl_lock);
if (di->lease_gen == gen &&
time_before(jiffies, dentry->d_time) &&
time_before(jiffies, di->time) &&
time_before(jiffies, ttl)) {
valid = 1;
if (di->lease_renew_after &&
time_after(jiffies, di->lease_renew_after)) {
/* we should renew */
dir = d_inode(dentry->d_parent);
session = ceph_get_mds_session(s);
seq = di->lease_seq;
di->lease_renew_after = 0;
di->lease_renew_from = jiffies;
/*
* We should renew. If we're in RCU walk mode
* though, we can't do that so just return
* -ECHILD.
*/
if (flags & LOOKUP_RCU) {
valid = -ECHILD;
} else {
session = ceph_get_mds_session(s);
seq = di->lease_seq;
di->lease_renew_after = 0;
di->lease_renew_from = jiffies;
}
}
}
}
......@@ -1207,15 +1214,19 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
struct dentry *parent;
struct inode *dir;
if (flags & LOOKUP_RCU)
return -ECHILD;
if (flags & LOOKUP_RCU) {
parent = ACCESS_ONCE(dentry->d_parent);
dir = d_inode_rcu(parent);
if (!dir)
return -ECHILD;
} else {
parent = dget_parent(dentry);
dir = d_inode(parent);
}
dout("d_revalidate %p '%pd' inode %p offset %lld\n", dentry,
dentry, d_inode(dentry), ceph_dentry(dentry)->offset);
parent = dget_parent(dentry);
dir = d_inode(parent);
/* always trust cached snapped dentries, snapdir dentry */
if (ceph_snap(dir) != CEPH_NOSNAP) {
dout("d_revalidate %p '%pd' inode %p is SNAPPED\n", dentry,
......@@ -1224,12 +1235,16 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
} else if (d_really_is_positive(dentry) &&
ceph_snap(d_inode(dentry)) == CEPH_SNAPDIR) {
valid = 1;
} else if (dentry_lease_is_valid(dentry) ||
dir_lease_is_valid(dir, dentry)) {
if (d_really_is_positive(dentry))
valid = ceph_is_any_caps(d_inode(dentry));
else
valid = 1;
} else {
valid = dentry_lease_is_valid(dentry, flags, dir);
if (valid == -ECHILD)
return valid;
if (valid || dir_lease_is_valid(dir, dentry)) {
if (d_really_is_positive(dentry))
valid = ceph_is_any_caps(d_inode(dentry));
else
valid = 1;
}
}
if (!valid) {
......@@ -1238,6 +1253,9 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
struct ceph_mds_request *req;
int op, mask, err;
if (flags & LOOKUP_RCU)
return -ECHILD;
op = ceph_snap(dir) == CEPH_SNAPDIR ?
CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
......@@ -1273,7 +1291,8 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
ceph_dir_clear_complete(dir);
}
dput(parent);
if (!(flags & LOOKUP_RCU))
dput(parent);
return valid;
}
......@@ -1286,10 +1305,14 @@ static void ceph_d_release(struct dentry *dentry)
dout("d_release %p\n", dentry);
ceph_dentry_lru_del(dentry);
spin_lock(&dentry->d_lock);
dentry->d_fsdata = NULL;
spin_unlock(&dentry->d_lock);
if (di->lease_session)
ceph_put_mds_session(di->lease_session);
kmem_cache_free(ceph_dentry_cachep, di);
dentry->d_fsdata = NULL;
}
static int ceph_snapdir_d_revalidate(struct dentry *dentry,
......
......@@ -708,7 +708,7 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
}
}
ceph_put_page_vector(osd_data->pages, num_pages, false);
ceph_put_page_vector(osd_data->pages, num_pages, !aio_req->write);
ceph_osdc_put_request(req);
if (rc < 0)
......@@ -821,6 +821,54 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
}
}
/*
* Wait on any unsafe replies for the given inode. First wait on the
* newest request, and make that the upper bound. Then, if there are
* more requests, keep waiting on the oldest as long as it is still older
* than the original request.
*/
void ceph_sync_write_wait(struct inode *inode)
{
struct ceph_inode_info *ci = ceph_inode(inode);
struct list_head *head = &ci->i_unsafe_writes;
struct ceph_osd_request *req;
u64 last_tid;
if (!S_ISREG(inode->i_mode))
return;
spin_lock(&ci->i_unsafe_lock);
if (list_empty(head))
goto out;
/* set upper bound as _last_ entry in chain */
req = list_last_entry(head, struct ceph_osd_request,
r_unsafe_item);
last_tid = req->r_tid;
do {
ceph_osdc_get_request(req);
spin_unlock(&ci->i_unsafe_lock);
dout("sync_write_wait on tid %llu (until %llu)\n",
req->r_tid, last_tid);
wait_for_completion(&req->r_safe_completion);
ceph_osdc_put_request(req);
spin_lock(&ci->i_unsafe_lock);
/*
* from here on look at first entry in chain, since we
* only want to wait for anything older than last_tid
*/
if (list_empty(head))
break;
req = list_first_entry(head, struct ceph_osd_request,
r_unsafe_item);
} while (req->r_tid < last_tid);
out:
spin_unlock(&ci->i_unsafe_lock);
}
static ssize_t
ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
......@@ -964,7 +1012,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
len = ret;
}
ceph_put_page_vector(pages, num_pages, false);
ceph_put_page_vector(pages, num_pages, !write);
ceph_osdc_put_request(req);
if (ret < 0)
......@@ -985,6 +1033,8 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
}
if (aio_req) {
LIST_HEAD(osd_reqs);
if (aio_req->num_reqs == 0) {
kfree(aio_req);
return ret;
......@@ -993,8 +1043,9 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
ceph_get_cap_refs(ci, write ? CEPH_CAP_FILE_WR :
CEPH_CAP_FILE_RD);
while (!list_empty(&aio_req->osd_reqs)) {
req = list_first_entry(&aio_req->osd_reqs,
list_splice(&aio_req->osd_reqs, &osd_reqs);
while (!list_empty(&osd_reqs)) {
req = list_first_entry(&osd_reqs,
struct ceph_osd_request,
r_unsafe_item);
list_del_init(&req->r_unsafe_item);
......@@ -1448,16 +1499,14 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
{
struct inode *inode = file->f_mapping->host;
loff_t i_size;
int ret;
loff_t ret;
inode_lock(inode);
if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) {
ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false);
if (ret < 0) {
offset = ret;
if (ret < 0)
goto out;
}
}
i_size = i_size_read(inode);
......@@ -1473,7 +1522,7 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
* write() or lseek() might have altered it
*/
if (offset == 0) {
offset = file->f_pos;
ret = file->f_pos;
goto out;
}
offset += file->f_pos;
......@@ -1493,11 +1542,11 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
break;
}
offset = vfs_setpos(file, offset, inode->i_sb->s_maxbytes);
ret = vfs_setpos(file, offset, inode->i_sb->s_maxbytes);
out:
inode_unlock(inode);
return offset;
return ret;
}
static inline void ceph_zero_partial_page(
......@@ -1583,9 +1632,9 @@ static int ceph_zero_objects(struct inode *inode, loff_t offset, loff_t length)
{
int ret = 0;
struct ceph_inode_info *ci = ceph_inode(inode);
s32 stripe_unit = ceph_file_layout_su(ci->i_layout);
s32 stripe_count = ceph_file_layout_stripe_count(ci->i_layout);
s32 object_size = ceph_file_layout_object_size(ci->i_layout);
s32 stripe_unit = ci->i_layout.stripe_unit;
s32 stripe_count = ci->i_layout.stripe_count;
s32 object_size = ci->i_layout.object_size;
u64 object_set_size = object_size * stripe_count;
u64 nearly, t;
......
......@@ -446,7 +446,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
ci->i_symlink = NULL;
memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout));
ci->i_pool_ns_len = 0;
RCU_INIT_POINTER(ci->i_layout.pool_ns, NULL);
ci->i_fragtree = RB_ROOT;
mutex_init(&ci->i_fragtree_mutex);
......@@ -468,7 +468,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
INIT_LIST_HEAD(&ci->i_dirty_item);
INIT_LIST_HEAD(&ci->i_flushing_item);
ci->i_prealloc_cap_flush = NULL;
ci->i_cap_flush_tree = RB_ROOT;
INIT_LIST_HEAD(&ci->i_cap_flush_list);
init_waitqueue_head(&ci->i_cap_wq);
ci->i_hold_caps_min = 0;
ci->i_hold_caps_max = 0;
......@@ -477,7 +477,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
ci->i_head_snapc = NULL;
ci->i_snap_caps = 0;
for (i = 0; i < CEPH_FILE_MODE_NUM; i++)
for (i = 0; i < CEPH_FILE_MODE_BITS; i++)
ci->i_nr_by_mode[i] = 0;
mutex_init(&ci->i_truncate_mutex);
......@@ -570,6 +570,8 @@ void ceph_destroy_inode(struct inode *inode)
if (ci->i_xattrs.prealloc_blob)
ceph_buffer_put(ci->i_xattrs.prealloc_blob);
ceph_put_string(rcu_dereference_raw(ci->i_layout.pool_ns));
call_rcu(&inode->i_rcu, ceph_i_callback);
}
......@@ -583,6 +585,14 @@ int ceph_drop_inode(struct inode *inode)
return 1;
}
void ceph_evict_inode(struct inode *inode)
{
/* wait unsafe sync writes */
ceph_sync_write_wait(inode);
truncate_inode_pages_final(&inode->i_data);
clear_inode(inode);
}
static inline blkcnt_t calc_inode_blocks(u64 size)
{
return (size + (1<<9) - 1) >> 9;
......@@ -733,6 +743,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
int issued = 0, implemented, new_issued;
struct timespec mtime, atime, ctime;
struct ceph_buffer *xattr_blob = NULL;
struct ceph_string *pool_ns = NULL;
struct ceph_cap *new_cap = NULL;
int err = 0;
bool wake = false;
......@@ -760,6 +771,10 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
iinfo->xattr_len);
}
if (iinfo->pool_ns_len > 0)
pool_ns = ceph_find_or_create_string(iinfo->pool_ns_data,
iinfo->pool_ns_len);
spin_lock(&ci->i_ceph_lock);
/*
......@@ -814,10 +829,18 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
if (new_version ||
(new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) {
if (ci->i_layout.fl_pg_pool != info->layout.fl_pg_pool)
s64 old_pool = ci->i_layout.pool_id;
struct ceph_string *old_ns;
ceph_file_layout_from_legacy(&ci->i_layout, &info->layout);
old_ns = rcu_dereference_protected(ci->i_layout.pool_ns,
lockdep_is_held(&ci->i_ceph_lock));
rcu_assign_pointer(ci->i_layout.pool_ns, pool_ns);
if (ci->i_layout.pool_id != old_pool || pool_ns != old_ns)
ci->i_ceph_flags &= ~CEPH_I_POOL_PERM;
ci->i_layout = info->layout;
ci->i_pool_ns_len = iinfo->pool_ns_len;
pool_ns = old_ns;
queue_trunc = ceph_fill_file_size(inode, issued,
le32_to_cpu(info->truncate_seq),
......@@ -985,6 +1008,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
ceph_put_cap(mdsc, new_cap);
if (xattr_blob)
ceph_buffer_put(xattr_blob);
ceph_put_string(pool_ns);
return err;
}
......@@ -1018,7 +1042,7 @@ static void update_dentry_lease(struct dentry *dentry,
goto out_unlock;
if (di->lease_gen == session->s_cap_gen &&
time_before(ttl, dentry->d_time))
time_before(ttl, di->time))
goto out_unlock; /* we already have a newer lease. */
if (di->lease_session && di->lease_session != session)
......@@ -1032,7 +1056,7 @@ static void update_dentry_lease(struct dentry *dentry,
di->lease_seq = le32_to_cpu(lease->seq);
di->lease_renew_after = half_ttl;
di->lease_renew_from = 0;
dentry->d_time = ttl;
di->time = ttl;
out_unlock:
spin_unlock(&dentry->d_lock);
return;
......
......@@ -21,10 +21,10 @@ static long ceph_ioctl_get_layout(struct file *file, void __user *arg)
err = ceph_do_getattr(file_inode(file), CEPH_STAT_CAP_LAYOUT, false);
if (!err) {
l.stripe_unit = ceph_file_layout_su(ci->i_layout);
l.stripe_count = ceph_file_layout_stripe_count(ci->i_layout);
l.object_size = ceph_file_layout_object_size(ci->i_layout);
l.data_pool = le32_to_cpu(ci->i_layout.fl_pg_pool);
l.stripe_unit = ci->i_layout.stripe_unit;
l.stripe_count = ci->i_layout.stripe_count;
l.object_size = ci->i_layout.object_size;
l.data_pool = ci->i_layout.pool_id;
l.preferred_osd = (s32)-1;
if (copy_to_user(arg, &l, sizeof(l)))
return -EFAULT;
......@@ -82,19 +82,19 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg)
if (l.stripe_count)
nl.stripe_count = l.stripe_count;
else
nl.stripe_count = ceph_file_layout_stripe_count(ci->i_layout);
nl.stripe_count = ci->i_layout.stripe_count;
if (l.stripe_unit)
nl.stripe_unit = l.stripe_unit;
else
nl.stripe_unit = ceph_file_layout_su(ci->i_layout);
nl.stripe_unit = ci->i_layout.stripe_unit;
if (l.object_size)
nl.object_size = l.object_size;
else
nl.object_size = ceph_file_layout_object_size(ci->i_layout);
nl.object_size = ci->i_layout.object_size;
if (l.data_pool)
nl.data_pool = l.data_pool;
else
nl.data_pool = ceph_file_layout_pg_pool(ci->i_layout);
nl.data_pool = ci->i_layout.pool_id;
/* this is obsolete, and always -1 */
nl.preferred_osd = le64_to_cpu(-1);
......@@ -183,7 +183,7 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
struct ceph_osd_client *osdc =
&ceph_sb_to_client(inode->i_sb)->client->osdc;
struct ceph_object_locator oloc;
struct ceph_object_id oid;
CEPH_DEFINE_OID_ONSTACK(oid);
u64 len = 1, olen;
u64 tmp;
struct ceph_pg pgid;
......@@ -202,8 +202,8 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
return -EIO;
}
dl.file_offset -= dl.object_offset;
dl.object_size = ceph_file_layout_object_size(ci->i_layout);
dl.block_size = ceph_file_layout_su(ci->i_layout);
dl.object_size = ci->i_layout.object_size;
dl.block_size = ci->i_layout.stripe_unit;
/* block_offset = object_offset % block_size */
tmp = dl.object_offset;
......@@ -212,10 +212,13 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
snprintf(dl.object_name, sizeof(dl.object_name), "%llx.%08llx",
ceph_ino(inode), dl.object_no);
oloc.pool = ceph_file_layout_pg_pool(ci->i_layout);
oloc.pool = ci->i_layout.pool_id;
oloc.pool_ns = ceph_try_get_string(ci->i_layout.pool_ns);
ceph_oid_printf(&oid, "%s", dl.object_name);
r = ceph_object_locator_to_pg(osdc->osdmap, &oid, &oloc, &pgid);
ceph_oloc_destroy(&oloc);
if (r < 0) {
up_read(&osdc->lock);
return r;
......@@ -247,9 +250,8 @@ static long ceph_ioctl_lazyio(struct file *file)
if ((fi->fmode & CEPH_FILE_MODE_LAZY) == 0) {
spin_lock(&ci->i_ceph_lock);
ci->i_nr_by_mode[fi->fmode]--;
fi->fmode |= CEPH_FILE_MODE_LAZY;
ci->i_nr_by_mode[fi->fmode]++;
ci->i_nr_by_mode[ffs(CEPH_FILE_MODE_LAZY)]++;
spin_unlock(&ci->i_ceph_lock);
dout("ioctl_layzio: file %p marked lazy\n", file);
......
This diff is collapsed.
......@@ -45,6 +45,7 @@ struct ceph_mds_reply_info_in {
u32 inline_len;
char *inline_data;
u32 pool_ns_len;
char *pool_ns_data;
};
struct ceph_mds_reply_dir_entry {
......@@ -151,7 +152,6 @@ struct ceph_mds_session {
/* protected by mutex */
struct list_head s_cap_flushing; /* inodes w/ flushing caps */
struct list_head s_cap_snaps_flushing;
unsigned long s_renew_requested; /* last time we sent a renew req */
u64 s_renew_seq;
......@@ -275,8 +275,10 @@ struct ceph_mds_request {
struct ceph_pool_perm {
struct rb_node node;
u32 pool;
int perm;
s64 pool;
size_t pool_ns_len;
char pool_ns[];
};
/*
......@@ -290,6 +292,7 @@ struct ceph_mds_client {
struct completion safe_umount_waiters;
wait_queue_head_t session_close_wq;
struct list_head waiting_for_map;
int mdsmap_err;
struct ceph_mds_session **sessions; /* NULL for mds if no session */
atomic_t num_sessions;
......@@ -321,7 +324,7 @@ struct ceph_mds_client {
spinlock_t snap_flush_lock;
u64 last_cap_flush_tid;
struct rb_root cap_flush_tree;
struct list_head cap_flush_list;
struct list_head cap_dirty; /* inodes with dirty caps */
struct list_head cap_dirty_migrating; /* ...that are migration... */
int num_cap_flushing; /* # caps we are flushing */
......@@ -382,10 +385,6 @@ extern void ceph_mdsc_destroy(struct ceph_fs_client *fsc);
extern void ceph_mdsc_sync(struct ceph_mds_client *mdsc);
extern void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc,
struct inode *inode,
struct dentry *dn);
extern void ceph_invalidate_dir_request(struct ceph_mds_request *req);
extern int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
struct inode *dir);
......@@ -420,8 +419,10 @@ extern void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
struct dentry *dentry, char action,
u32 seq);
extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc,
struct ceph_msg *msg);
extern void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc,
struct ceph_msg *msg);
extern void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc,
struct ceph_msg *msg);
extern struct ceph_mds_session *
ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target);
......
......@@ -520,9 +520,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
ihold(inode);
atomic_set(&capsnap->nref, 1);
capsnap->ci = ci;
INIT_LIST_HEAD(&capsnap->ci_item);
INIT_LIST_HEAD(&capsnap->flushing_item);
capsnap->follows = old_snapc->seq;
capsnap->issued = __ceph_caps_issued(ci, NULL);
......@@ -551,7 +549,6 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
ci->i_wrbuffer_ref_head = 0;
capsnap->context = old_snapc;
list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps);
old_snapc = NULL;
if (used & CEPH_CAP_FILE_WR) {
dout("queue_cap_snap %p cap_snap %p snapc %p"
......@@ -563,6 +560,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
__ceph_finish_cap_snap(ci, capsnap);
}
capsnap = NULL;
old_snapc = NULL;
update_snapc:
if (ci->i_head_snapc) {
......@@ -603,6 +601,8 @@ int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
capsnap->dirty_pages);
return 0;
}
ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS;
dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu\n",
inode, capsnap, capsnap->context,
capsnap->context->seq, ceph_cap_string(capsnap->dirty),
......@@ -799,9 +799,7 @@ static void flush_snaps(struct ceph_mds_client *mdsc)
inode = &ci->vfs_inode;
ihold(inode);
spin_unlock(&mdsc->snap_flush_lock);
spin_lock(&ci->i_ceph_lock);
__ceph_flush_snaps(ci, &session, 0);
spin_unlock(&ci->i_ceph_lock);
ceph_flush_snaps(ci, &session);
iput(inode);
spin_lock(&mdsc->snap_flush_lock);
}
......
......@@ -108,7 +108,6 @@ static int ceph_sync_fs(struct super_block *sb, int wait)
* mount options
*/
enum {
Opt_mds_namespace,
Opt_wsize,
Opt_rsize,
Opt_rasize,
......@@ -121,6 +120,7 @@ enum {
Opt_last_int,
/* int args above */
Opt_snapdirname,
Opt_mds_namespace,
Opt_last_string,
/* string args above */
Opt_dirstat,
......@@ -144,7 +144,6 @@ enum {
};
static match_table_t fsopt_tokens = {
{Opt_mds_namespace, "mds_namespace=%d"},
{Opt_wsize, "wsize=%d"},
{Opt_rsize, "rsize=%d"},
{Opt_rasize, "rasize=%d"},
......@@ -156,6 +155,7 @@ static match_table_t fsopt_tokens = {
{Opt_congestion_kb, "write_congestion_kb=%d"},
/* int args above */
{Opt_snapdirname, "snapdirname=%s"},
{Opt_mds_namespace, "mds_namespace=%s"},
/* string args above */
{Opt_dirstat, "dirstat"},
{Opt_nodirstat, "nodirstat"},
......@@ -212,11 +212,14 @@ static int parse_fsopt_token(char *c, void *private)
if (!fsopt->snapdir_name)
return -ENOMEM;
break;
/* misc */
case Opt_mds_namespace:
fsopt->mds_namespace = intval;
fsopt->mds_namespace = kstrndup(argstr[0].from,
argstr[0].to-argstr[0].from,
GFP_KERNEL);
if (!fsopt->mds_namespace)
return -ENOMEM;
break;
/* misc */
case Opt_wsize:
fsopt->wsize = intval;
break;
......@@ -302,6 +305,7 @@ static void destroy_mount_options(struct ceph_mount_options *args)
{
dout("destroy_mount_options %p\n", args);
kfree(args->snapdir_name);
kfree(args->mds_namespace);
kfree(args->server_path);
kfree(args);
}
......@@ -331,6 +335,9 @@ static int compare_mount_options(struct ceph_mount_options *new_fsopt,
return ret;
ret = strcmp_null(fsopt1->snapdir_name, fsopt2->snapdir_name);
if (ret)
return ret;
ret = strcmp_null(fsopt1->mds_namespace, fsopt2->mds_namespace);
if (ret)
return ret;
......@@ -376,7 +383,6 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,
fsopt->max_readdir = CEPH_MAX_READDIR_DEFAULT;
fsopt->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT;
fsopt->congestion_kb = default_congestion_kb();
fsopt->mds_namespace = CEPH_FS_CLUSTER_ID_NONE;
/*
* Distinguish the server list from the path in "dev_name".
......@@ -469,8 +475,8 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
seq_puts(m, ",noacl");
#endif
if (fsopt->mds_namespace != CEPH_FS_CLUSTER_ID_NONE)
seq_printf(m, ",mds_namespace=%d", fsopt->mds_namespace);
if (fsopt->mds_namespace)
seq_printf(m, ",mds_namespace=%s", fsopt->mds_namespace);
if (fsopt->wsize)
seq_printf(m, ",wsize=%d", fsopt->wsize);
if (fsopt->rsize != CEPH_RSIZE_DEFAULT)
......@@ -509,9 +515,11 @@ static int extra_mon_dispatch(struct ceph_client *client, struct ceph_msg *msg)
switch (type) {
case CEPH_MSG_MDS_MAP:
ceph_mdsc_handle_map(fsc->mdsc, msg);
ceph_mdsc_handle_mdsmap(fsc->mdsc, msg);
return 0;
case CEPH_MSG_FS_MAP_USER:
ceph_mdsc_handle_fsmap(fsc->mdsc, msg);
return 0;
default:
return -1;
}
......@@ -543,8 +551,14 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
goto fail;
}
fsc->client->extra_mon_dispatch = extra_mon_dispatch;
fsc->client->monc.fs_cluster_id = fsopt->mds_namespace;
ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 0, true);
if (fsopt->mds_namespace == NULL) {
ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
0, true);
} else {
ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_FSMAP,
0, false);
}
fsc->mount_options = fsopt;
......@@ -672,8 +686,8 @@ static int __init init_caches(void)
if (ceph_dentry_cachep == NULL)
goto bad_dentry;
ceph_file_cachep = KMEM_CACHE(ceph_file_info,
SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD);
ceph_file_cachep = KMEM_CACHE(ceph_file_info, SLAB_MEM_SPREAD);
if (ceph_file_cachep == NULL)
goto bad_file;
......@@ -731,6 +745,7 @@ static const struct super_operations ceph_super_ops = {
.destroy_inode = ceph_destroy_inode,
.write_inode = ceph_write_inode,
.drop_inode = ceph_drop_inode,
.evict_inode = ceph_evict_inode,
.sync_fs = ceph_sync_fs,
.put_super = ceph_put_super,
.show_options = ceph_show_options,
......
......@@ -62,7 +62,6 @@ struct ceph_mount_options {
int cap_release_safety;
int max_readdir; /* max readdir result (entires) */
int max_readdir_bytes; /* max readdir result (bytes) */
int mds_namespace;
/*
* everything above this point can be memcmp'd; everything below
......@@ -70,6 +69,7 @@ struct ceph_mount_options {
*/
char *snapdir_name; /* default ".snap" */
char *mds_namespace; /* default NULL */
char *server_path; /* default "/" */
};
......@@ -147,6 +147,14 @@ struct ceph_cap {
#define CHECK_CAPS_AUTHONLY 2 /* only check auth cap */
#define CHECK_CAPS_FLUSH 4 /* flush any dirty caps */
struct ceph_cap_flush {
u64 tid;
int caps; /* 0 means capsnap */
bool wake; /* wake up flush waiters when finish ? */
struct list_head g_list; // global
struct list_head i_list; // per inode
};
/*
* Snapped cap state that is pending flush to mds. When a snapshot occurs,
* we first complete any in-process sync writes and writeback any dirty
......@@ -154,10 +162,11 @@ struct ceph_cap {
*/
struct ceph_cap_snap {
atomic_t nref;
struct ceph_inode_info *ci;
struct list_head ci_item, flushing_item;
struct list_head ci_item;
struct ceph_cap_flush cap_flush;
u64 follows, flush_tid;
u64 follows;
int issued, dirty;
struct ceph_snap_context *context;
......@@ -186,16 +195,6 @@ static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap)
}
}
struct ceph_cap_flush {
u64 tid;
int caps;
struct rb_node g_node; // global
union {
struct rb_node i_node; // inode
struct list_head list;
};
};
/*
* The frag tree describes how a directory is fragmented, potentially across
* multiple metadata servers. It is also used to indicate points where
......@@ -246,7 +245,7 @@ struct ceph_dentry_info {
unsigned long lease_renew_after, lease_renew_from;
struct list_head lru;
struct dentry *dentry;
u64 time;
unsigned long time;
u64 offset;
};
......@@ -287,7 +286,6 @@ struct ceph_inode_info {
struct ceph_dir_layout i_dir_layout;
struct ceph_file_layout i_layout;
size_t i_pool_ns_len;
char *i_symlink;
/* for dirs */
......@@ -311,7 +309,7 @@ struct ceph_inode_info {
* overlapping, pipelined cap flushes to the mds. we can probably
* reduce the tid to 8 bits if we're concerned about inode size. */
struct ceph_cap_flush *i_prealloc_cap_flush;
struct rb_root i_cap_flush_tree;
struct list_head i_cap_flush_list;
wait_queue_head_t i_cap_wq; /* threads waiting on a capability */
unsigned long i_hold_caps_min; /* jiffies */
unsigned long i_hold_caps_max; /* jiffies */
......@@ -322,7 +320,7 @@ struct ceph_inode_info {
dirty|flushing caps */
unsigned i_snap_caps; /* cap bits for snapped files */
int i_nr_by_mode[CEPH_FILE_MODE_NUM]; /* open file counts */
int i_nr_by_mode[CEPH_FILE_MODE_BITS]; /* open file counts */
struct mutex i_truncate_mutex;
u32 i_truncate_seq; /* last truncate to smaller size */
......@@ -471,6 +469,8 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
#define CEPH_I_POOL_WR (1 << 6) /* can write to pool */
#define CEPH_I_SEC_INITED (1 << 7) /* security initialized */
#define CEPH_I_CAP_DROPPED (1 << 8) /* caps were forcibly dropped */
#define CEPH_I_KICK_FLUSH (1 << 9) /* kick flushing caps */
#define CEPH_I_FLUSH_SNAPS (1 << 10) /* need flush snapss */
static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
long long release_count,
......@@ -750,6 +750,7 @@ extern const struct inode_operations ceph_file_iops;
extern struct inode *ceph_alloc_inode(struct super_block *sb);
extern void ceph_destroy_inode(struct inode *inode);
extern int ceph_drop_inode(struct inode *inode);
extern void ceph_evict_inode(struct inode *inode);
extern struct inode *ceph_get_inode(struct super_block *sb,
struct ceph_vino vino);
......@@ -890,9 +891,8 @@ extern void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps);
extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had);
extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
struct ceph_snap_context *snapc);
extern void __ceph_flush_snaps(struct ceph_inode_info *ci,
struct ceph_mds_session **psession,
int again);
extern void ceph_flush_snaps(struct ceph_inode_info *ci,
struct ceph_mds_session **psession);
extern void ceph_check_caps(struct ceph_inode_info *ci, int flags,
struct ceph_mds_session *session);
extern void ceph_check_delayed_caps(struct ceph_mds_client *mdsc);
......@@ -907,10 +907,7 @@ extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
loff_t endoff, int *got, struct page **pinned_page);
/* for counting open files by mode */
static inline void __ceph_get_fmode(struct ceph_inode_info *ci, int mode)
{
ci->i_nr_by_mode[mode]++;
}
extern void __ceph_get_fmode(struct ceph_inode_info *ci, int mode);
extern void ceph_put_fmode(struct ceph_inode_info *ci, int mode);
/* addr.c */
......@@ -931,6 +928,7 @@ extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
extern int ceph_release(struct inode *inode, struct file *filp);
extern void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
char *data, size_t len);
extern void ceph_sync_write_wait(struct inode *inode);
/* dir.c */
extern const struct file_operations ceph_dir_fops;
extern const struct file_operations ceph_snapdir_fops;
......
......@@ -57,81 +57,88 @@ struct ceph_vxattr {
static bool ceph_vxattrcb_layout_exists(struct ceph_inode_info *ci)
{
size_t s;
char *p = (char *)&ci->i_layout;
for (s = 0; s < sizeof(ci->i_layout); s++, p++)
if (*p)
return true;
return false;
struct ceph_file_layout *fl = &ci->i_layout;
return (fl->stripe_unit > 0 || fl->stripe_count > 0 ||
fl->object_size > 0 || fl->pool_id >= 0 ||
rcu_dereference_raw(fl->pool_ns) != NULL);
}
static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val,
size_t size)
{
int ret;
struct ceph_fs_client *fsc = ceph_sb_to_client(ci->vfs_inode.i_sb);
struct ceph_osd_client *osdc = &fsc->client->osdc;
s64 pool = ceph_file_layout_pg_pool(ci->i_layout);
struct ceph_string *pool_ns;
s64 pool = ci->i_layout.pool_id;
const char *pool_name;
const char *ns_field = " pool_namespace=";
char buf[128];
size_t len, total_len = 0;
int ret;
pool_ns = ceph_try_get_string(ci->i_layout.pool_ns);
dout("ceph_vxattrcb_layout %p\n", &ci->vfs_inode);
down_read(&osdc->lock);
pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool);
if (pool_name) {
size_t len = strlen(pool_name);
ret = snprintf(buf, sizeof(buf),
"stripe_unit=%lld stripe_count=%lld object_size=%lld pool=",
(unsigned long long)ceph_file_layout_su(ci->i_layout),
(unsigned long long)ceph_file_layout_stripe_count(ci->i_layout),
(unsigned long long)ceph_file_layout_object_size(ci->i_layout));
if (!size) {
ret += len;
} else if (ret + len > size) {
ret = -ERANGE;
} else {
memcpy(val, buf, ret);
len = snprintf(buf, sizeof(buf),
"stripe_unit=%u stripe_count=%u object_size=%u pool=",
ci->i_layout.stripe_unit, ci->i_layout.stripe_count,
ci->i_layout.object_size);
total_len = len + strlen(pool_name);
} else {
len = snprintf(buf, sizeof(buf),
"stripe_unit=%u stripe_count=%u object_size=%u pool=%lld",
ci->i_layout.stripe_unit, ci->i_layout.stripe_count,
ci->i_layout.object_size, (unsigned long long)pool);
total_len = len;
}
if (pool_ns)
total_len += strlen(ns_field) + pool_ns->len;
if (!size) {
ret = total_len;
} else if (total_len > size) {
ret = -ERANGE;
} else {
memcpy(val, buf, len);
ret = len;
if (pool_name) {
len = strlen(pool_name);
memcpy(val + ret, pool_name, len);
ret += len;
}
} else {
ret = snprintf(buf, sizeof(buf),
"stripe_unit=%lld stripe_count=%lld object_size=%lld pool=%lld",
(unsigned long long)ceph_file_layout_su(ci->i_layout),
(unsigned long long)ceph_file_layout_stripe_count(ci->i_layout),
(unsigned long long)ceph_file_layout_object_size(ci->i_layout),
(unsigned long long)pool);
if (size) {
if (ret <= size)
memcpy(val, buf, ret);
else
ret = -ERANGE;
if (pool_ns) {
len = strlen(ns_field);
memcpy(val + ret, ns_field, len);
ret += len;
memcpy(val + ret, pool_ns->str, pool_ns->len);
ret += pool_ns->len;
}
}
up_read(&osdc->lock);
ceph_put_string(pool_ns);
return ret;
}
static size_t ceph_vxattrcb_layout_stripe_unit(struct ceph_inode_info *ci,
char *val, size_t size)
{
return snprintf(val, size, "%lld",
(unsigned long long)ceph_file_layout_su(ci->i_layout));
return snprintf(val, size, "%u", ci->i_layout.stripe_unit);
}
static size_t ceph_vxattrcb_layout_stripe_count(struct ceph_inode_info *ci,
char *val, size_t size)
{
return snprintf(val, size, "%lld",
(unsigned long long)ceph_file_layout_stripe_count(ci->i_layout));
return snprintf(val, size, "%u", ci->i_layout.stripe_count);
}
static size_t ceph_vxattrcb_layout_object_size(struct ceph_inode_info *ci,
char *val, size_t size)
{
return snprintf(val, size, "%lld",
(unsigned long long)ceph_file_layout_object_size(ci->i_layout));
return snprintf(val, size, "%u", ci->i_layout.object_size);
}
static size_t ceph_vxattrcb_layout_pool(struct ceph_inode_info *ci,
......@@ -140,7 +147,7 @@ static size_t ceph_vxattrcb_layout_pool(struct ceph_inode_info *ci,
int ret;
struct ceph_fs_client *fsc = ceph_sb_to_client(ci->vfs_inode.i_sb);
struct ceph_osd_client *osdc = &fsc->client->osdc;
s64 pool = ceph_file_layout_pg_pool(ci->i_layout);
s64 pool = ci->i_layout.pool_id;
const char *pool_name;
down_read(&osdc->lock);
......@@ -153,6 +160,18 @@ static size_t ceph_vxattrcb_layout_pool(struct ceph_inode_info *ci,
return ret;
}
static size_t ceph_vxattrcb_layout_pool_namespace(struct ceph_inode_info *ci,
char *val, size_t size)
{
int ret = 0;
struct ceph_string *ns = ceph_try_get_string(ci->i_layout.pool_ns);
if (ns) {
ret = snprintf(val, size, "%.*s", (int)ns->len, ns->str);
ceph_put_string(ns);
}
return ret;
}
/* directories */
static size_t ceph_vxattrcb_dir_entries(struct ceph_inode_info *ci, char *val,
......@@ -241,6 +260,7 @@ static struct ceph_vxattr ceph_dir_vxattrs[] = {
XATTR_LAYOUT_FIELD(dir, layout, stripe_count),
XATTR_LAYOUT_FIELD(dir, layout, object_size),
XATTR_LAYOUT_FIELD(dir, layout, pool),
XATTR_LAYOUT_FIELD(dir, layout, pool_namespace),
XATTR_NAME_CEPH(dir, entries),
XATTR_NAME_CEPH(dir, files),
XATTR_NAME_CEPH(dir, subdirs),
......@@ -268,6 +288,7 @@ static struct ceph_vxattr ceph_file_vxattrs[] = {
XATTR_LAYOUT_FIELD(file, layout, stripe_count),
XATTR_LAYOUT_FIELD(file, layout, object_size),
XATTR_LAYOUT_FIELD(file, layout, pool),
XATTR_LAYOUT_FIELD(file, layout, pool_namespace),
{ .name = NULL, 0 } /* Required table terminator */
};
static size_t ceph_file_vxattrs_name_size; /* total size of all names */
......
......@@ -34,9 +34,9 @@
#define CEPH_MAX_MON 31
/*
* ceph_file_layout - describe data layout for a file/inode
* legacy ceph_file_layoute
*/
struct ceph_file_layout {
struct ceph_file_layout_legacy {
/* file -> object mapping */
__le32 fl_stripe_unit; /* stripe unit, in bytes. must be multiple
of page size. */
......@@ -53,33 +53,27 @@ struct ceph_file_layout {
__le32 fl_pg_pool; /* namespace, crush ruleset, rep level */
} __attribute__ ((packed));
#define ceph_file_layout_su(l) ((__s32)le32_to_cpu((l).fl_stripe_unit))
#define ceph_file_layout_stripe_count(l) \
((__s32)le32_to_cpu((l).fl_stripe_count))
#define ceph_file_layout_object_size(l) ((__s32)le32_to_cpu((l).fl_object_size))
#define ceph_file_layout_cas_hash(l) ((__s32)le32_to_cpu((l).fl_cas_hash))
#define ceph_file_layout_object_su(l) \
((__s32)le32_to_cpu((l).fl_object_stripe_unit))
#define ceph_file_layout_pg_pool(l) \
((__s32)le32_to_cpu((l).fl_pg_pool))
static inline unsigned ceph_file_layout_stripe_width(struct ceph_file_layout *l)
{
return le32_to_cpu(l->fl_stripe_unit) *
le32_to_cpu(l->fl_stripe_count);
}
/* "period" == bytes before i start on a new set of objects */
static inline unsigned ceph_file_layout_period(struct ceph_file_layout *l)
{
return le32_to_cpu(l->fl_object_size) *
le32_to_cpu(l->fl_stripe_count);
}
struct ceph_string;
/*
* ceph_file_layout - describe data layout for a file/inode
*/
struct ceph_file_layout {
/* file -> object mapping */
u32 stripe_unit; /* stripe unit, in bytes */
u32 stripe_count; /* over this many objects */
u32 object_size; /* until objects are this big */
s64 pool_id; /* rados pool id */
struct ceph_string __rcu *pool_ns; /* rados pool namespace */
};
extern int ceph_file_layout_is_valid(const struct ceph_file_layout *layout);
extern void ceph_file_layout_from_legacy(struct ceph_file_layout *fl,
struct ceph_file_layout_legacy *legacy);
extern void ceph_file_layout_to_legacy(struct ceph_file_layout *fl,
struct ceph_file_layout_legacy *legacy);
#define CEPH_MIN_STRIPE_UNIT 65536
int ceph_file_layout_is_valid(const struct ceph_file_layout *layout);
struct ceph_dir_layout {
__u8 dl_dir_hash; /* see ceph_hash.h for ids */
__u8 dl_unused1;
......@@ -127,6 +121,7 @@ struct ceph_dir_layout {
/* client <-> mds */
#define CEPH_MSG_MDS_MAP 21
#define CEPH_MSG_FS_MAP_USER 103
#define CEPH_MSG_CLIENT_SESSION 22
#define CEPH_MSG_CLIENT_RECONNECT 23
......@@ -399,7 +394,7 @@ union ceph_mds_request_args {
__le32 flags;
} __attribute__ ((packed)) setxattr;
struct {
struct ceph_file_layout layout;
struct ceph_file_layout_legacy layout;
} __attribute__ ((packed)) setlayout;
struct {
__u8 rule; /* currently fcntl or flock */
......@@ -478,7 +473,7 @@ struct ceph_mds_reply_inode {
__le64 version; /* inode version */
__le64 xattr_version; /* version for xattr blob */
struct ceph_mds_reply_cap cap; /* caps issued for this inode */
struct ceph_file_layout layout;
struct ceph_file_layout_legacy layout;
struct ceph_timespec ctime, mtime, atime;
__le32 time_warp_seq;
__le64 size, max_size, truncate_size;
......@@ -531,7 +526,7 @@ struct ceph_filelock {
#define CEPH_FILE_MODE_WR 2
#define CEPH_FILE_MODE_RDWR 3 /* RD | WR */
#define CEPH_FILE_MODE_LAZY 4 /* lazy io */
#define CEPH_FILE_MODE_NUM 8 /* bc these are bit fields.. mostly */
#define CEPH_FILE_MODE_BITS 4
int ceph_flags_to_mode(int flags);
......@@ -673,7 +668,7 @@ struct ceph_mds_caps {
__le64 size, max_size, truncate_size;
__le32 truncate_seq;
struct ceph_timespec mtime, atime, ctime;
struct ceph_file_layout layout;
struct ceph_file_layout_legacy layout;
__le32 time_warp_seq;
} __attribute__ ((packed));
......
......@@ -3,6 +3,7 @@
#include <linux/err.h>
#include <linux/bug.h>
#include <linux/slab.h>
#include <linux/time.h>
#include <asm/unaligned.h>
......@@ -217,6 +218,60 @@ static inline void ceph_encode_string(void **p, void *end,
*p += len;
}
/*
* version and length starting block encoders/decoders
*/
/* current code version (u8) + compat code version (u8) + len of struct (u32) */
#define CEPH_ENCODING_START_BLK_LEN 6
/**
* ceph_start_encoding - start encoding block
* @struct_v: current (code) version of the encoding
* @struct_compat: oldest code version that can decode it
* @struct_len: length of struct encoding
*/
static inline void ceph_start_encoding(void **p, u8 struct_v, u8 struct_compat,
u32 struct_len)
{
ceph_encode_8(p, struct_v);
ceph_encode_8(p, struct_compat);
ceph_encode_32(p, struct_len);
}
/**
* ceph_start_decoding - start decoding block
* @v: current version of the encoding that the code supports
* @name: name of the struct (free-form)
* @struct_v: out param for the encoding version
* @struct_len: out param for the length of struct encoding
*
* Validates the length of struct encoding, so unsafe ceph_decode_*
* variants can be used for decoding.
*/
static inline int ceph_start_decoding(void **p, void *end, u8 v,
const char *name, u8 *struct_v,
u32 *struct_len)
{
u8 struct_compat;
ceph_decode_need(p, end, CEPH_ENCODING_START_BLK_LEN, bad);
*struct_v = ceph_decode_8(p);
struct_compat = ceph_decode_8(p);
if (v < struct_compat) {
pr_warn("got struct_v %d struct_compat %d > %d of %s\n",
*struct_v, struct_compat, v, name);
return -EINVAL;
}
*struct_len = ceph_decode_32(p);
ceph_decode_need(p, end, *struct_len, bad);
return 0;
bad:
return -ERANGE;
}
#define ceph_encode_need(p, end, n, bad) \
do { \
if (!likely(ceph_has_room(p, end, n))) \
......
......@@ -21,6 +21,7 @@
#include <linux/ceph/mon_client.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/ceph_fs.h>
#include <linux/ceph/string_table.h>
/*
* mount options
......@@ -214,8 +215,9 @@ static void erase_##name(struct rb_root *root, type *t) \
}
#define DEFINE_RB_LOOKUP_FUNC(name, type, keyfld, nodefld) \
extern type __lookup_##name##_key; \
static type *lookup_##name(struct rb_root *root, \
typeof(((type *)0)->keyfld) key) \
typeof(__lookup_##name##_key.keyfld) key) \
{ \
struct rb_node *n = root->rb_node; \
\
......
......@@ -95,7 +95,7 @@ struct ceph_mon_client {
struct ceph_mon_subscribe_item item;
bool want;
u32 have; /* epoch */
} subs[3];
} subs[4];
int fs_cluster_id; /* "mdsmap.<id>" sub */
#ifdef CONFIG_DEBUG_FS
......@@ -111,9 +111,10 @@ extern int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl);
extern void ceph_monc_stop(struct ceph_mon_client *monc);
enum {
CEPH_SUB_MDSMAP = 0,
CEPH_SUB_MONMAP,
CEPH_SUB_MONMAP = 0,
CEPH_SUB_OSDMAP,
CEPH_SUB_FSMAP,
CEPH_SUB_MDSMAP,
};
extern const char *ceph_sub_str[];
......
......@@ -2,7 +2,6 @@
#define _FS_CEPH_MSGPOOL
#include <linux/mempool.h>
#include <linux/ceph/messenger.h>
/*
* we use memory pools for preallocating messages we may receive, to
......
......@@ -9,6 +9,7 @@
#include <linux/ceph/types.h>
#include <linux/ceph/osdmap.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/msgpool.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/pagelist.h>
......
......@@ -63,11 +63,13 @@ static inline bool ceph_can_shift_osds(struct ceph_pg_pool_info *pool)
struct ceph_object_locator {
s64 pool;
struct ceph_string *pool_ns;
};
static inline void ceph_oloc_init(struct ceph_object_locator *oloc)
{
oloc->pool = -1;
oloc->pool_ns = NULL;
}
static inline bool ceph_oloc_empty(const struct ceph_object_locator *oloc)
......@@ -75,11 +77,9 @@ static inline bool ceph_oloc_empty(const struct ceph_object_locator *oloc)
return oloc->pool == -1;
}
static inline void ceph_oloc_copy(struct ceph_object_locator *dest,
const struct ceph_object_locator *src)
{
dest->pool = src->pool;
}
void ceph_oloc_copy(struct ceph_object_locator *dest,
const struct ceph_object_locator *src);
void ceph_oloc_destroy(struct ceph_object_locator *oloc);
/*
* Maximum supported by kernel client object name length
......@@ -115,6 +115,11 @@ static inline void ceph_oid_init(struct ceph_object_id *oid)
oid->name_len = 0;
}
#define CEPH_OID_INIT_ONSTACK(oid) \
({ ceph_oid_init(&oid); oid; })
#define CEPH_DEFINE_OID_ONSTACK(oid) \
struct ceph_object_id oid = CEPH_OID_INIT_ONSTACK(oid)
static inline bool ceph_oid_empty(const struct ceph_object_id *oid)
{
return oid->name == oid->inline_name && !oid->name_len;
......
#ifndef _FS_CEPH_STRING_TABLE_H
#define _FS_CEPH_STRING_TABLE_H
#include <linux/types.h>
#include <linux/kref.h>
#include <linux/rbtree.h>
#include <linux/rcupdate.h>
struct ceph_string {
struct kref kref;
union {
struct rb_node node;
struct rcu_head rcu;
};
size_t len;
char str[];
};
extern void ceph_release_string(struct kref *ref);
extern struct ceph_string *ceph_find_or_create_string(const char *str,
size_t len);
extern bool ceph_strings_empty(void);
static inline struct ceph_string *ceph_get_string(struct ceph_string *str)
{
kref_get(&str->kref);
return str;
}
static inline void ceph_put_string(struct ceph_string *str)
{
if (!str)
return;
kref_put(&str->kref, ceph_release_string);
}
static inline int ceph_compare_string(struct ceph_string *cs,
const char* str, size_t len)
{
size_t cs_len = cs ? cs->len : 0;
if (cs_len != len)
return cs_len - len;
if (len == 0)
return 0;
return strncmp(cs->str, str, len);
}
#define ceph_try_get_string(x) \
({ \
struct ceph_string *___str; \
rcu_read_lock(); \
for (;;) { \
___str = rcu_dereference(x); \
if (!___str || \
kref_get_unless_zero(&___str->kref)) \
break; \
} \
rcu_read_unlock(); \
(___str); \
})
#endif
......@@ -11,5 +11,5 @@ libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \
crypto.o armor.o \
auth_x.o \
ceph_fs.o ceph_strings.o ceph_hash.o \
pagevec.o snapshot.o
pagevec.o snapshot.o string_table.o
......@@ -747,6 +747,8 @@ static int __init init_ceph_lib(void)
static void __exit exit_ceph_lib(void)
{
dout("exit_ceph_lib\n");
WARN_ON(!ceph_strings_empty());
ceph_osdc_cleanup();
ceph_msgr_exit();
ceph_crypto_shutdown();
......
......@@ -9,9 +9,9 @@
*/
int ceph_file_layout_is_valid(const struct ceph_file_layout *layout)
{
__u32 su = le32_to_cpu(layout->fl_stripe_unit);
__u32 sc = le32_to_cpu(layout->fl_stripe_count);
__u32 os = le32_to_cpu(layout->fl_object_size);
__u32 su = layout->stripe_unit;
__u32 sc = layout->stripe_count;
__u32 os = layout->object_size;
/* stripe unit, object size must be non-zero, 64k increment */
if (!su || (su & (CEPH_MIN_STRIPE_UNIT-1)))
......@@ -27,6 +27,30 @@ int ceph_file_layout_is_valid(const struct ceph_file_layout *layout)
return 1;
}
void ceph_file_layout_from_legacy(struct ceph_file_layout *fl,
struct ceph_file_layout_legacy *legacy)
{
fl->stripe_unit = le32_to_cpu(legacy->fl_stripe_unit);
fl->stripe_count = le32_to_cpu(legacy->fl_stripe_count);
fl->object_size = le32_to_cpu(legacy->fl_object_size);
fl->pool_id = le32_to_cpu(legacy->fl_pg_pool);
if (fl->pool_id == 0)
fl->pool_id = -1;
}
EXPORT_SYMBOL(ceph_file_layout_from_legacy);
void ceph_file_layout_to_legacy(struct ceph_file_layout *fl,
struct ceph_file_layout_legacy *legacy)
{
legacy->fl_stripe_unit = cpu_to_le32(fl->stripe_unit);
legacy->fl_stripe_count = cpu_to_le32(fl->stripe_count);
legacy->fl_object_size = cpu_to_le32(fl->object_size);
if (fl->pool_id >= 0)
legacy->fl_pg_pool = cpu_to_le32(fl->pool_id);
else
legacy->fl_pg_pool = 0;
}
EXPORT_SYMBOL(ceph_file_layout_to_legacy);
int ceph_flags_to_mode(int flags)
{
......
......@@ -156,8 +156,16 @@ static void dump_target(struct seq_file *s, struct ceph_osd_request_target *t)
seq_printf(s, "]/%d\t[", t->up.primary);
for (i = 0; i < t->acting.size; i++)
seq_printf(s, "%s%d", (!i ? "" : ","), t->acting.osds[i]);
seq_printf(s, "]/%d\t%*pE\t0x%x", t->acting.primary,
t->target_oid.name_len, t->target_oid.name, t->flags);
seq_printf(s, "]/%d\t", t->acting.primary);
if (t->target_oloc.pool_ns) {
seq_printf(s, "%*pE/%*pE\t0x%x",
(int)t->target_oloc.pool_ns->len,
t->target_oloc.pool_ns->str,
t->target_oid.name_len, t->target_oid.name, t->flags);
} else {
seq_printf(s, "%*pE\t0x%x", t->target_oid.name_len,
t->target_oid.name, t->flags);
}
if (t->paused)
seq_puts(s, "\tP");
}
......
......@@ -227,9 +227,10 @@ static void __schedule_delayed(struct ceph_mon_client *monc)
}
const char *ceph_sub_str[] = {
[CEPH_SUB_MDSMAP] = "mdsmap",
[CEPH_SUB_MONMAP] = "monmap",
[CEPH_SUB_OSDMAP] = "osdmap",
[CEPH_SUB_FSMAP] = "fsmap.user",
[CEPH_SUB_MDSMAP] = "mdsmap",
};
/*
......@@ -1193,6 +1194,7 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
case CEPH_MSG_MON_MAP:
case CEPH_MSG_MDS_MAP:
case CEPH_MSG_OSD_MAP:
case CEPH_MSG_FS_MAP_USER:
m = ceph_msg_new(type, front_len, GFP_NOFS, false);
if (!m)
return NULL; /* ENOMEM--return skip == 0 */
......
......@@ -5,6 +5,7 @@
#include <linux/types.h>
#include <linux/vmalloc.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/msgpool.h>
static void *msgpool_alloc(gfp_t gfp_mask, void *arg)
......
......@@ -387,7 +387,9 @@ static void target_copy(struct ceph_osd_request_target *dest,
static void target_destroy(struct ceph_osd_request_target *t)
{
ceph_oid_destroy(&t->base_oid);
ceph_oloc_destroy(&t->base_oloc);
ceph_oid_destroy(&t->target_oid);
ceph_oloc_destroy(&t->target_oloc);
}
/*
......@@ -533,6 +535,11 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
}
EXPORT_SYMBOL(ceph_osdc_alloc_request);
static int ceph_oloc_encoding_size(struct ceph_object_locator *oloc)
{
return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0);
}
int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
{
struct ceph_osd_client *osdc = req->r_osdc;
......@@ -540,11 +547,13 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
int msg_size;
WARN_ON(ceph_oid_empty(&req->r_base_oid));
WARN_ON(ceph_oloc_empty(&req->r_base_oloc));
/* create request message */
msg_size = 4 + 4 + 4; /* client_inc, osdmap_epoch, flags */
msg_size += 4 + 4 + 4 + 8; /* mtime, reassert_version */
msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
msg_size += CEPH_ENCODING_START_BLK_LEN +
ceph_oloc_encoding_size(&req->r_base_oloc); /* oloc */
msg_size += 1 + 8 + 4 + 4; /* pgid */
msg_size += 4 + req->r_base_oid.name_len; /* oid */
msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op);
......@@ -932,7 +941,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) {
osd_req_op_init(req, which, opcode, 0);
} else {
u32 object_size = le32_to_cpu(layout->fl_object_size);
u32 object_size = layout->object_size;
u32 object_base = off - objoff;
if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
if (truncate_size <= object_base) {
......@@ -948,7 +957,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
}
req->r_flags = flags;
req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout);
req->r_base_oloc.pool = layout->pool_id;
req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum);
req->r_snapid = vino.snap;
......@@ -1489,12 +1499,16 @@ static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg)
p += sizeof(req->r_replay_version);
/* oloc */
ceph_encode_8(&p, 4);
ceph_encode_8(&p, 4);
ceph_encode_32(&p, 8 + 4 + 4);
ceph_start_encoding(&p, 5, 4,
ceph_oloc_encoding_size(&req->r_t.target_oloc));
ceph_encode_64(&p, req->r_t.target_oloc.pool);
ceph_encode_32(&p, -1); /* preferred */
ceph_encode_32(&p, 0); /* key len */
if (req->r_t.target_oloc.pool_ns)
ceph_encode_string(&p, end, req->r_t.target_oloc.pool_ns->str,
req->r_t.target_oloc.pool_ns->len);
else
ceph_encode_32(&p, 0);
/* pgid */
ceph_encode_8(&p, 1);
......@@ -2594,9 +2608,22 @@ static int ceph_oloc_decode(void **p, void *end,
}
if (struct_v >= 5) {
bool changed = false;
len = ceph_decode_32(p);
if (len > 0) {
pr_warn("ceph_object_locator::nspace is set\n");
ceph_decode_need(p, end, len, e_inval);
if (!oloc->pool_ns ||
ceph_compare_string(oloc->pool_ns, *p, len))
changed = true;
*p += len;
} else {
if (oloc->pool_ns)
changed = true;
}
if (changed) {
/* redirect changes namespace */
pr_warn("ceph_object_locator::nspace is changed\n");
goto e_inval;
}
}
......@@ -2806,7 +2833,9 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
goto out_unlock_session;
}
m.redirect.oloc.pool_ns = req->r_t.target_oloc.pool_ns;
ret = decode_MOSDOpReply(msg, &m);
m.redirect.oloc.pool_ns = NULL;
if (ret) {
pr_err("failed to decode MOSDOpReply for tid %llu: %d\n",
req->r_tid, ret);
......@@ -2835,7 +2864,11 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
unlink_request(osd, req);
mutex_unlock(&osd->lock);
ceph_oloc_copy(&req->r_t.target_oloc, &m.redirect.oloc);
/*
* Not ceph_oloc_copy() - changing pool_ns is not
* supported.
*/
req->r_t.target_oloc.pool = m.redirect.oloc.pool;
req->r_flags |= CEPH_OSD_FLAG_REDIRECTED;
req->r_tid = 0;
__submit_request(req, false);
......
......@@ -1510,6 +1510,24 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
return ERR_PTR(err);
}
void ceph_oloc_copy(struct ceph_object_locator *dest,
const struct ceph_object_locator *src)
{
WARN_ON(!ceph_oloc_empty(dest));
WARN_ON(dest->pool_ns); /* empty() only covers ->pool */
dest->pool = src->pool;
if (src->pool_ns)
dest->pool_ns = ceph_get_string(src->pool_ns);
}
EXPORT_SYMBOL(ceph_oloc_copy);
void ceph_oloc_destroy(struct ceph_object_locator *oloc)
{
ceph_put_string(oloc->pool_ns);
}
EXPORT_SYMBOL(ceph_oloc_destroy);
void ceph_oid_copy(struct ceph_object_id *dest,
const struct ceph_object_id *src)
{
......@@ -1770,9 +1788,9 @@ int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
u64 *ono,
u64 *oxoff, u64 *oxlen)
{
u32 osize = le32_to_cpu(layout->fl_object_size);
u32 su = le32_to_cpu(layout->fl_stripe_unit);
u32 sc = le32_to_cpu(layout->fl_stripe_count);
u32 osize = layout->object_size;
u32 su = layout->stripe_unit;
u32 sc = layout->stripe_count;
u32 bl, stripeno, stripepos, objsetno;
u32 su_per_object;
u64 t, su_offset;
......@@ -1844,12 +1862,34 @@ int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
if (!pi)
return -ENOENT;
raw_pgid->pool = oloc->pool;
raw_pgid->seed = ceph_str_hash(pi->object_hash, oid->name,
oid->name_len);
dout("%s %s -> raw_pgid %llu.%x\n", __func__, oid->name,
raw_pgid->pool, raw_pgid->seed);
if (!oloc->pool_ns) {
raw_pgid->pool = oloc->pool;
raw_pgid->seed = ceph_str_hash(pi->object_hash, oid->name,
oid->name_len);
dout("%s %s -> raw_pgid %llu.%x\n", __func__, oid->name,
raw_pgid->pool, raw_pgid->seed);
} else {
char stack_buf[256];
char *buf = stack_buf;
int nsl = oloc->pool_ns->len;
size_t total = nsl + 1 + oid->name_len;
if (total > sizeof(stack_buf)) {
buf = kmalloc(total, GFP_NOIO);
if (!buf)
return -ENOMEM;
}
memcpy(buf, oloc->pool_ns->str, nsl);
buf[nsl] = '\037';
memcpy(buf + nsl + 1, oid->name, oid->name_len);
raw_pgid->pool = oloc->pool;
raw_pgid->seed = ceph_str_hash(pi->object_hash, buf, total);
if (buf != stack_buf)
kfree(buf);
dout("%s %s ns %.*s -> raw_pgid %llu.%x\n", __func__,
oid->name, nsl, oloc->pool_ns->str,
raw_pgid->pool, raw_pgid->seed);
}
return 0;
}
EXPORT_SYMBOL(ceph_object_locator_to_pg);
......
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/string.h>
#include <linux/spinlock.h>
#include <linux/ceph/string_table.h>
static DEFINE_SPINLOCK(string_tree_lock);
static struct rb_root string_tree = RB_ROOT;
struct ceph_string *ceph_find_or_create_string(const char* str, size_t len)
{
struct ceph_string *cs, *exist;
struct rb_node **p, *parent;
int ret;
exist = NULL;
spin_lock(&string_tree_lock);
p = &string_tree.rb_node;
while (*p) {
exist = rb_entry(*p, struct ceph_string, node);
ret = ceph_compare_string(exist, str, len);
if (ret > 0)
p = &(*p)->rb_left;
else if (ret < 0)
p = &(*p)->rb_right;
else
break;
exist = NULL;
}
if (exist && !kref_get_unless_zero(&exist->kref)) {
rb_erase(&exist->node, &string_tree);
RB_CLEAR_NODE(&exist->node);
exist = NULL;
}
spin_unlock(&string_tree_lock);
if (exist)
return exist;
cs = kmalloc(sizeof(*cs) + len + 1, GFP_NOFS);
if (!cs)
return NULL;
kref_init(&cs->kref);
cs->len = len;
memcpy(cs->str, str, len);
cs->str[len] = 0;
retry:
exist = NULL;
parent = NULL;
p = &string_tree.rb_node;
spin_lock(&string_tree_lock);
while (*p) {
parent = *p;
exist = rb_entry(*p, struct ceph_string, node);
ret = ceph_compare_string(exist, str, len);
if (ret > 0)
p = &(*p)->rb_left;
else if (ret < 0)
p = &(*p)->rb_right;
else
break;
exist = NULL;
}
ret = 0;
if (!exist) {
rb_link_node(&cs->node, parent, p);
rb_insert_color(&cs->node, &string_tree);
} else if (!kref_get_unless_zero(&exist->kref)) {
rb_erase(&exist->node, &string_tree);
RB_CLEAR_NODE(&exist->node);
ret = -EAGAIN;
}
spin_unlock(&string_tree_lock);
if (ret == -EAGAIN)
goto retry;
if (exist) {
kfree(cs);
cs = exist;
}
return cs;
}
EXPORT_SYMBOL(ceph_find_or_create_string);
static void ceph_free_string(struct rcu_head *head)
{
struct ceph_string *cs = container_of(head, struct ceph_string, rcu);
kfree(cs);
}
void ceph_release_string(struct kref *ref)
{
struct ceph_string *cs = container_of(ref, struct ceph_string, kref);
spin_lock(&string_tree_lock);
if (!RB_EMPTY_NODE(&cs->node)) {
rb_erase(&cs->node, &string_tree);
RB_CLEAR_NODE(&cs->node);
}
spin_unlock(&string_tree_lock);
call_rcu(&cs->rcu, ceph_free_string);
}
EXPORT_SYMBOL(ceph_release_string);
bool ceph_strings_empty(void)
{
return RB_EMPTY_ROOT(&string_tree);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment