Commit 0ffaa9c8 authored by Henri Doreau's avatar Henri Doreau Committed by Greg Kroah-Hartman

staging: lustre: hsm: Use file lease to implement migration

Implement non-blocking migration based on exclusive open instead of
group lock. Implemented exclusive close operation to atomically put
a lease, swap two layouts and close a file. This allows race-free
migrations.

Make the caller responsible for retrying on failure (EBUSY, EAGAIN)
in non-blocking mode.

In blocking mode, allow applications to trigger layout swaps using a
grouplock they already own, to prevent race conditions between the
actual data copy and the layout swap. Updated lfs accordingly. File
leases are also taken in blocking mode, so that lfs migrate can issue
a warning if an application attempts to open a file that is being
migrated and gets blocked.

Timestamps (atime/mtime) are set from userland, after the layout swap
is performed, to prevent conflicts with the grouplock.

lli_trunc_sem is taken/released in the vvp_io layer, under the DLM
lock. This re-ordering fixes the original issue between truncate and
migrate.
Signed-off-by: default avatarHenri Doreau <henri.doreau@cea.fr>
Signed-off-by: default avatarJinshan Xiong <jinshan.xiong@intel.com>
Intel-bug-id: https://jira.hpdd.intel.com/browse/LU-4840
Reviewed-on: http://review.whamcloud.com/10013Reviewed-by: default avatarJohn L. Hammond <john.hammond@intel.com>
Reviewed-by: default avatarfrank zago <fzago@cray.com>
Reviewed-by: default avatarOleg Drokin <oleg.drokin@intel.com>
Signed-off-by: default avatarJames Simmons <jsimmons@infradead.org>
Signed-off-by: default avatarGreg Kroah-Hartman <gregkh@linuxfoundation.org>
parent b73d803b
......@@ -1604,7 +1604,9 @@ lov_mds_md_max_stripe_count(size_t buf_size, __u32 lmm_magic)
/* OBD_MD_FLRMTRGETFACL (0x0008000000000000ULL) lfs rgetfacl, obsolete */
#define OBD_MD_FLDATAVERSION (0x0010000000000000ULL) /* iversion sum */
#define OBD_MD_FLRELEASED (0x0020000000000000ULL) /* file released */
#define OBD_MD_CLOSE_INTENT_EXECED (0x0020000000000000ULL) /* close intent
* executed
*/
#define OBD_MD_DEFAULT_MEA (0x0040000000000000ULL) /* default MEA */
......@@ -2139,6 +2141,7 @@ enum mds_op_bias {
MDS_OWNEROVERRIDE = 1 << 11,
MDS_HSM_RELEASE = 1 << 12,
MDS_RENAME_MIGRATE = BIT(13),
MDS_CLOSE_LAYOUT_SWAP = BIT(14),
};
/* instance of mdt_reint_rec */
......
......@@ -645,6 +645,7 @@ struct if_quotactl {
#define SWAP_LAYOUTS_CHECK_DV2 (1 << 1)
#define SWAP_LAYOUTS_KEEP_MTIME (1 << 2)
#define SWAP_LAYOUTS_KEEP_ATIME (1 << 3)
#define SWAP_LAYOUTS_CLOSE BIT(4)
/* Swap XATTR_NAME_HSM as well, only on the MDT so far */
#define SWAP_LAYOUTS_MDS_HSM (1 << 31)
......
......@@ -148,7 +148,7 @@ extern struct req_format RQF_MDS_GETATTR;
*/
extern struct req_format RQF_MDS_GETATTR_NAME;
extern struct req_format RQF_MDS_CLOSE;
extern struct req_format RQF_MDS_RELEASE_CLOSE;
extern struct req_format RQF_MDS_INTENT_CLOSE;
extern struct req_format RQF_MDS_CONNECT;
extern struct req_format RQF_MDS_DISCONNECT;
extern struct req_format RQF_MDS_GET_INFO;
......
This diff is collapsed.
......@@ -1567,11 +1567,7 @@ int ll_setattr_raw(struct dentry *dentry, struct iattr *attr, bool hsm_import)
* setting times to past, but it is necessary due to possible
* time de-synchronization between MDT inode and OST objects
*/
if (attr->ia_valid & ATTR_SIZE)
down_write(&lli->lli_trunc_sem);
rc = cl_setattr_ost(ll_i2info(inode)->lli_clob, attr, 0);
if (attr->ia_valid & ATTR_SIZE)
up_write(&lli->lli_trunc_sem);
}
out:
if (op_data)
......
......@@ -589,14 +589,6 @@ static int vvp_do_vmtruncate(struct inode *inode, size_t size)
return result;
}
static int vvp_io_setattr_trunc(const struct lu_env *env,
const struct cl_io_slice *ios,
struct inode *inode, loff_t size)
{
inode_dio_wait(inode);
return 0;
}
static int vvp_io_setattr_time(const struct lu_env *env,
const struct cl_io_slice *ios)
{
......@@ -627,15 +619,20 @@ static int vvp_io_setattr_start(const struct lu_env *env,
{
struct cl_io *io = ios->cis_io;
struct inode *inode = vvp_object_inode(io->ci_obj);
int result = 0;
struct ll_inode_info *lli = ll_i2info(inode);
if (cl_io_is_trunc(io)) {
down_write(&lli->lli_trunc_sem);
inode_lock(inode);
if (cl_io_is_trunc(io))
result = vvp_io_setattr_trunc(env, ios, inode,
io->u.ci_setattr.sa_attr.lvb_size);
if (!result && io->u.ci_setattr.sa_valid & TIMES_SET_FLAGS)
result = vvp_io_setattr_time(env, ios);
return result;
inode_dio_wait(inode);
} else {
inode_lock(inode);
}
if (io->u.ci_setattr.sa_valid & TIMES_SET_FLAGS)
return vvp_io_setattr_time(env, ios);
return 0;
}
static void vvp_io_setattr_end(const struct lu_env *env,
......@@ -643,14 +640,18 @@ static void vvp_io_setattr_end(const struct lu_env *env,
{
struct cl_io *io = ios->cis_io;
struct inode *inode = vvp_object_inode(io->ci_obj);
struct ll_inode_info *lli = ll_i2info(inode);
if (cl_io_is_trunc(io))
if (cl_io_is_trunc(io)) {
/* Truncate in memory pages - they must be clean pages
* because osc has already notified to destroy osc_extents.
*/
vvp_do_vmtruncate(inode, io->u.ci_setattr.sa_attr.lvb_size);
inode_unlock(inode);
up_write(&lli->lli_trunc_sem);
} else {
inode_unlock(inode);
}
}
static void vvp_io_setattr_fini(const struct lu_env *env,
......@@ -666,6 +667,7 @@ static int vvp_io_read_start(const struct lu_env *env,
struct cl_io *io = ios->cis_io;
struct cl_object *obj = io->ci_obj;
struct inode *inode = vvp_object_inode(obj);
struct ll_inode_info *lli = ll_i2info(inode);
struct file *file = vio->vui_fd->fd_file;
int result;
......@@ -678,6 +680,8 @@ static int vvp_io_read_start(const struct lu_env *env,
CDEBUG(D_VFSTRACE, "read: -> [%lli, %lli)\n", pos, pos + cnt);
down_read(&lli->lli_trunc_sem);
if (!can_populate_pages(env, io, inode))
return 0;
......@@ -903,10 +907,13 @@ static int vvp_io_write_start(const struct lu_env *env,
struct cl_io *io = ios->cis_io;
struct cl_object *obj = io->ci_obj;
struct inode *inode = vvp_object_inode(obj);
struct ll_inode_info *lli = ll_i2info(inode);
ssize_t result = 0;
loff_t pos = io->u.ci_wr.wr.crw_pos;
size_t cnt = io->u.ci_wr.wr.crw_count;
down_read(&lli->lli_trunc_sem);
if (!can_populate_pages(env, io, inode))
return 0;
......@@ -990,6 +997,15 @@ static int vvp_io_write_start(const struct lu_env *env,
return result;
}
static void vvp_io_rw_end(const struct lu_env *env,
const struct cl_io_slice *ios)
{
struct inode *inode = vvp_object_inode(ios->cis_obj);
struct ll_inode_info *lli = ll_i2info(inode);
up_read(&lli->lli_trunc_sem);
}
static int vvp_io_kernel_fault(struct vvp_fault_io *cfio)
{
struct vm_fault *vmf = cfio->ft_vmf;
......@@ -1042,6 +1058,7 @@ static int vvp_io_fault_start(const struct lu_env *env,
struct cl_io *io = ios->cis_io;
struct cl_object *obj = io->ci_obj;
struct inode *inode = vvp_object_inode(obj);
struct ll_inode_info *lli = ll_i2info(inode);
struct cl_fault_io *fio = &io->u.ci_fault;
struct vvp_fault_io *cfio = &vio->u.fault;
loff_t offset;
......@@ -1057,6 +1074,8 @@ static int vvp_io_fault_start(const struct lu_env *env,
" changed while waiting for the page fault lock\n",
PFID(lu_object_fid(&obj->co_lu)));
down_read(&lli->lli_trunc_sem);
/* offset of the last byte on the page */
offset = cl_offset(obj, fio->ft_index + 1) - 1;
LASSERT(cl_index(obj, offset) == fio->ft_index);
......@@ -1204,6 +1223,17 @@ static int vvp_io_fault_start(const struct lu_env *env,
return result;
}
static void vvp_io_fault_end(const struct lu_env *env,
const struct cl_io_slice *ios)
{
struct inode *inode = vvp_object_inode(ios->cis_obj);
struct ll_inode_info *lli = ll_i2info(inode);
CLOBINVRNT(env, ios->cis_io->ci_obj,
vvp_object_invariant(ios->cis_io->ci_obj));
up_read(&lli->lli_trunc_sem);
}
static int vvp_io_fsync_start(const struct lu_env *env,
const struct cl_io_slice *ios)
{
......@@ -1233,18 +1263,13 @@ static int vvp_io_read_ahead(const struct lu_env *env,
return result;
}
static void vvp_io_end(const struct lu_env *env, const struct cl_io_slice *ios)
{
CLOBINVRNT(env, ios->cis_io->ci_obj,
vvp_object_invariant(ios->cis_io->ci_obj));
}
static const struct cl_io_operations vvp_io_ops = {
.op = {
[CIT_READ] = {
.cio_fini = vvp_io_fini,
.cio_lock = vvp_io_read_lock,
.cio_start = vvp_io_read_start,
.cio_end = vvp_io_rw_end,
.cio_advance = vvp_io_advance,
},
[CIT_WRITE] = {
......@@ -1253,6 +1278,7 @@ static const struct cl_io_operations vvp_io_ops = {
.cio_iter_fini = vvp_io_write_iter_fini,
.cio_lock = vvp_io_write_lock,
.cio_start = vvp_io_write_start,
.cio_end = vvp_io_rw_end,
.cio_advance = vvp_io_advance,
},
[CIT_SETATTR] = {
......@@ -1267,7 +1293,7 @@ static const struct cl_io_operations vvp_io_ops = {
.cio_iter_init = vvp_io_fault_iter_init,
.cio_lock = vvp_io_fault_lock,
.cio_start = vvp_io_fault_start,
.cio_end = vvp_io_end,
.cio_end = vvp_io_fault_end,
},
[CIT_FSYNC] = {
.cio_start = vvp_io_fsync_start,
......
......@@ -430,14 +430,19 @@ void mdc_getattr_pack(struct ptlrpc_request *req, __u64 valid, u32 flags,
op_data->op_namelen);
}
static void mdc_hsm_release_pack(struct ptlrpc_request *req,
static void mdc_intent_close_pack(struct ptlrpc_request *req,
struct md_op_data *op_data)
{
if (op_data->op_bias & MDS_HSM_RELEASE) {
enum mds_op_bias bias = op_data->op_bias;
struct close_data *data;
struct ldlm_lock *lock;
if (!(bias & (MDS_HSM_RELEASE | MDS_CLOSE_LAYOUT_SWAP |
MDS_RENAME_MIGRATE)))
return;
data = req_capsule_client_get(&req->rq_pill, &RMF_CLOSE_DATA);
LASSERT(data);
lock = ldlm_handle2lock(&op_data->op_lease_handle);
if (lock) {
......@@ -448,7 +453,6 @@ static void mdc_hsm_release_pack(struct ptlrpc_request *req,
data->cd_data_version = op_data->op_data_version;
data->cd_fid = op_data->op_fid2;
}
}
void mdc_close_pack(struct ptlrpc_request *req, struct md_op_data *op_data)
......@@ -473,5 +477,5 @@ void mdc_close_pack(struct ptlrpc_request *req, struct md_op_data *op_data)
rec->sa_valid &= ~MDS_ATTR_ATIME;
mdc_ioepoch_pack(epoch, op_data);
mdc_hsm_release_pack(req, op_data);
mdc_intent_close_pack(req, op_data);
}
......@@ -704,9 +704,8 @@ static int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
int rc;
int saved_rc = 0;
req_fmt = &RQF_MDS_CLOSE;
if (op_data->op_bias & MDS_HSM_RELEASE) {
req_fmt = &RQF_MDS_RELEASE_CLOSE;
req_fmt = &RQF_MDS_INTENT_CLOSE;
/* allocate a FID for volatile file */
rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
......@@ -716,6 +715,10 @@ static int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
/* save the errcode and proceed to close */
saved_rc = rc;
}
} else if (op_data->op_bias & MDS_CLOSE_LAYOUT_SWAP) {
req_fmt = &RQF_MDS_INTENT_CLOSE;
} else {
req_fmt = &RQF_MDS_CLOSE;
}
*request = NULL;
......
......@@ -121,7 +121,7 @@ static const struct req_msg_field *mdt_close_client[] = {
&RMF_CAPA1
};
static const struct req_msg_field *mdt_release_close_client[] = {
static const struct req_msg_field *mdt_intent_close_client[] = {
&RMF_PTLRPC_BODY,
&RMF_MDT_EPOCH,
&RMF_REC_REINT,
......@@ -666,7 +666,7 @@ static struct req_format *req_formats[] = {
&RQF_MDS_GETXATTR,
&RQF_MDS_SYNC,
&RQF_MDS_CLOSE,
&RQF_MDS_RELEASE_CLOSE,
&RQF_MDS_INTENT_CLOSE,
&RQF_MDS_READPAGE,
&RQF_MDS_WRITEPAGE,
&RQF_MDS_REINT,
......@@ -1365,10 +1365,10 @@ struct req_format RQF_MDS_CLOSE =
mdt_close_client, mds_last_unlink_server);
EXPORT_SYMBOL(RQF_MDS_CLOSE);
struct req_format RQF_MDS_RELEASE_CLOSE =
struct req_format RQF_MDS_INTENT_CLOSE =
DEFINE_REQ_FMT0("MDS_CLOSE",
mdt_release_close_client, mds_last_unlink_server);
EXPORT_SYMBOL(RQF_MDS_RELEASE_CLOSE);
mdt_intent_close_client, mds_last_unlink_server);
EXPORT_SYMBOL(RQF_MDS_INTENT_CLOSE);
struct req_format RQF_MDS_READPAGE =
DEFINE_REQ_FMT0("MDS_READPAGE",
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment