Commit 240cd6a8 authored by Linus Torvalds

Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

Pull Ceph updates from Sage Weil:
 "The biggest chunk is a series of patches from Ilya that add support
  for new Ceph osd and crush map features, including some new tunables,
  primary affinity, and the new encoding that is needed for erasure
  coding support.  This brings things into parity with the server side
  and the looming firefly release.  There is also support for allocation
  hints in RBD that help limit fragmentation on the server side.

  There is also a series of patches from Zheng fixing NFS reexport,
  directory fragmentation support, flock vs fcntl behavior, and some
  issues with clustered MDS.

  Finally, there are some miscellaneous fixes from Yunchuan Wen for
  fscache, Fabian Frederick for ACLs, and from me for fsync(dirfd)
  behavior"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (79 commits)
  ceph: skip invalid dentry during dcache readdir
  libceph: dump pool {read,write}_tier to debugfs
  libceph: output primary affinity values on osdmap updates
  ceph: flush cap release queue when trimming session caps
  ceph: don't grabs open file reference for aborted request
  ceph: drop extra open file reference in ceph_atomic_open()
  ceph: preallocate buffer for readdir reply
  libceph: enable PRIMARY_AFFINITY feature bit
  libceph: redo ceph_calc_pg_primary() in terms of ceph_calc_pg_acting()
  libceph: add support for osd primary affinity
  libceph: add support for primary_temp mappings
  libceph: return primary from ceph_calc_pg_acting()
  libceph: switch ceph_calc_pg_acting() to new helpers
  libceph: introduce apply_temps() helper
  libceph: introduce pg_to_raw_osds() and raw_to_up_osds() helpers
  libceph: ceph_can_shift_osds(pool) and pool type defines
  libceph: ceph_osd_{exists,is_up,is_down}(osd) definitions
  libceph: enable OSDMAP_ENC feature bit
  libceph: primary_affinity decode bits
  libceph: primary_affinity infrastructure
  ...
parents 30211125 a30be7cb
@@ -1654,7 +1654,7 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
     if (osd_req->r_result < 0)
         obj_request->result = osd_req->r_result;

-    BUG_ON(osd_req->r_num_ops > 2);
+    rbd_assert(osd_req->r_num_ops <= CEPH_OSD_MAX_OP);

     /*
      * We support a 64-bit length, but ultimately it has to be
@@ -1662,11 +1662,15 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
      */
     obj_request->xferred = osd_req->r_reply_op_len[0];
     rbd_assert(obj_request->xferred < (u64)UINT_MAX);
+
     opcode = osd_req->r_ops[0].op;
     switch (opcode) {
     case CEPH_OSD_OP_READ:
         rbd_osd_read_callback(obj_request);
         break;
+    case CEPH_OSD_OP_SETALLOCHINT:
+        rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE);
+        /* fall through */
     case CEPH_OSD_OP_WRITE:
         rbd_osd_write_callback(obj_request);
         break;
@@ -1715,9 +1719,16 @@ static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
                         snapc, CEPH_NOSNAP, &mtime);
 }

+/*
+ * Create an osd request.  A read request has one osd op (read).
+ * A write request has either one (watch) or two (hint+write) osd ops.
+ * (All rbd data writes are prefixed with an allocation hint op, but
+ * technically osd watch is a write request, hence this distinction.)
+ */
 static struct ceph_osd_request *rbd_osd_req_create(
                         struct rbd_device *rbd_dev,
                         bool write_request,
+                        unsigned int num_ops,
                         struct rbd_obj_request *obj_request)
 {
     struct ceph_snap_context *snapc = NULL;
@@ -1733,10 +1744,13 @@ static struct ceph_osd_request *rbd_osd_req_create(
         snapc = img_request->snapc;
     }

-    /* Allocate and initialize the request, for the single op */
+    rbd_assert(num_ops == 1 || (write_request && num_ops == 2));
+
+    /* Allocate and initialize the request, for the num_ops ops */

     osdc = &rbd_dev->rbd_client->client->osdc;
-    osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
+    osd_req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false,
+                                      GFP_ATOMIC);
     if (!osd_req)
         return NULL;    /* ENOMEM */
@@ -1756,8 +1770,8 @@ static struct ceph_osd_request *rbd_osd_req_create(
 /*
  * Create a copyup osd request based on the information in the
- * object request supplied.  A copyup request has two osd ops,
- * a copyup method call, and a "normal" write request.
+ * object request supplied.  A copyup request has three osd ops,
+ * a copyup method call, a hint op, and a write op.
  */
 static struct ceph_osd_request *
 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
@@ -1773,12 +1787,12 @@ rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
     rbd_assert(img_request);
     rbd_assert(img_request_write_test(img_request));

-    /* Allocate and initialize the request, for the two ops */
+    /* Allocate and initialize the request, for the three ops */

     snapc = img_request->snapc;
     rbd_dev = img_request->rbd_dev;
     osdc = &rbd_dev->rbd_client->client->osdc;
-    osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
+    osd_req = ceph_osdc_alloc_request(osdc, snapc, 3, false, GFP_ATOMIC);
     if (!osd_req)
         return NULL;    /* ENOMEM */
@@ -2178,6 +2192,7 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
         const char *object_name;
         u64 offset;
         u64 length;
+        unsigned int which = 0;

         object_name = rbd_segment_name(rbd_dev, img_offset);
         if (!object_name)
@@ -2190,6 +2205,7 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
         rbd_segment_name_free(object_name);
         if (!obj_request)
             goto out_unwind;
+
         /*
          * set obj_request->img_request before creating the
          * osd_request so that it gets the right snapc
@@ -2207,7 +2223,7 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
                                 clone_size,
                                 GFP_ATOMIC);
             if (!obj_request->bio_list)
-                goto out_partial;
+                goto out_unwind;
         } else {
             unsigned int page_count;
@@ -2220,19 +2236,27 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
         }

         osd_req = rbd_osd_req_create(rbd_dev, write_request,
+                                     (write_request ? 2 : 1),
                                      obj_request);
         if (!osd_req)
-            goto out_partial;
+            goto out_unwind;
         obj_request->osd_req = osd_req;
         obj_request->callback = rbd_img_obj_callback;

-        osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
+        if (write_request) {
+            osd_req_op_alloc_hint_init(osd_req, which,
+                            rbd_obj_bytes(&rbd_dev->header),
+                            rbd_obj_bytes(&rbd_dev->header));
+            which++;
+        }
+
+        osd_req_op_extent_init(osd_req, which, opcode, offset, length,
                                0, 0);
         if (type == OBJ_REQUEST_BIO)
-            osd_req_op_extent_osd_data_bio(osd_req, 0,
+            osd_req_op_extent_osd_data_bio(osd_req, which,
                             obj_request->bio_list, length);
         else
-            osd_req_op_extent_osd_data_pages(osd_req, 0,
+            osd_req_op_extent_osd_data_pages(osd_req, which,
                             obj_request->pages, length,
                             offset & ~PAGE_MASK, false, false);
@@ -2249,11 +2273,9 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,

     return 0;

-out_partial:
-    rbd_obj_request_put(obj_request);
 out_unwind:
     for_each_obj_request_safe(img_request, obj_request, next_obj_request)
-        rbd_obj_request_put(obj_request);
+        rbd_img_obj_request_del(img_request, obj_request);

     return -ENOMEM;
 }
@@ -2353,7 +2375,7 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
     /*
      * The original osd request is of no use to use any more.
-     * We need a new one that can hold the two ops in a copyup
+     * We need a new one that can hold the three ops in a copyup
      * request.  Allocate the new copyup osd request for the
      * original request, and release the old one.
      */
@@ -2372,17 +2394,22 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
     osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
                                       false, false);

-    /* Then the original write request op */
+    /* Then the hint op */

+    osd_req_op_alloc_hint_init(osd_req, 1, rbd_obj_bytes(&rbd_dev->header),
+                               rbd_obj_bytes(&rbd_dev->header));
+
+    /* And the original write request op */

     offset = orig_request->offset;
     length = orig_request->length;
-    osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
+    osd_req_op_extent_init(osd_req, 2, CEPH_OSD_OP_WRITE,
                            offset, length, 0, 0);
     if (orig_request->type == OBJ_REQUEST_BIO)
-        osd_req_op_extent_osd_data_bio(osd_req, 1,
+        osd_req_op_extent_osd_data_bio(osd_req, 2,
                         orig_request->bio_list, length);
     else
-        osd_req_op_extent_osd_data_pages(osd_req, 1,
+        osd_req_op_extent_osd_data_pages(osd_req, 2,
                         orig_request->pages, length,
                         offset & ~PAGE_MASK, false, false);
@@ -2603,7 +2630,7 @@ static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
     rbd_assert(obj_request->img_request);
     rbd_dev = obj_request->img_request->rbd_dev;
-    stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
+    stat_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1,
                                                stat_request);
     if (!stat_request->osd_req)
         goto out;
@@ -2807,7 +2834,8 @@ static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id)
         return -ENOMEM;

     ret = -ENOMEM;
-    obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
+    obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1,
+                                              obj_request);
     if (!obj_request->osd_req)
         goto out;
@@ -2870,7 +2898,8 @@ static int __rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
     if (!obj_request)
         goto out_cancel;

-    obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
+    obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1,
+                                              obj_request);
     if (!obj_request->osd_req)
         goto out_cancel;
@@ -2978,7 +3007,8 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
     obj_request->pages = pages;
     obj_request->page_count = page_count;

-    obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
+    obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1,
+                                              obj_request);
     if (!obj_request->osd_req)
         goto out;
@@ -3211,7 +3241,8 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
     obj_request->pages = pages;
     obj_request->page_count = page_count;

-    obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
+    obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1,
+                                              obj_request);
     if (!obj_request->osd_req)
         goto out;
......
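The rbd hunks above turn every data write into a compound OSD request: op 0 is CEPH_OSD_OP_SETALLOCHINT carrying the object size as a hint, op 1 is the actual write, while reads stay single-op. Below is a minimal userspace sketch of the `which` op-index bookkeeping; the types and enum values are illustrative stand-ins, not the kernel's:

/* Toy model of rbd_img_request_fill's op indexing: writes get a hint op
 * at index 0 and the extent op after it; reads keep the extent op at 0. */
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

enum op { OP_SETALLOCHINT, OP_READ, OP_WRITE };

struct req { enum op ops[2]; unsigned int num_ops; };

static void fill(struct req *r, bool write_request)
{
    unsigned int which = 0;

    r->num_ops = write_request ? 2 : 1;     /* mirrors (write_request ? 2 : 1) */
    if (write_request)
        r->ops[which++] = OP_SETALLOCHINT;  /* hint always precedes the write */
    r->ops[which] = write_request ? OP_WRITE : OP_READ;
    assert(which == r->num_ops - 1);
}

int main(void)
{
    struct req r;

    fill(&r, true);
    printf("write: %u ops, first is hint: %d\n", r.num_ops,
           r.ops[0] == OP_SETALLOCHINT);
    fill(&r, false);
    printf("read:  %u ops\n", r.num_ops);
    return 0;
}

Keeping a running index rather than hardcoding 0 and 1 lets the read and write paths share the extent-init and data-attach calls, which is exactly what the diff does with osd_req_op_extent_init(osd_req, which, ...).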
@@ -205,6 +205,7 @@ void ceph_fscache_register_inode_cookie(struct ceph_fs_client* fsc,
     ci->fscache = fscache_acquire_cookie(fsc->fscache,
                                          &ceph_fscache_inode_object_def,
                                          ci, true);
+    fscache_check_consistency(ci->fscache);
 done:
     mutex_unlock(&inode->i_mutex);
......
@@ -48,6 +48,12 @@ void ceph_readpage_to_fscache(struct inode *inode, struct page *page);
 void ceph_invalidate_fscache_page(struct inode* inode, struct page *page);
 void ceph_queue_revalidate(struct inode *inode);

+static inline void ceph_fscache_update_objectsize(struct inode *inode)
+{
+    struct ceph_inode_info *ci = ceph_inode(inode);
+    fscache_attr_changed(ci->fscache);
+}
+
 static inline void ceph_fscache_invalidate(struct inode *inode)
 {
     fscache_invalidate(ceph_inode(inode)->fscache);
@@ -135,6 +141,10 @@ static inline void ceph_readpage_to_fscache(struct inode *inode,
 {
 }

+static inline void ceph_fscache_update_objectsize(struct inode *inode)
+{
+}
+
 static inline void ceph_fscache_invalidate(struct inode *inode)
 {
 }
......
@@ -622,8 +622,10 @@ int ceph_add_cap(struct inode *inode,
     if (flags & CEPH_CAP_FLAG_AUTH) {
         if (ci->i_auth_cap == NULL ||
-            ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0)
+            ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) {
             ci->i_auth_cap = cap;
+            cap->mds_wanted = wanted;
+        }
         ci->i_cap_exporting_issued = 0;
     } else {
         WARN_ON(ci->i_auth_cap == cap);
@@ -885,7 +887,10 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci)
         cap = rb_entry(p, struct ceph_cap, ci_node);
         if (!__cap_is_valid(cap))
             continue;
-        mds_wanted |= cap->mds_wanted;
+        if (cap == ci->i_auth_cap)
+            mds_wanted |= cap->mds_wanted;
+        else
+            mds_wanted |= (cap->mds_wanted & ~CEPH_CAP_ANY_FILE_WR);
     }
     return mds_wanted;
 }
......
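The second caps.c hunk changes the aggregation rule: only the auth cap may contribute write-class "wanted" bits, replica caps are masked with ~CEPH_CAP_ANY_FILE_WR. A hedged userspace sketch of just that masking; the CAP_* bit values here are made up for illustration, only CEPH_CAP_ANY_FILE_WR is the kernel's real name:

/* Model of the new __ceph_caps_mds_wanted() rule. */
#include <stdbool.h>
#include <stdio.h>

#define CAP_FILE_RD     0x01
#define CAP_FILE_WR     0x02
#define CAP_FILE_BUFFER 0x04
#define CAP_ANY_FILE_WR (CAP_FILE_WR | CAP_FILE_BUFFER)

struct cap { int mds_wanted; bool is_auth; };

static int caps_mds_wanted(const struct cap *caps, int n)
{
    int mds_wanted = 0;

    for (int i = 0; i < n; i++) {
        if (caps[i].is_auth)
            mds_wanted |= caps[i].mds_wanted;           /* auth: trust all */
        else
            mds_wanted |= caps[i].mds_wanted & ~CAP_ANY_FILE_WR;
    }
    return mds_wanted;
}

int main(void)
{
    struct cap caps[] = {
        { CAP_FILE_RD | CAP_FILE_WR, false },  /* replica: WR bit dropped */
        { CAP_FILE_RD, true },                 /* auth cap */
    };

    printf("wanted = %#x\n", caps_mds_wanted(caps, 2));  /* prints 0x1 */
    return 0;
}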
@@ -93,6 +93,8 @@ static int mdsc_show(struct seq_file *s, void *p)
         } else if (req->r_path1) {
             seq_printf(s, " #%llx/%s", req->r_ino1.ino,
                        req->r_path1);
+        } else {
+            seq_printf(s, " #%llx", req->r_ino1.ino);
         }

         if (req->r_old_dentry) {
@@ -102,7 +104,8 @@ static int mdsc_show(struct seq_file *s, void *p)
                 path = NULL;
             spin_lock(&req->r_old_dentry->d_lock);
             seq_printf(s, " #%llx/%.*s (%s)",
-                       ceph_ino(req->r_old_dentry_dir),
+                       req->r_old_dentry_dir ?
+                       ceph_ino(req->r_old_dentry_dir) : 0,
                        req->r_old_dentry->d_name.len,
                        req->r_old_dentry->d_name.name,
                        path ? path : "");
......
@@ -119,7 +119,8 @@ static int fpos_cmp(loff_t l, loff_t r)
  * defined IFF we hold CEPH_CAP_FILE_SHARED (which will be revoked by
  * the MDS if/when the directory is modified).
  */
-static int __dcache_readdir(struct file *file, struct dir_context *ctx)
+static int __dcache_readdir(struct file *file, struct dir_context *ctx,
+                            u32 shared_gen)
 {
     struct ceph_file_info *fi = file->private_data;
     struct dentry *parent = file->f_dentry;
@@ -133,8 +134,8 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx)
     last = fi->dentry;
     fi->dentry = NULL;

-    dout("__dcache_readdir %p at %llu (last %p)\n", dir, ctx->pos,
-         last);
+    dout("__dcache_readdir %p v%u at %llu (last %p)\n",
+         dir, shared_gen, ctx->pos, last);

     spin_lock(&parent->d_lock);
@@ -161,7 +162,8 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx)
             goto out_unlock;
         }
         spin_lock_nested(&dentry->d_lock, DENTRY_D_LOCK_NESTED);
-        if (!d_unhashed(dentry) && dentry->d_inode &&
+        if (di->lease_shared_gen == shared_gen &&
+            !d_unhashed(dentry) && dentry->d_inode &&
             ceph_snap(dentry->d_inode) != CEPH_SNAPDIR &&
             ceph_ino(dentry->d_inode) != CEPH_INO_CEPH &&
             fpos_cmp(ctx->pos, di->offset) <= 0)
@@ -190,7 +192,7 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx)
     if (last) {
         /* remember our position */
         fi->dentry = last;
-        fi->next_offset = di->offset;
+        fi->next_offset = fpos_off(di->offset);
     }
     dput(dentry);
     return 0;
@@ -252,8 +254,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
     int err;
     u32 ftype;
     struct ceph_mds_reply_info_parsed *rinfo;
-    const int max_entries = fsc->mount_options->max_readdir;
-    const int max_bytes = fsc->mount_options->max_readdir_bytes;

     dout("readdir %p file %p frag %u off %u\n", inode, file, frag, off);
     if (fi->flags & CEPH_F_ATEND)
@@ -291,8 +291,9 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
         ceph_snap(inode) != CEPH_SNAPDIR &&
         __ceph_dir_is_complete(ci) &&
         __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) {
+        u32 shared_gen = ci->i_shared_gen;
         spin_unlock(&ci->i_ceph_lock);
-        err = __dcache_readdir(file, ctx);
+        err = __dcache_readdir(file, ctx, shared_gen);
         if (err != -EAGAIN)
             return err;
     } else {
@@ -322,14 +323,16 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
             fi->last_readdir = NULL;
         }

+        /* requery frag tree, as the frag topology may have changed */
+        frag = ceph_choose_frag(ceph_inode(inode), frag, NULL, NULL);
+
         dout("readdir fetching %llx.%llx frag %x offset '%s'\n",
              ceph_vinop(inode), frag, fi->last_name);
         req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
         if (IS_ERR(req))
             return PTR_ERR(req);
+        err = ceph_alloc_readdir_reply_buffer(req, inode);
+        if (err) {
+            ceph_mdsc_put_request(req);
+            return err;
+        }
         req->r_inode = inode;
         ihold(inode);
         req->r_dentry = dget(file->f_dentry);
@@ -340,9 +343,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
         req->r_path2 = kstrdup(fi->last_name, GFP_NOFS);
         req->r_readdir_offset = fi->next_offset;
         req->r_args.readdir.frag = cpu_to_le32(frag);
-        req->r_args.readdir.max_entries = cpu_to_le32(max_entries);
-        req->r_args.readdir.max_bytes = cpu_to_le32(max_bytes);
-        req->r_num_caps = max_entries + 1;

         err = ceph_mdsc_do_request(mdsc, NULL, req);
         if (err < 0) {
             ceph_mdsc_put_request(req);
@@ -369,9 +369,9 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
             fi->next_offset = 0;
         off = fi->next_offset;
     }
+    fi->frag = frag;
     fi->offset = fi->next_offset;
     fi->last_readdir = req;
-    fi->frag = frag;

     if (req->r_reply_info.dir_end) {
         kfree(fi->last_name);
@@ -454,7 +454,7 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
     return 0;
 }

-static void reset_readdir(struct ceph_file_info *fi)
+static void reset_readdir(struct ceph_file_info *fi, unsigned frag)
 {
     if (fi->last_readdir) {
         ceph_mdsc_put_request(fi->last_readdir);
@@ -462,7 +462,10 @@ static void reset_readdir(struct ceph_file_info *fi)
     }
     kfree(fi->last_name);
     fi->last_name = NULL;
-    fi->next_offset = 2;  /* compensate for . and .. */
+    if (ceph_frag_is_leftmost(frag))
+        fi->next_offset = 2;  /* compensate for . and .. */
+    else
+        fi->next_offset = 0;
     if (fi->dentry) {
         dput(fi->dentry);
         fi->dentry = NULL;
@@ -474,7 +477,7 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
 {
     struct ceph_file_info *fi = file->private_data;
     struct inode *inode = file->f_mapping->host;
-    loff_t old_offset = offset;
+    loff_t old_offset = ceph_make_fpos(fi->frag, fi->next_offset);
     loff_t retval;

     mutex_lock(&inode->i_mutex);
@@ -491,7 +494,7 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
         goto out;
     }

-    if (offset >= 0 && offset <= inode->i_sb->s_maxbytes) {
+    if (offset >= 0) {
         if (offset != file->f_pos) {
             file->f_pos = offset;
             file->f_version = 0;
@@ -504,14 +507,14 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
          * seek to new frag, or seek prior to current chunk.
          */
         if (offset == 0 ||
-            fpos_frag(offset) != fpos_frag(old_offset) ||
+            fpos_frag(offset) != fi->frag ||
             fpos_off(offset) < fi->offset) {
             dout("dir_llseek dropping %p content\n", file);
-            reset_readdir(fi);
+            reset_readdir(fi, fpos_frag(offset));
         }

         /* bump dir_release_count if we did a forward seek */
-        if (offset > old_offset)
+        if (fpos_cmp(offset, old_offset) > 0)
             fi->dir_release_count--;
     }
 out:
@@ -812,8 +815,7 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
     }
     req->r_dentry = dget(dentry);
     req->r_num_caps = 2;
-    req->r_old_dentry = dget(old_dentry); /* or inode? hrm. */
-    req->r_old_dentry_dir = ceph_get_dentry_parent_inode(old_dentry);
+    req->r_old_dentry = dget(old_dentry);
     req->r_locked_dir = dir;
     req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
     req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
@@ -911,10 +913,11 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
     req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_RENAME, USE_AUTH_MDS);
     if (IS_ERR(req))
         return PTR_ERR(req);
+    ihold(old_dir);
     req->r_dentry = dget(new_dentry);
     req->r_num_caps = 2;
     req->r_old_dentry = dget(old_dentry);
-    req->r_old_dentry_dir = ceph_get_dentry_parent_inode(old_dentry);
+    req->r_old_dentry_dir = old_dir;
     req->r_locked_dir = new_dir;
     req->r_old_dentry_drop = CEPH_CAP_FILE_SHARED;
     req->r_old_dentry_unless = CEPH_CAP_FILE_EXCL;
......
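The dir.c llseek fixes above juggle a frag and an intra-frag offset through one loff_t. A small userspace model of the encoding they assume (the kernel's ceph_make_fpos/fpos_frag/fpos_off helpers at this point put the frag in the high 32 bits and the chunk offset in the low 32; the implementation below is mine, only the names mirror the kernel's):

/* Model of the directory f_pos encoding ceph_dir_llseek relies on. */
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

static uint64_t make_fpos(uint32_t frag, uint32_t off)
{
    return ((uint64_t)frag << 32) | off;
}

static uint32_t fpos_frag(uint64_t pos) { return (uint32_t)(pos >> 32); }
static uint32_t fpos_off(uint64_t pos)  { return (uint32_t)pos; }

int main(void)
{
    uint64_t old_pos = make_fpos(0x2a, 7);   /* cached readdir position */
    uint64_t new_pos = make_fpos(0x2b, 0);   /* seek into another frag */

    /* a seek that changes the frag must drop the cached chunk, which is
     * what the fpos_frag(offset) != fi->frag test above implements */
    if (fpos_frag(new_pos) != fpos_frag(old_pos))
        printf("frag changed (%x -> %x): reset readdir state\n",
               fpos_frag(old_pos), fpos_frag(new_pos));
    printf("off within frag: %" PRIu32 "\n", fpos_off(new_pos));
    return 0;
}

This also explains the super.h hunk further down that narrows next_offset from u64 to unsigned: the within-frag offset only ever occupies the low 32 bits.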
This diff is collapsed.
@@ -210,7 +210,7 @@ int ceph_open(struct inode *inode, struct file *file)
     ihold(inode);

     req->r_num_caps = 1;
-    if (flags & (O_CREAT|O_TRUNC))
+    if (flags & O_CREAT)
         parent_inode = ceph_get_dentry_parent_inode(file->f_dentry);
     err = ceph_mdsc_do_request(mdsc, parent_inode, req);
     iput(parent_inode);
@@ -291,8 +291,9 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
         }
         err = finish_open(file, dentry, ceph_open, opened);
     }
-
 out_err:
+    if (!req->r_err && req->r_target_inode)
+        ceph_put_fmode(ceph_inode(req->r_target_inode), req->r_fmode);
     ceph_mdsc_put_request(req);
     dout("atomic_open result=%d\n", err);
     return err;
@@ -970,6 +971,7 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov,
             goto retry_snap;
         }
     } else {
+        loff_t old_size = inode->i_size;
         /*
          * No need to acquire the i_truncate_mutex. Because
          * the MDS revokes Fwb caps before sending truncate
@@ -980,6 +982,8 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov,
         written = generic_file_buffered_write(iocb, iov, nr_segs,
                                               pos, &iocb->ki_pos,
                                               count, 0);
+        if (inode->i_size > old_size)
+            ceph_fscache_update_objectsize(inode);
         mutex_unlock(&inode->i_mutex);
     }
......
@@ -659,14 +659,6 @@ static int fill_inode(struct inode *inode,
             le32_to_cpu(info->time_warp_seq),
             &ctime, &mtime, &atime);

-    /* only update max_size on auth cap */
-    if ((info->cap.flags & CEPH_CAP_FLAG_AUTH) &&
-        ci->i_max_size != le64_to_cpu(info->max_size)) {
-        dout("max_size %lld -> %llu\n", ci->i_max_size,
-             le64_to_cpu(info->max_size));
-        ci->i_max_size = le64_to_cpu(info->max_size);
-    }
-
     ci->i_layout = info->layout;
     inode->i_blkbits = fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1;
@@ -755,6 +747,14 @@ static int fill_inode(struct inode *inode,
         ci->i_max_offset = 2;
     }
 no_change:
+    /* only update max_size on auth cap */
+    if ((info->cap.flags & CEPH_CAP_FLAG_AUTH) &&
+        ci->i_max_size != le64_to_cpu(info->max_size)) {
+        dout("max_size %lld -> %llu\n", ci->i_max_size,
+             le64_to_cpu(info->max_size));
+        ci->i_max_size = le64_to_cpu(info->max_size);
+    }
+
     spin_unlock(&ci->i_ceph_lock);

     /* queue truncate if we saw i_size decrease */
@@ -1044,10 +1044,59 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                          session, req->r_request_started, -1,
                          &req->r_caps_reservation);
         if (err < 0)
-            return err;
     } else {
         WARN_ON_ONCE(1);
     }
+            goto done;
+
+        if (dir && req->r_op == CEPH_MDS_OP_LOOKUPNAME) {
+            struct qstr dname;
+            struct dentry *dn, *parent;
+
+            BUG_ON(!rinfo->head->is_target);
+            BUG_ON(req->r_dentry);
+
+            parent = d_find_any_alias(dir);
+            BUG_ON(!parent);
+
+            dname.name = rinfo->dname;
+            dname.len = rinfo->dname_len;
+            dname.hash = full_name_hash(dname.name, dname.len);
+            vino.ino = le64_to_cpu(rinfo->targeti.in->ino);
+            vino.snap = le64_to_cpu(rinfo->targeti.in->snapid);
+retry_lookup:
+            dn = d_lookup(parent, &dname);
+            dout("d_lookup on parent=%p name=%.*s got %p\n",
+                 parent, dname.len, dname.name, dn);
+
+            if (!dn) {
+                dn = d_alloc(parent, &dname);
+                dout("d_alloc %p '%.*s' = %p\n", parent,
+                     dname.len, dname.name, dn);
+                if (dn == NULL) {
+                    dput(parent);
+                    err = -ENOMEM;
+                    goto done;
+                }
+                err = ceph_init_dentry(dn);
+                if (err < 0) {
+                    dput(dn);
+                    dput(parent);
+                    goto done;
+                }
+            } else if (dn->d_inode &&
+                       (ceph_ino(dn->d_inode) != vino.ino ||
+                        ceph_snap(dn->d_inode) != vino.snap)) {
+                dout(" dn %p points to wrong inode %p\n",
+                     dn, dn->d_inode);
+                d_delete(dn);
+                dput(dn);
+                goto retry_lookup;
+            }
+
+            req->r_dentry = dn;
+            dput(parent);
+        }
     }

     if (rinfo->head->is_target) {
@@ -1063,7 +1112,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
         err = fill_inode(in, &rinfo->targeti, NULL,
                          session, req->r_request_started,
-                         (le32_to_cpu(rinfo->head->result) == 0) ?
+                         (!req->r_aborted && rinfo->head->result == 0) ?
                          req->r_fmode : -1,
                          &req->r_caps_reservation);
         if (err < 0) {
@@ -1616,8 +1665,6 @@ static const struct inode_operations ceph_symlink_iops = {
     .getxattr = ceph_getxattr,
     .listxattr = ceph_listxattr,
     .removexattr = ceph_removexattr,
-    .get_acl = ceph_get_acl,
-    .set_acl = ceph_set_acl,
 };

 /*
@@ -1627,7 +1674,6 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
 {
     struct inode *inode = dentry->d_inode;
     struct ceph_inode_info *ci = ceph_inode(inode);
-    struct inode *parent_inode;
     const unsigned int ia_valid = attr->ia_valid;
     struct ceph_mds_request *req;
     struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc;
@@ -1819,9 +1865,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
         req->r_inode_drop = release;
         req->r_args.setattr.mask = cpu_to_le32(mask);
         req->r_num_caps = 1;
-        parent_inode = ceph_get_dentry_parent_inode(dentry);
-        err = ceph_mdsc_do_request(mdsc, parent_inode, req);
-        iput(parent_inode);
+        err = ceph_mdsc_do_request(mdsc, NULL, req);
     }
     dout("setattr %p result=%d (%s locally, %d remote)\n", inode, err,
          ceph_cap_string(dirtied), mask);
......
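The new CEPH_MDS_OP_LOOKUPNAME branch above (used for NFS re-export) is a lookup-or-create loop over the dcache: look the name up, allocate a fresh dentry on a miss, and if the cached dentry points at the wrong inode, delete it and retry. A toy userspace sketch of just that control flow; the single-slot "dcache" and struct here are invented for illustration, only the shape of the loop mirrors the kernel code:

/* Lookup-or-create with a staleness check and retry, modelled in userspace. */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

struct dentry { char name[32]; long ino; };

static struct dentry *slot;   /* toy dcache: one entry */

static struct dentry *d_lookup_toy(const char *name)
{
    return slot && strcmp(slot->name, name) == 0 ? slot : NULL;
}

static struct dentry *find_or_create(const char *name, long ino)
{
    struct dentry *dn;

retry_lookup:
    dn = d_lookup_toy(name);
    if (!dn) {                          /* miss: allocate a fresh entry */
        dn = calloc(1, sizeof(*dn));
        if (!dn)
            return NULL;
        snprintf(dn->name, sizeof(dn->name), "%s", name);
        dn->ino = ino;
        slot = dn;
    } else if (dn->ino != ino) {        /* hit, but stale target */
        slot = NULL;                    /* ~d_delete() */
        free(dn);
        goto retry_lookup;              /* second pass allocates fresh */
    }
    return dn;
}

int main(void)
{
    find_or_create("foo", 100);
    struct dentry *dn = find_or_create("foo", 200);  /* stale: re-created */
    printf("foo -> ino %ld\n", dn->ino);
    return 0;
}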
@@ -64,7 +64,6 @@ static long __validate_layout(struct ceph_mds_client *mdsc,
 static long ceph_ioctl_set_layout(struct file *file, void __user *arg)
 {
     struct inode *inode = file_inode(file);
-    struct inode *parent_inode;
     struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
     struct ceph_mds_request *req;
     struct ceph_ioctl_layout l;
@@ -121,9 +120,7 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg)
         cpu_to_le32(l.object_size);
     req->r_args.setlayout.layout.fl_pg_pool = cpu_to_le32(l.data_pool);

-    parent_inode = ceph_get_dentry_parent_inode(file->f_dentry);
-    err = ceph_mdsc_do_request(mdsc, parent_inode, req);
-    iput(parent_inode);
+    err = ceph_mdsc_do_request(mdsc, NULL, req);
     ceph_mdsc_put_request(req);
     return err;
 }
......
@@ -2,11 +2,31 @@
 #include <linux/file.h>
 #include <linux/namei.h>
+#include <linux/random.h>

 #include "super.h"
 #include "mds_client.h"
 #include <linux/ceph/pagelist.h>

+static u64 lock_secret;
+
+static inline u64 secure_addr(void *addr)
+{
+    u64 v = lock_secret ^ (u64)(unsigned long)addr;
+    /*
+     * Set the most significant bit, so that MDS knows the 'owner'
+     * is sufficient to identify the owner of lock. (old code uses
+     * both 'owner' and 'pid')
+     */
+    v |= (1ULL << 63);
+    return v;
+}
+
+void __init ceph_flock_init(void)
+{
+    get_random_bytes(&lock_secret, sizeof(lock_secret));
+}
+
 /**
  * Implement fcntl and flock locking functions.
  */
@@ -14,11 +34,11 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file,
                  int cmd, u8 wait, struct file_lock *fl)
 {
     struct inode *inode = file_inode(file);
-    struct ceph_mds_client *mdsc =
-        ceph_sb_to_client(inode->i_sb)->mdsc;
+    struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
     struct ceph_mds_request *req;
     int err;
     u64 length = 0;
+    u64 owner;

     req = ceph_mdsc_create_request(mdsc, operation, USE_AUTH_MDS);
     if (IS_ERR(req))
@@ -32,25 +52,27 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file,
     else
         length = fl->fl_end - fl->fl_start + 1;

-    dout("ceph_lock_message: rule: %d, op: %d, pid: %llu, start: %llu, "
-         "length: %llu, wait: %d, type: %d", (int)lock_type,
-         (int)operation, (u64)fl->fl_pid, fl->fl_start,
-         length, wait, fl->fl_type);
+    if (lock_type == CEPH_LOCK_FCNTL)
+        owner = secure_addr(fl->fl_owner);
+    else
+        owner = secure_addr(fl->fl_file);
+
+    dout("ceph_lock_message: rule: %d, op: %d, owner: %llx, pid: %llu, "
+         "start: %llu, length: %llu, wait: %d, type: %d", (int)lock_type,
+         (int)operation, owner, (u64)fl->fl_pid, fl->fl_start, length,
+         wait, fl->fl_type);

     req->r_args.filelock_change.rule = lock_type;
     req->r_args.filelock_change.type = cmd;
+    req->r_args.filelock_change.owner = cpu_to_le64(owner);
     req->r_args.filelock_change.pid = cpu_to_le64((u64)fl->fl_pid);
-    /* This should be adjusted, but I'm not sure if
-       namespaces actually get id numbers*/
-    req->r_args.filelock_change.pid_namespace =
-        cpu_to_le64((u64)(unsigned long)fl->fl_nspid);
     req->r_args.filelock_change.start = cpu_to_le64(fl->fl_start);
     req->r_args.filelock_change.length = cpu_to_le64(length);
     req->r_args.filelock_change.wait = wait;

     err = ceph_mdsc_do_request(mdsc, inode, req);
-    if ( operation == CEPH_MDS_OP_GETFILELOCK){
+    if (operation == CEPH_MDS_OP_GETFILELOCK) {
         fl->fl_pid = le64_to_cpu(req->r_reply_info.filelock_reply->pid);
         if (CEPH_LOCK_SHARED == req->r_reply_info.filelock_reply->type)
             fl->fl_type = F_RDLCK;
@@ -87,14 +109,19 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl)
     u8 wait = 0;
     u16 op = CEPH_MDS_OP_SETFILELOCK;

-    fl->fl_nspid = get_pid(task_tgid(current));
-    dout("ceph_lock, fl_pid:%d", fl->fl_pid);
+    if (!(fl->fl_flags & FL_POSIX))
+        return -ENOLCK;
+    /* No mandatory locks */
+    if (__mandatory_lock(file->f_mapping->host) && fl->fl_type != F_UNLCK)
+        return -ENOLCK;
+
+    dout("ceph_lock, fl_owner: %p", fl->fl_owner);

     /* set wait bit as appropriate, then make command as Ceph expects it*/
-    if (F_SETLKW == cmd)
-        wait = 1;
-    if (F_GETLK == cmd)
+    if (IS_GETLK(cmd))
         op = CEPH_MDS_OP_GETFILELOCK;
+    else if (IS_SETLKW(cmd))
+        wait = 1;

     if (F_RDLCK == fl->fl_type)
         lock_cmd = CEPH_LOCK_SHARED;
@@ -105,7 +132,7 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl)
     err = ceph_lock_message(CEPH_LOCK_FCNTL, op, file, lock_cmd, wait, fl);
     if (!err) {
-        if ( op != CEPH_MDS_OP_GETFILELOCK ){
+        if (op != CEPH_MDS_OP_GETFILELOCK) {
             dout("mds locked, locking locally");
             err = posix_lock_file(file, fl, NULL);
             if (err && (CEPH_MDS_OP_SETFILELOCK == op)) {
@@ -131,20 +158,22 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl)
 {
     u8 lock_cmd;
     int err;
-    u8 wait = 1;
+    u8 wait = 0;

-    fl->fl_nspid = get_pid(task_tgid(current));
-    dout("ceph_flock, fl_pid:%d", fl->fl_pid);
+    if (!(fl->fl_flags & FL_FLOCK))
+        return -ENOLCK;
+    /* No mandatory locks */
+    if (__mandatory_lock(file->f_mapping->host) && fl->fl_type != F_UNLCK)
+        return -ENOLCK;

-    /* set wait bit, then clear it out of cmd*/
-    if (cmd & LOCK_NB)
-        wait = 0;
-    cmd = cmd & (LOCK_SH | LOCK_EX | LOCK_UN);
+    dout("ceph_flock, fl_file: %p", fl->fl_file);

-    /* set command sequence that Ceph wants to see:
-       shared lock, exclusive lock, or unlock */
-    if (LOCK_SH == cmd)
+    if (IS_SETLKW(cmd))
+        wait = 1;
+
+    if (F_RDLCK == fl->fl_type)
         lock_cmd = CEPH_LOCK_SHARED;
-    else if (LOCK_EX == cmd)
+    else if (F_WRLCK == fl->fl_type)
         lock_cmd = CEPH_LOCK_EXCL;
     else
         lock_cmd = CEPH_LOCK_UNLOCK;
@@ -280,13 +309,14 @@ int lock_to_ceph_filelock(struct file_lock *lock,
                struct ceph_filelock *cephlock)
 {
     int err = 0;
+
     cephlock->start = cpu_to_le64(lock->fl_start);
     cephlock->length = cpu_to_le64(lock->fl_end - lock->fl_start + 1);
     cephlock->client = cpu_to_le64(0);
-    cephlock->pid = cpu_to_le64(lock->fl_pid);
-    cephlock->pid_namespace =
-        cpu_to_le64((u64)(unsigned long)lock->fl_nspid);
+    cephlock->pid = cpu_to_le64((u64)lock->fl_pid);
+    if (lock->fl_flags & FL_POSIX)
+        cephlock->owner = cpu_to_le64(secure_addr(lock->fl_owner));
+    else
+        cephlock->owner = cpu_to_le64(secure_addr(lock->fl_file));

     switch (lock->fl_type) {
     case F_RDLCK:
......
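The locks.c rewrite above replaces the pid/pid_namespace pair with a single owner token: a kernel pointer (fl_owner for POSIX locks, fl_file for flock) XORed with a boot-time random secret so the address is not leaked to the MDS, with the top bit forced on so the MDS can distinguish the new scheme from the old owner+pid one. A userspace model of that derivation (srand/rand stand in for get_random_bytes, which is obviously not a real substitute):

/* Model of secure_addr(): stable per-owner token, address not disclosed. */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

static uint64_t lock_secret;

static uint64_t secure_addr(const void *addr)
{
    uint64_t v = lock_secret ^ (uint64_t)(uintptr_t)addr;

    return v | (1ULL << 63);   /* MSB set: owner alone identifies the holder */
}

int main(void)
{
    int a, b;

    srand((unsigned)time(NULL));   /* toy stand-in for get_random_bytes() */
    lock_secret = ((uint64_t)rand() << 32) | (unsigned)rand();

    /* same owner -> same token; different owners -> different tokens */
    printf("a: %#llx\n", (unsigned long long)secure_addr(&a));
    printf("a: %#llx\n", (unsigned long long)secure_addr(&a));
    printf("b: %#llx\n", (unsigned long long)secure_addr(&b));
    return 0;
}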
@@ -3,6 +3,7 @@
 #include <linux/fs.h>
 #include <linux/wait.h>
 #include <linux/slab.h>
+#include <linux/gfp.h>
 #include <linux/sched.h>
 #include <linux/debugfs.h>
 #include <linux/seq_file.h>
@@ -165,21 +166,18 @@ static int parse_reply_info_dir(void **p, void *end,
     if (num == 0)
         goto done;

-    /* alloc large array */
-    info->dir_nr = num;
-    info->dir_in = kcalloc(num, sizeof(*info->dir_in) +
-                   sizeof(*info->dir_dname) +
-                   sizeof(*info->dir_dname_len) +
-                   sizeof(*info->dir_dlease),
-                   GFP_NOFS);
-    if (info->dir_in == NULL) {
-        err = -ENOMEM;
-        goto out_bad;
-    }
+    BUG_ON(!info->dir_in);
     info->dir_dname = (void *)(info->dir_in + num);
     info->dir_dname_len = (void *)(info->dir_dname + num);
     info->dir_dlease = (void *)(info->dir_dname_len + num);
+    if ((unsigned long)(info->dir_dlease + num) >
+        (unsigned long)info->dir_in + info->dir_buf_size) {
+        pr_err("dir contents are larger than expected\n");
+        WARN_ON(1);
+        goto bad;
+    }
+
+    info->dir_nr = num;

     while (num) {
         /* dentry */
         ceph_decode_need(p, end, sizeof(u32)*2, bad);
@@ -327,7 +325,9 @@ static int parse_reply_info(struct ceph_msg *msg,
 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
 {
-    kfree(info->dir_in);
+    if (!info->dir_in)
+        return;
+    free_pages((unsigned long)info->dir_in, get_order(info->dir_buf_size));
 }
@@ -512,12 +512,11 @@ void ceph_mdsc_release_request(struct kref *kref)
     struct ceph_mds_request *req = container_of(kref,
                                                 struct ceph_mds_request,
                                                 r_kref);
+    destroy_reply_info(&req->r_reply_info);
     if (req->r_request)
         ceph_msg_put(req->r_request);
-    if (req->r_reply) {
+    if (req->r_reply)
         ceph_msg_put(req->r_reply);
-        destroy_reply_info(&req->r_reply_info);
-    }
     if (req->r_inode) {
         ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
         iput(req->r_inode);
@@ -528,7 +527,9 @@ void ceph_mdsc_release_request(struct kref *kref)
         iput(req->r_target_inode);
     if (req->r_dentry)
         dput(req->r_dentry);
-    if (req->r_old_dentry) {
+    if (req->r_old_dentry)
+        dput(req->r_old_dentry);
+    if (req->r_old_dentry_dir) {
         /*
          * track (and drop pins for) r_old_dentry_dir
          * separately, since r_old_dentry's d_parent may have
@@ -537,7 +538,6 @@ void ceph_mdsc_release_request(struct kref *kref)
          */
         ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
                           CEPH_CAP_PIN);
-        dput(req->r_old_dentry);
         iput(req->r_old_dentry_dir);
     }
     kfree(req->r_path1);
@@ -1311,6 +1311,9 @@ static int trim_caps(struct ceph_mds_client *mdsc,
             trim_caps - session->s_trim_caps);
         session->s_trim_caps = 0;
     }
+
+    ceph_add_cap_releases(mdsc, session);
+    ceph_send_cap_releases(mdsc, session);
     return 0;
 }
@@ -1461,15 +1464,18 @@ static void discard_cap_releases(struct ceph_mds_client *mdsc,
     dout("discard_cap_releases mds%d\n", session->s_mds);

-    /* zero out the in-progress message */
-    msg = list_first_entry(&session->s_cap_releases,
-                           struct ceph_msg, list_head);
-    head = msg->front.iov_base;
-    num = le32_to_cpu(head->num);
-    dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num);
-    head->num = cpu_to_le32(0);
-    msg->front.iov_len = sizeof(*head);
-    session->s_num_cap_releases += num;
+    if (!list_empty(&session->s_cap_releases)) {
+        /* zero out the in-progress message */
+        msg = list_first_entry(&session->s_cap_releases,
+                               struct ceph_msg, list_head);
+        head = msg->front.iov_base;
+        num = le32_to_cpu(head->num);
+        dout("discard_cap_releases mds%d %p %u\n",
+             session->s_mds, msg, num);
+        head->num = cpu_to_le32(0);
+        msg->front.iov_len = sizeof(*head);
+        session->s_num_cap_releases += num;
+    }

     /* requeue completed messages */
     while (!list_empty(&session->s_cap_releases_done)) {
@@ -1492,6 +1498,43 @@ static void discard_cap_releases(struct ceph_mds_client *mdsc,
  * requests
  */
+int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
+                                    struct inode *dir)
+{
+    struct ceph_inode_info *ci = ceph_inode(dir);
+    struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
+    struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
+    size_t size = sizeof(*rinfo->dir_in) + sizeof(*rinfo->dir_dname_len) +
+                  sizeof(*rinfo->dir_dname) + sizeof(*rinfo->dir_dlease);
+    int order, num_entries;
+
+    spin_lock(&ci->i_ceph_lock);
+    num_entries = ci->i_files + ci->i_subdirs;
+    spin_unlock(&ci->i_ceph_lock);
+    num_entries = max(num_entries, 1);
+    num_entries = min(num_entries, opt->max_readdir);
+
+    order = get_order(size * num_entries);
+    while (order >= 0) {
+        rinfo->dir_in = (void*)__get_free_pages(GFP_NOFS | __GFP_NOWARN,
+                                                order);
+        if (rinfo->dir_in)
+            break;
+        order--;
+    }
+    if (!rinfo->dir_in)
+        return -ENOMEM;
+
+    num_entries = (PAGE_SIZE << order) / size;
+    num_entries = min(num_entries, opt->max_readdir);
+
+    rinfo->dir_buf_size = PAGE_SIZE << order;
+    req->r_num_caps = num_entries + 1;
+    req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
+    req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
+    return 0;
+}
+
 /*
  * Create an mds request.
  */
@@ -2053,7 +2096,7 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
         ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
     if (req->r_locked_dir)
         ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
-    if (req->r_old_dentry)
+    if (req->r_old_dentry_dir)
         ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
                           CEPH_CAP_PIN);
......
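The new ceph_alloc_readdir_reply_buffer() above sizes one physically contiguous buffer up front: estimate the entry count from i_files + i_subdirs, round the per-entry records up to a power-of-two number of pages, and step the order down if the large allocation fails. A worked userspace model of the arithmetic (PAGE_SIZE and the record size below are illustrative constants, not pulled from the kernel headers):

/* Model of the readdir reply-buffer sizing and order fallback. */
#include <stdio.h>

#define PAGE_SIZE 4096UL

static int get_order(unsigned long size)   /* smallest n with 2^n pages >= size */
{
    int order = 0;

    while ((PAGE_SIZE << order) < size)
        order++;
    return order;
}

int main(void)
{
    unsigned long entry_size = 8 + 8 + 4 + 4;   /* illustrative record size */
    int num_entries = 1000;                     /* ~ i_files + i_subdirs */
    int max_readdir = 1024;                     /* mount option cap */
    int order = get_order(entry_size * num_entries);

    /* the kernel retries with order-- when __get_free_pages() fails;
     * whatever order succeeds determines how many entries fit */
    num_entries = (int)((PAGE_SIZE << order) / entry_size);
    if (num_entries > max_readdir)
        num_entries = max_readdir;
    printf("order %d (%lu bytes) -> ask MDS for up to %d entries\n",
           order, PAGE_SIZE << order, num_entries);
    return 0;
}

Telling the MDS max_entries that actually fit in the preallocated buffer is what lets parse_reply_info_dir() replace its per-reply kcalloc() with the BUG_ON/overflow check shown above.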
@@ -67,6 +67,7 @@ struct ceph_mds_reply_info_parsed {
     /* for readdir results */
     struct {
         struct ceph_mds_reply_dirfrag *dir_dir;
+        size_t                        dir_buf_size;
         int                           dir_nr;
         char                          **dir_dname;
         u32                           *dir_dname_len;
@@ -346,7 +347,8 @@ extern void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc,
                                     struct dentry *dn);
 extern void ceph_invalidate_dir_request(struct ceph_mds_request *req);
+extern int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
+                                           struct inode *dir);
 extern struct ceph_mds_request *
 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode);
 extern void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
......
@@ -54,6 +54,7 @@ const char *ceph_mds_op_name(int op)
     case CEPH_MDS_OP_LOOKUPHASH:  return "lookuphash";
     case CEPH_MDS_OP_LOOKUPPARENT:  return "lookupparent";
     case CEPH_MDS_OP_LOOKUPINO:  return "lookupino";
+    case CEPH_MDS_OP_LOOKUPNAME:  return "lookupname";
     case CEPH_MDS_OP_GETATTR:  return "getattr";
     case CEPH_MDS_OP_SETXATTR: return "setxattr";
     case CEPH_MDS_OP_SETATTR: return "setattr";
......
@@ -1026,6 +1026,7 @@ static int __init init_ceph(void)
     if (ret)
         goto out;

+    ceph_flock_init();
     ceph_xattr_init();
     ret = register_filesystem(&ceph_fs_type);
     if (ret)
......
@@ -577,7 +577,7 @@ struct ceph_file_info {
     /* readdir: position within a frag */
     unsigned offset;       /* offset of last chunk, adjusted for . and .. */
-    u64 next_offset;       /* offset of next chunk (last_name's + 1) */
+    unsigned next_offset;  /* offset of next chunk (last_name's + 1) */
     char *last_name;       /* last entry in previous chunk */
     struct dentry *dentry; /* next dentry (for dcache readdir) */
     int dir_release_count;
@@ -871,6 +871,7 @@ extern long ceph_ioctl(struct file *file, unsigned int cmd, unsigned long arg);
 extern const struct export_operations ceph_export_ops;

 /* locks.c */
+extern __init void ceph_flock_init(void);
 extern int ceph_lock(struct file *file, int cmd, struct file_lock *fl);
 extern int ceph_flock(struct file *file, int cmd, struct file_lock *fl);
 extern void ceph_count_locks(struct inode *inode, int *p_num, int *f_num);
......
@@ -71,25 +71,41 @@ static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val,
 	struct ceph_osd_client *osdc = &fsc->client->osdc;
 	s64 pool = ceph_file_layout_pg_pool(ci->i_layout);
 	const char *pool_name;
+	char buf[128];

 	dout("ceph_vxattrcb_layout %p\n", &ci->vfs_inode);
 	down_read(&osdc->map_sem);
 	pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool);
-	if (pool_name)
-		ret = snprintf(val, size,
-		"stripe_unit=%lld stripe_count=%lld object_size=%lld pool=%s",
+	if (pool_name) {
+		size_t len = strlen(pool_name);
+		ret = snprintf(buf, sizeof(buf),
+		"stripe_unit=%lld stripe_count=%lld object_size=%lld pool=",
 		(unsigned long long)ceph_file_layout_su(ci->i_layout),
 		(unsigned long long)ceph_file_layout_stripe_count(ci->i_layout),
-		(unsigned long long)ceph_file_layout_object_size(ci->i_layout),
-		pool_name);
-	else
-		ret = snprintf(val, size,
+		(unsigned long long)ceph_file_layout_object_size(ci->i_layout));
+		if (!size) {
+			ret += len;
+		} else if (ret + len > size) {
+			ret = -ERANGE;
+		} else {
+			memcpy(val, buf, ret);
+			memcpy(val + ret, pool_name, len);
+			ret += len;
+		}
+	} else {
+		ret = snprintf(buf, sizeof(buf),
 		"stripe_unit=%lld stripe_count=%lld object_size=%lld pool=%lld",
 		(unsigned long long)ceph_file_layout_su(ci->i_layout),
 		(unsigned long long)ceph_file_layout_stripe_count(ci->i_layout),
 		(unsigned long long)ceph_file_layout_object_size(ci->i_layout),
 		(unsigned long long)pool);
+		if (size) {
+			if (ret <= size)
+				memcpy(val, buf, ret);
+			else
+				ret = -ERANGE;
+		}
+	}
 	up_read(&osdc->map_sem);
 	return ret;
 }
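
The rewritten branches above follow the usual getxattr size-probe contract. A minimal sketch of that contract, with a hypothetical helper standing in for a vxattr callback (not code from this series):

/* Hypothetical helper illustrating the size-probe contract the new
 * code implements: size == 0 probes for the required length, a
 * too-small buffer yields -ERANGE, otherwise the value is copied
 * (no trailing NUL, per xattr convention). */
static ssize_t fill_value(char *val, size_t size)
{
	static const char v[] = "stripe_unit=4194304";
	size_t len = sizeof(v) - 1;

	if (!size)
		return len;		/* probe: report required length */
	if (len > size)
		return -ERANGE;		/* caller's buffer is too small */
	memcpy(val, v, len);
	return len;
}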
@@ -215,7 +231,7 @@ static struct ceph_vxattr ceph_dir_vxattrs[] = {
 		.name_size = sizeof("ceph.dir.layout"),
 		.getxattr_cb = ceph_vxattrcb_layout,
 		.readonly = false,
-		.hidden = false,
+		.hidden = true,
 		.exists_cb = ceph_vxattrcb_layout_exists,
 	},
 	XATTR_LAYOUT_FIELD(dir, layout, stripe_unit),
@@ -242,7 +258,7 @@ static struct ceph_vxattr ceph_file_vxattrs[] = {
 		.name_size = sizeof("ceph.file.layout"),
 		.getxattr_cb = ceph_vxattrcb_layout,
 		.readonly = false,
-		.hidden = false,
+		.hidden = true,
 		.exists_cb = ceph_vxattrcb_layout_exists,
 	},
 	XATTR_LAYOUT_FIELD(file, layout, stripe_unit),
@@ -842,7 +858,6 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
 	struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
 	struct inode *inode = dentry->d_inode;
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct inode *parent_inode;
 	struct ceph_mds_request *req;
 	struct ceph_mds_client *mdsc = fsc->mdsc;
 	int err;
@@ -893,9 +908,7 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
 	req->r_data_len = size;

 	dout("xattr.ver (before): %lld\n", ci->i_xattrs.version);
-	parent_inode = ceph_get_dentry_parent_inode(dentry);
-	err = ceph_mdsc_do_request(mdsc, parent_inode, req);
-	iput(parent_inode);
+	err = ceph_mdsc_do_request(mdsc, NULL, req);
 	ceph_mdsc_put_request(req);
 	dout("xattr.ver (after): %lld\n", ci->i_xattrs.version);
@@ -1019,7 +1032,6 @@ static int ceph_send_removexattr(struct dentry *dentry, const char *name)
 	struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
 	struct ceph_mds_client *mdsc = fsc->mdsc;
 	struct inode *inode = dentry->d_inode;
-	struct inode *parent_inode;
 	struct ceph_mds_request *req;
 	int err;
@@ -1033,9 +1045,7 @@ static int ceph_send_removexattr(struct dentry *dentry, const char *name)
 	req->r_num_caps = 1;
 	req->r_path2 = kstrdup(name, GFP_NOFS);
-	parent_inode = ceph_get_dentry_parent_inode(dentry);
-	err = ceph_mdsc_do_request(mdsc, parent_inode, req);
-	iput(parent_inode);
+	err = ceph_mdsc_do_request(mdsc, NULL, req);
 	ceph_mdsc_put_request(req);
 	return err;
 }
...
@@ -43,6 +43,13 @@
 #define CEPH_FEATURE_CRUSH_V2       (1ULL<<36)  /* new indep; SET_* steps */
 #define CEPH_FEATURE_EXPORT_PEER    (1ULL<<37)
 #define CEPH_FEATURE_OSD_ERASURE_CODES (1ULL<<38)
+#define CEPH_FEATURE_OSD_TMAP2OMAP (1ULL<<38)   /* overlap with EC */
+/* The process supports new-style OSDMap encoding. Monitors also use
+   this bit to determine if peers support NAK messages. */
+#define CEPH_FEATURE_OSDMAP_ENC     (1ULL<<39)
+#define CEPH_FEATURE_MDS_INLINE_DATA (1ULL<<40)
+#define CEPH_FEATURE_CRUSH_TUNABLES3 (1ULL<<41)
+#define CEPH_FEATURE_OSD_PRIMARY_AFFINITY (1ULL<<41)  /* overlap w/ tunables3 */

 /*
  * The introduction of CEPH_FEATURE_OSD_SNAPMAPPER caused the feature
@@ -82,7 +89,10 @@ static inline u64 ceph_sanitize_features(u64 features)
 	 CEPH_FEATURE_OSDHASHPSPOOL |		\
 	 CEPH_FEATURE_OSD_CACHEPOOL |		\
 	 CEPH_FEATURE_CRUSH_V2 |		\
-	 CEPH_FEATURE_EXPORT_PEER)
+	 CEPH_FEATURE_EXPORT_PEER |		\
+	 CEPH_FEATURE_OSDMAP_ENC |		\
+	 CEPH_FEATURE_CRUSH_TUNABLES3 |		\
+	 CEPH_FEATURE_OSD_PRIMARY_AFFINITY)

 #define CEPH_FEATURES_REQUIRED_DEFAULT  \
 	(CEPH_FEATURE_NOSRCADDR |	 \
...
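
Because bits 38 and 41 each carry two features, a peer that advertises one member of a pair is necessarily treated as supporting the other. A sketch of how such a check reads (illustrative only, not code from this diff):

/* Sketch: OSD_PRIMARY_AFFINITY shares bit 41 with CRUSH_TUNABLES3,
 * so advertising either implies both. */
static bool peer_supports_primary_affinity(u64 peer_features)
{
	return (peer_features & CEPH_FEATURE_OSD_PRIMARY_AFFINITY) != 0;
}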
@@ -332,6 +332,7 @@ enum {
 	CEPH_MDS_OP_LOOKUPHASH = 0x00102,
 	CEPH_MDS_OP_LOOKUPPARENT = 0x00103,
 	CEPH_MDS_OP_LOOKUPINO = 0x00104,
+	CEPH_MDS_OP_LOOKUPNAME = 0x00105,

 	CEPH_MDS_OP_SETXATTR = 0x01105,
 	CEPH_MDS_OP_RMXATTR = 0x01106,
@@ -420,8 +421,8 @@ union ceph_mds_request_args {
 	struct {
 		__u8 rule; /* currently fcntl or flock */
 		__u8 type; /* shared, exclusive, remove */
+		__le64 owner; /* owner of the lock */
 		__le64 pid; /* process id requesting the lock */
-		__le64 pid_namespace;
 		__le64 start; /* initial location to lock */
 		__le64 length; /* num bytes to lock from start */
 		__u8 wait; /* will caller wait for lock to become available? */
@@ -532,8 +533,8 @@ struct ceph_filelock {
 	__le64 start; /* file offset to start lock at */
 	__le64 length; /* num bytes to lock; 0 for all following start */
 	__le64 client; /* which client holds the lock */
+	__le64 owner; /* owner of the lock */
 	__le64 pid; /* process id holding the lock on the client */
-	__le64 pid_namespace;
 	__u8 type; /* shared lock, exclusive lock, or unlock */
 } __attribute__ ((packed));
...
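
The new owner field replaces the pid_namespace so the MDS can tell apart locks held by different owners within one process, which is what separates flock from fcntl semantics. A sketch of how the request args might be filled; the owner token (the struct file for flock-style locks) is an assumption for illustration, not taken from this hunk:

/* Sketch only: filling the new owner field from a kernel file_lock.
 * The cast of fl->fl_file is an illustrative assumption; the flock
 * patches in this series derive the owner token elsewhere. */
static void fill_lock_args(union ceph_mds_request_args *args,
			   struct file_lock *fl)
{
	args->filelock_change.owner =
		cpu_to_le64((u64)(unsigned long)fl->fl_file);
	args->filelock_change.pid = cpu_to_le64((u64)fl->fl_pid);
}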
@@ -43,7 +43,7 @@ struct ceph_osd {
 };

-#define CEPH_OSD_MAX_OP	2
+#define CEPH_OSD_MAX_OP	3

 enum ceph_osd_data_type {
 	CEPH_OSD_DATA_TYPE_NONE = 0,
@@ -76,6 +76,7 @@ struct ceph_osd_data {
 struct ceph_osd_req_op {
 	u16 op;		/* CEPH_OSD_OP_* */
+	u32 flags;	/* CEPH_OSD_OP_FLAG_* */
 	u32 payload_len;
 	union {
 		struct ceph_osd_data raw_data_in;
@@ -102,6 +103,10 @@ struct ceph_osd_req_op {
 			u32 timeout;
 			__u8 flag;
 		} watch;
+		struct {
+			u64 expected_object_size;
+			u64 expected_write_size;
+		} alloc_hint;
 	};
 };
@@ -293,6 +298,10 @@ extern void osd_req_op_cls_init(struct ceph_osd_request *osd_req,
 extern void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
 				  unsigned int which, u16 opcode,
 				  u64 cookie, u64 version, int flag);
+extern void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
+				       unsigned int which,
+				       u64 expected_object_size,
+				       u64 expected_write_size);
 extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 					       struct ceph_snap_context *snapc,
...
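
CEPH_OSD_MAX_OP grows to 3 because a hinted write now carries the hint op ahead of the write op (see the rbd callback in this merge, which expects SETALLOCHINT in slot 0 and WRITE in slot 1). A sketch of how a caller such as rbd might pair them; the 4 MB figure is illustrative, rbd passes the image's object size:

/* Sketch only: prefix a write with an allocation hint, the way rbd
 * does. The object size is an assumed example value. */
static void setup_hinted_write(struct ceph_osd_request *osd_req,
			       u64 offset, u64 length)
{
	u64 obj_size = 4 * 1024 * 1024;	/* illustrative */

	osd_req_op_alloc_hint_init(osd_req, 0, obj_size, obj_size);
	osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
			       offset, length, 0, 0);
}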
@@ -41,6 +41,18 @@ struct ceph_pg_pool_info {
 	char *name;
 };

+static inline bool ceph_can_shift_osds(struct ceph_pg_pool_info *pool)
+{
+	switch (pool->type) {
+	case CEPH_POOL_TYPE_REP:
+		return true;
+	case CEPH_POOL_TYPE_EC:
+		return false;
+	default:
+		BUG_ON(1);
+	}
+}
+
 struct ceph_object_locator {
 	s64 pool;
 };
@@ -60,8 +72,16 @@ struct ceph_object_id {
 struct ceph_pg_mapping {
 	struct rb_node node;
 	struct ceph_pg pgid;
+
+	union {
+		struct {
 			int len;
 			int osds[];
+		} pg_temp;
+		struct {
+			int osd;
+		} primary_temp;
+	};
 };

 struct ceph_osdmap {
@@ -78,12 +98,19 @@ struct ceph_osdmap {
 	struct ceph_entity_addr *osd_addr;

 	struct rb_root pg_temp;
+	struct rb_root primary_temp;
+
+	u32 *osd_primary_affinity;
+
 	struct rb_root pg_pools;
 	u32 pool_max;

 	/* the CRUSH map specifies the mapping of placement groups to
 	 * the list of osds that store+replicate them. */
 	struct crush_map *crush;
+
+	struct mutex crush_scratch_mutex;
+	int crush_scratch_ary[CEPH_PG_MAX_SIZE * 3];
 };

 static inline void ceph_oid_set_name(struct ceph_object_id *oid,
@@ -110,9 +137,21 @@ static inline void ceph_oid_copy(struct ceph_object_id *dest,
 	dest->name_len = src->name_len;
 }

+static inline int ceph_osd_exists(struct ceph_osdmap *map, int osd)
+{
+	return osd >= 0 && osd < map->max_osd &&
+	       (map->osd_state[osd] & CEPH_OSD_EXISTS);
+}
+
 static inline int ceph_osd_is_up(struct ceph_osdmap *map, int osd)
 {
-	return (osd < map->max_osd) && (map->osd_state[osd] & CEPH_OSD_UP);
+	return ceph_osd_exists(map, osd) &&
+	       (map->osd_state[osd] & CEPH_OSD_UP);
+}
+
+static inline int ceph_osd_is_down(struct ceph_osdmap *map, int osd)
+{
+	return !ceph_osd_is_up(map, osd);
 }

 static inline bool ceph_osdmap_flag(struct ceph_osdmap *map, int flag)
@@ -121,6 +160,7 @@ static inline bool ceph_osdmap_flag(struct ceph_osdmap *map, int flag)
 }

 extern char *ceph_osdmap_state_str(char *str, int len, int state);
+extern u32 ceph_get_primary_affinity(struct ceph_osdmap *map, int osd);

 static inline struct ceph_entity_addr *ceph_osd_addr(struct ceph_osdmap *map,
 						     int osd)
@@ -153,7 +193,7 @@ static inline int ceph_decode_pgid(void **p, void *end, struct ceph_pg *pgid)
 	return 0;
 }

-extern struct ceph_osdmap *osdmap_decode(void **p, void *end);
+extern struct ceph_osdmap *ceph_osdmap_decode(void **p, void *end);
 extern struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
 						    struct ceph_osdmap *map,
 						    struct ceph_messenger *msgr);
@@ -172,7 +212,7 @@ extern int ceph_oloc_oid_to_pg(struct ceph_osdmap *osdmap,
 extern int ceph_calc_pg_acting(struct ceph_osdmap *osdmap,
 			       struct ceph_pg pgid,
-			       int *acting);
+			       int *osds, int *primary);
 extern int ceph_calc_pg_primary(struct ceph_osdmap *osdmap,
 				struct ceph_pg pgid);
...
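
ceph_calc_pg_acting() now returns the size of the acting set and reports the primary through an out parameter instead of leaving callers to assume acting[0]. A sketch of the new calling convention, mirroring the __map_request() change later in this diff:

/* Sketch of the new calling convention; error handling mirrors
 * __map_request() below. */
static int pg_primary_osd(struct ceph_osdmap *osdmap, struct ceph_pg pgid)
{
	int osds[CEPH_PG_MAX_SIZE];
	int primary;
	int num;

	num = ceph_calc_pg_acting(osdmap, pgid, osds, &primary);
	if (num < 0)
		num = 0;	/* treat errors as an empty acting set */

	return primary;		/* assumed -1 when the PG has no primary */
}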
@@ -81,8 +81,9 @@ struct ceph_pg_v1 {
  */
 #define CEPH_NOPOOL  ((__u64) (-1))  /* pool id not defined */

-#define CEPH_PG_TYPE_REP     1
-#define CEPH_PG_TYPE_RAID4   2
+#define CEPH_POOL_TYPE_REP     1
+#define CEPH_POOL_TYPE_RAID4   2 /* never implemented */
+#define CEPH_POOL_TYPE_EC      3

 /*
  * stable_mod func is used to control number of placement groups.
@@ -133,6 +134,10 @@ extern const char *ceph_osd_state_name(int s);
 #define CEPH_OSD_IN  0x10000
 #define CEPH_OSD_OUT 0

+/* osd primary-affinity.  fixed point value: 0x10000 == baseline */
+#define CEPH_OSD_MAX_PRIMARY_AFFINITY     0x10000
+#define CEPH_OSD_DEFAULT_PRIMARY_AFFINITY 0x10000
+
 /*
  * osd map flag bits
@@ -227,6 +232,9 @@ enum {
 	CEPH_OSD_OP_OMAPRMKEYS = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 24,
 	CEPH_OSD_OP_OMAP_CMP = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 25,

+	/* hints */
+	CEPH_OSD_OP_SETALLOCHINT = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 35,
+
 	/** multi **/
 	CEPH_OSD_OP_CLONERANGE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_MULTI | 1,
 	CEPH_OSD_OP_ASSERT_SRC_VERSION = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_MULTI | 2,
@@ -382,7 +390,7 @@ enum {
  */
 struct ceph_osd_op {
 	__le16 op;           /* CEPH_OSD_OP_* */
-	__le32 flags;        /* CEPH_OSD_FLAG_* */
+	__le32 flags;        /* CEPH_OSD_OP_FLAG_* */
 	union {
 		struct {
 			__le64 offset, length;
@@ -416,6 +424,10 @@ struct ceph_osd_op {
 			__le64 offset, length;
 			__le64 src_offset;
 		} __attribute__ ((packed)) clonerange;
+		struct {
+			__le64 expected_object_size;
+			__le64 expected_write_size;
+		} __attribute__ ((packed)) alloc_hint;
 	};
 	__le32 payload_len;
 } __attribute__ ((packed));
...
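
Primary affinity is a fixed-point value with 0x10000 as the baseline. The debugfs code later in this diff converts it to a percentage exactly the way osd weights are converted; a sketch of that conversion:

/* Sketch: convert a fixed-point affinity (0x10000 == 100%) to a
 * percentage, as the debugfs dump in this series does. */
static u32 affinity_to_percent(u32 aff)
{
	return (aff * 100) >> 16;	/* 0x10000 -> 100, 0x8000 -> 50 */
}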
@@ -51,6 +51,7 @@ enum {
 	CRUSH_RULE_SET_CHOOSELEAF_TRIES = 9, /* override chooseleaf_descend_once */
 	CRUSH_RULE_SET_CHOOSE_LOCAL_TRIES = 10,
 	CRUSH_RULE_SET_CHOOSE_LOCAL_FALLBACK_TRIES = 11,
+	CRUSH_RULE_SET_CHOOSELEAF_VARY_R = 12
 };

 /*
@@ -173,6 +174,12 @@ struct crush_map {
 	 * apply to a collision: in that case we will retry as we used
 	 * to. */
 	__u32 chooseleaf_descend_once;
+
+	/* if non-zero, feed r into chooseleaf, bit-shifted right by
+	 * (vary_r - 1) bits.  a value of 1 is best for new clusters.  for
+	 * legacy clusters that want to limit reshuffling, a value of 3 or 4
+	 * will make the mappings line up a bit better with previous
+	 * mappings. */
+	__u8 chooseleaf_vary_r;
 };
...
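
How vary_r feeds the replica number into the recursive chooseleaf call is easiest to see in isolation; compare crush_choose_firstn() below:

/* Sketch of the sub_r computation used by crush_choose_firstn():
 * 0 keeps the legacy behavior (sub_r always 0), 1 passes r through
 * unshifted, larger values damp the variation. */
static int chooseleaf_sub_r(unsigned int vary_r, int r)
{
	return vary_r ? r >> (vary_r - 1) : 0;
}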
@@ -292,10 +292,12 @@ static int is_out(const struct crush_map *map,
  * @outpos: our position in that vector
  * @tries: number of attempts to make
  * @recurse_tries: number of attempts to have recursive chooseleaf make
- * @local_tries: localized retries
- * @local_fallback_tries: localized fallback retries
+ * @local_retries: localized retries
+ * @local_fallback_retries: localized fallback retries
  * @recurse_to_leaf: true if we want one device under each item of given type (chooseleaf instead of choose)
+ * @vary_r: pass r to recursive calls
  * @out2: second output vector for leaf items (if @recurse_to_leaf)
+ * @parent_r: r value passed from the parent
  */
 static int crush_choose_firstn(const struct crush_map *map,
 			       struct crush_bucket *bucket,
@@ -304,10 +306,12 @@ static int crush_choose_firstn(const struct crush_map *map,
 			       int *out, int outpos,
 			       unsigned int tries,
 			       unsigned int recurse_tries,
-			       unsigned int local_tries,
-			       unsigned int local_fallback_tries,
+			       unsigned int local_retries,
+			       unsigned int local_fallback_retries,
 			       int recurse_to_leaf,
-			       int *out2)
+			       unsigned int vary_r,
+			       int *out2,
+			       int parent_r)
 {
 	int rep;
 	unsigned int ftotal, flocal;
@@ -319,8 +323,11 @@ static int crush_choose_firstn(const struct crush_map *map,
 	int itemtype;
 	int collide, reject;

-	dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "",
-		bucket->id, x, outpos, numrep);
+	dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d tries %d recurse_tries %d local_retries %d local_fallback_retries %d parent_r %d\n",
+		recurse_to_leaf ? "_LEAF" : "",
+		bucket->id, x, outpos, numrep,
+		tries, recurse_tries, local_retries, local_fallback_retries,
+		parent_r);

 	for (rep = outpos; rep < numrep; rep++) {
 		/* keep trying until we get a non-out, non-colliding item */
@@ -335,7 +342,7 @@ static int crush_choose_firstn(const struct crush_map *map,
 			do {
 				collide = 0;
 				retry_bucket = 0;
-				r = rep;
+				r = rep + parent_r;
 				/* r' = r + f_total */
 				r += ftotal;
@@ -344,9 +351,9 @@ static int crush_choose_firstn(const struct crush_map *map,
 					reject = 1;
 					goto reject;
 				}
-				if (local_fallback_tries > 0 &&
+				if (local_fallback_retries > 0 &&
 				    flocal >= (in->size>>1) &&
-				    flocal > local_fallback_tries)
+				    flocal > local_fallback_retries)
 					item = bucket_perm_choose(in, x, r);
 				else
 					item = crush_bucket_choose(in, x, r);
@@ -387,16 +394,23 @@ static int crush_choose_firstn(const struct crush_map *map,
 				reject = 0;
 				if (!collide && recurse_to_leaf) {
 					if (item < 0) {
+						int sub_r;
+
+						if (vary_r)
+							sub_r = r >> (vary_r-1);
+						else
+							sub_r = 0;
+
 						if (crush_choose_firstn(map,
 							 map->buckets[-1-item],
 							 weight, weight_max,
 							 x, outpos+1, 0,
 							 out2, outpos,
 							 recurse_tries, 0,
-							 local_tries,
-							 local_fallback_tries,
+							 local_retries,
+							 local_fallback_retries,
 							 0,
-							 NULL) <= outpos)
+							 vary_r,
+							 NULL,
+							 sub_r) <= outpos)
 							/* didn't get leaf */
 							reject = 1;
 					} else {
@@ -420,14 +434,14 @@ static int crush_choose_firstn(const struct crush_map *map,
 					ftotal++;
 					flocal++;

-					if (collide && flocal <= local_tries)
+					if (collide && flocal <= local_retries)
 						/* retry locally a few times */
 						retry_bucket = 1;
-					else if (local_fallback_tries > 0 &&
-						 flocal <= in->size + local_fallback_tries)
+					else if (local_fallback_retries > 0 &&
+						 flocal <= in->size + local_fallback_retries)
 						/* exhaustive bucket search */
 						retry_bucket = 1;
-					else if (ftotal <= tries)
+					else if (ftotal < tries)
 						/* then retry descent */
 						retry_descent = 1;
 					else
@@ -640,10 +654,20 @@ int crush_do_rule(const struct crush_map *map,
 	__u32 step;
 	int i, j;
 	int numrep;
-	int choose_tries = map->choose_total_tries;
-	int choose_local_tries = map->choose_local_tries;
-	int choose_local_fallback_tries = map->choose_local_fallback_tries;
+	/*
+	 * the original choose_total_tries value was off by one (it
+	 * counted "retries" and not "tries").  add one.
+	 */
+	int choose_tries = map->choose_total_tries + 1;
 	int choose_leaf_tries = 0;
+	/*
+	 * the local tries values were counted as "retries", though,
+	 * and need no adjustment
+	 */
+	int choose_local_retries = map->choose_local_tries;
+	int choose_local_fallback_retries = map->choose_local_fallback_tries;
+
+	int vary_r = map->chooseleaf_vary_r;

 	if ((__u32)ruleno >= map->max_rules) {
 		dprintk(" bad ruleno %d\n", ruleno);
@@ -676,13 +700,18 @@ int crush_do_rule(const struct crush_map *map,
 			break;

 		case CRUSH_RULE_SET_CHOOSE_LOCAL_TRIES:
-			if (curstep->arg1 > 0)
-				choose_local_tries = curstep->arg1;
+			if (curstep->arg1 >= 0)
+				choose_local_retries = curstep->arg1;
 			break;

 		case CRUSH_RULE_SET_CHOOSE_LOCAL_FALLBACK_TRIES:
-			if (curstep->arg1 > 0)
-				choose_local_fallback_tries = curstep->arg1;
+			if (curstep->arg1 >= 0)
+				choose_local_fallback_retries = curstep->arg1;
+			break;
+
+		case CRUSH_RULE_SET_CHOOSELEAF_VARY_R:
+			if (curstep->arg1 >= 0)
+				vary_r = curstep->arg1;
 			break;

 		case CRUSH_RULE_CHOOSELEAF_FIRSTN:
@@ -734,10 +763,12 @@ int crush_do_rule(const struct crush_map *map,
 						o+osize, j,
 						choose_tries,
 						recurse_tries,
-						choose_local_tries,
-						choose_local_fallback_tries,
+						choose_local_retries,
+						choose_local_fallback_retries,
 						recurse_to_leaf,
-						c+osize);
+						vary_r,
+						c+osize,
+						0);
 				} else {
 					crush_choose_indep(
 						map,
...
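
The off-by-one fix is purely a change of units: choose_total_tries always permitted N+1 descent attempts, so crush_do_rule() now adds one up front and the loop compares with < instead of <=. A sketch of the equivalence:

/* Sketch: both conventions permit exactly map->choose_total_tries + 1
 * descents; only the counting changes. */
static unsigned int max_descents(const struct crush_map *map)
{
	unsigned int retries = map->choose_total_tries;	/* old meaning */
	unsigned int tries = retries + 1;		/* new meaning */

	return tries;	/* ftotal < tries  <=>  ftotal <= retries */
}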
@@ -53,34 +53,55 @@ static int osdmap_show(struct seq_file *s, void *p)
 {
 	int i;
 	struct ceph_client *client = s->private;
+	struct ceph_osdmap *map = client->osdc.osdmap;
 	struct rb_node *n;

-	if (client->osdc.osdmap == NULL)
+	if (map == NULL)
 		return 0;
-	seq_printf(s, "epoch %d\n", client->osdc.osdmap->epoch);
+
+	seq_printf(s, "epoch %d\n", map->epoch);
 	seq_printf(s, "flags%s%s\n",
-		   (client->osdc.osdmap->flags & CEPH_OSDMAP_NEARFULL) ?
-		   " NEARFULL" : "",
-		   (client->osdc.osdmap->flags & CEPH_OSDMAP_FULL) ?
-		   " FULL" : "");
-	for (n = rb_first(&client->osdc.osdmap->pg_pools); n; n = rb_next(n)) {
+		   (map->flags & CEPH_OSDMAP_NEARFULL) ? " NEARFULL" : "",
+		   (map->flags & CEPH_OSDMAP_FULL) ? " FULL" : "");
+
+	for (n = rb_first(&map->pg_pools); n; n = rb_next(n)) {
 		struct ceph_pg_pool_info *pool =
 			rb_entry(n, struct ceph_pg_pool_info, node);
-		seq_printf(s, "pg_pool %llu pg_num %d / %d\n",
-			   (unsigned long long)pool->id, pool->pg_num,
-			   pool->pg_num_mask);
+
+		seq_printf(s, "pool %lld pg_num %u (%d) read_tier %lld write_tier %lld\n",
+			   pool->id, pool->pg_num, pool->pg_num_mask,
+			   pool->read_tier, pool->write_tier);
 	}
-	for (i = 0; i < client->osdc.osdmap->max_osd; i++) {
-		struct ceph_entity_addr *addr =
-			&client->osdc.osdmap->osd_addr[i];
-		int state = client->osdc.osdmap->osd_state[i];
+	for (i = 0; i < map->max_osd; i++) {
+		struct ceph_entity_addr *addr = &map->osd_addr[i];
+		int state = map->osd_state[i];
 		char sb[64];

-		seq_printf(s, "\tosd%d\t%s\t%3d%%\t(%s)\n",
+		seq_printf(s, "osd%d\t%s\t%3d%%\t(%s)\t%3d%%\n",
 			   i, ceph_pr_addr(&addr->in_addr),
-			   ((client->osdc.osdmap->osd_weight[i]*100) >> 16),
-			   ceph_osdmap_state_str(sb, sizeof(sb), state));
+			   ((map->osd_weight[i]*100) >> 16),
+			   ceph_osdmap_state_str(sb, sizeof(sb), state),
+			   ((ceph_get_primary_affinity(map, i)*100) >> 16));
+	}
+	for (n = rb_first(&map->pg_temp); n; n = rb_next(n)) {
+		struct ceph_pg_mapping *pg =
+			rb_entry(n, struct ceph_pg_mapping, node);
+
+		seq_printf(s, "pg_temp %llu.%x [", pg->pgid.pool,
+			   pg->pgid.seed);
+		for (i = 0; i < pg->pg_temp.len; i++)
+			seq_printf(s, "%s%d", (i == 0 ? "" : ","),
+				   pg->pg_temp.osds[i]);
+		seq_printf(s, "]\n");
 	}
+	for (n = rb_first(&map->primary_temp); n; n = rb_next(n)) {
+		struct ceph_pg_mapping *pg =
+			rb_entry(n, struct ceph_pg_mapping, node);
+
+		seq_printf(s, "primary_temp %llu.%x %d\n", pg->pgid.pool,
+			   pg->pgid.seed, pg->primary_temp.osd);
+	}
+
 	return 0;
 }
...
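
Assembled from the seq_printf format strings above, the osdmap dump now reads roughly as follows; all values here are invented for illustration:

epoch 4
flags
pool 0 pg_num 64 (63) read_tier -1 write_tier -1
osd0	192.168.0.1:6800/1	100%	(exists, up)	 50%
pg_temp 0.2a [0,1]
primary_temp 0.2a 1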
@@ -919,6 +919,9 @@ static bool ceph_msg_data_pages_advance(struct ceph_msg_data_cursor *cursor,
 	if (!bytes || cursor->page_offset)
 		return false;	/* more bytes to process in the current page */

+	if (!cursor->resid)
+		return false;	/* no more data */
+
 	/* Move on to the next page; offset is already at 0 */
 	BUG_ON(cursor->page_index >= cursor->page_count);
@@ -1004,6 +1007,9 @@ static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
 	if (!bytes || cursor->offset & ~PAGE_MASK)
 		return false;	/* more bytes to process in the current page */

+	if (!cursor->resid)
+		return false;	/* no more data */
+
 	/* Move on to the next page */
 	BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
...
@@ -436,6 +436,7 @@ static bool osd_req_opcode_valid(u16 opcode)
 	case CEPH_OSD_OP_OMAPCLEAR:
 	case CEPH_OSD_OP_OMAPRMKEYS:
 	case CEPH_OSD_OP_OMAP_CMP:
+	case CEPH_OSD_OP_SETALLOCHINT:
 	case CEPH_OSD_OP_CLONERANGE:
 	case CEPH_OSD_OP_ASSERT_SRC_VERSION:
 	case CEPH_OSD_OP_SRC_CMPXATTR:
@@ -591,6 +592,26 @@ void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
 }
 EXPORT_SYMBOL(osd_req_op_watch_init);

+void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
+				unsigned int which,
+				u64 expected_object_size,
+				u64 expected_write_size)
+{
+	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
+						      CEPH_OSD_OP_SETALLOCHINT);
+
+	op->alloc_hint.expected_object_size = expected_object_size;
+	op->alloc_hint.expected_write_size = expected_write_size;
+
+	/*
+	 * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
+	 * not worth a feature bit.  Set FAILOK per-op flag to make
+	 * sure older osds don't trip over an unsupported opcode.
+	 */
+	op->flags |= CEPH_OSD_OP_FLAG_FAILOK;
+}
+EXPORT_SYMBOL(osd_req_op_alloc_hint_init);
+
 static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
 				   struct ceph_osd_data *osd_data)
 {
@@ -681,6 +702,12 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
 		dst->watch.ver = cpu_to_le64(src->watch.ver);
 		dst->watch.flag = src->watch.flag;
 		break;
+	case CEPH_OSD_OP_SETALLOCHINT:
+		dst->alloc_hint.expected_object_size =
+		    cpu_to_le64(src->alloc_hint.expected_object_size);
+		dst->alloc_hint.expected_write_size =
+		    cpu_to_le64(src->alloc_hint.expected_write_size);
+		break;
 	default:
 		pr_err("unsupported osd opcode %s\n",
 		       ceph_osd_op_name(src->op));
@@ -688,7 +715,9 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
 		return 0;
 	}
+
 	dst->op = cpu_to_le16(src->op);
+	dst->flags = cpu_to_le32(src->flags);
 	dst->payload_len = cpu_to_le32(src->payload_len);

 	return request_data_len;
@@ -1304,7 +1333,7 @@ static int __map_request(struct ceph_osd_client *osdc,
 {
 	struct ceph_pg pgid;
 	int acting[CEPH_PG_MAX_SIZE];
-	int o = -1, num = 0;
+	int num, o;
 	int err;
 	bool was_paused;
@@ -1317,11 +1346,9 @@ static int __map_request(struct ceph_osd_client *osdc,
 	}
 	req->r_pgid = pgid;

-	err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
-	if (err > 0) {
-		o = acting[0];
-		num = err;
-	}
+	num = ceph_calc_pg_acting(osdc->osdmap, pgid, acting, &o);
+	if (num < 0)
+		num = 0;

 	was_paused = req->r_paused;
 	req->r_paused = __req_should_be_paused(osdc, req);
@@ -2033,7 +2060,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 			int skipped_map = 0;

 			dout("taking full map %u len %d\n", epoch, maplen);
-			newmap = osdmap_decode(&p, p+maplen);
+			newmap = ceph_osdmap_decode(&p, p+maplen);
 			if (IS_ERR(newmap)) {
 				err = PTR_ERR(newmap);
 				goto bad;
...