Commit fb01d1f8 authored by Yan, Zheng's avatar Yan, Zheng Committed by Ilya Dryomov

ceph: parse inline data in MClientReply and MClientCaps

Signed-off-by: default avatarYan, Zheng <zyan@redhat.com>
parent 715e4cd4
...@@ -2383,6 +2383,8 @@ static void invalidate_aliases(struct inode *inode) ...@@ -2383,6 +2383,8 @@ static void invalidate_aliases(struct inode *inode)
static void handle_cap_grant(struct ceph_mds_client *mdsc, static void handle_cap_grant(struct ceph_mds_client *mdsc,
struct inode *inode, struct ceph_mds_caps *grant, struct inode *inode, struct ceph_mds_caps *grant,
void *snaptrace, int snaptrace_len, void *snaptrace, int snaptrace_len,
u64 inline_version,
void *inline_data, int inline_len,
struct ceph_buffer *xattr_buf, struct ceph_buffer *xattr_buf,
struct ceph_mds_session *session, struct ceph_mds_session *session,
struct ceph_cap *cap, int issued) struct ceph_cap *cap, int issued)
...@@ -2996,11 +2998,12 @@ void ceph_handle_caps(struct ceph_mds_session *session, ...@@ -2996,11 +2998,12 @@ void ceph_handle_caps(struct ceph_mds_session *session,
u64 cap_id; u64 cap_id;
u64 size, max_size; u64 size, max_size;
u64 tid; u64 tid;
u64 inline_version = 0;
void *inline_data = NULL;
u32 inline_len = 0;
void *snaptrace; void *snaptrace;
size_t snaptrace_len; size_t snaptrace_len;
void *flock; void *p, *end;
void *end;
u32 flock_len;
dout("handle_caps from mds%d\n", mds); dout("handle_caps from mds%d\n", mds);
...@@ -3021,30 +3024,37 @@ void ceph_handle_caps(struct ceph_mds_session *session, ...@@ -3021,30 +3024,37 @@ void ceph_handle_caps(struct ceph_mds_session *session,
snaptrace = h + 1; snaptrace = h + 1;
snaptrace_len = le32_to_cpu(h->snap_trace_len); snaptrace_len = le32_to_cpu(h->snap_trace_len);
p = snaptrace + snaptrace_len;
if (le16_to_cpu(msg->hdr.version) >= 2) { if (le16_to_cpu(msg->hdr.version) >= 2) {
void *p = snaptrace + snaptrace_len; u32 flock_len;
ceph_decode_32_safe(&p, end, flock_len, bad); ceph_decode_32_safe(&p, end, flock_len, bad);
if (p + flock_len > end) if (p + flock_len > end)
goto bad; goto bad;
flock = p; p += flock_len;
} else {
flock = NULL;
flock_len = 0;
} }
if (le16_to_cpu(msg->hdr.version) >= 3) { if (le16_to_cpu(msg->hdr.version) >= 3) {
if (op == CEPH_CAP_OP_IMPORT) { if (op == CEPH_CAP_OP_IMPORT) {
void *p = flock + flock_len;
if (p + sizeof(*peer) > end) if (p + sizeof(*peer) > end)
goto bad; goto bad;
peer = p; peer = p;
p += sizeof(*peer);
} else if (op == CEPH_CAP_OP_EXPORT) { } else if (op == CEPH_CAP_OP_EXPORT) {
/* recorded in unused fields */ /* recorded in unused fields */
peer = (void *)&h->size; peer = (void *)&h->size;
} }
} }
if (le16_to_cpu(msg->hdr.version) >= 4) {
ceph_decode_64_safe(&p, end, inline_version, bad);
ceph_decode_32_safe(&p, end, inline_len, bad);
if (p + inline_len > end)
goto bad;
inline_data = p;
p += inline_len;
}
/* lookup ino */ /* lookup ino */
inode = ceph_find_inode(sb, vino); inode = ceph_find_inode(sb, vino);
ci = ceph_inode(inode); ci = ceph_inode(inode);
...@@ -3085,6 +3095,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, ...@@ -3085,6 +3095,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
handle_cap_import(mdsc, inode, h, peer, session, handle_cap_import(mdsc, inode, h, peer, session,
&cap, &issued); &cap, &issued);
handle_cap_grant(mdsc, inode, h, snaptrace, snaptrace_len, handle_cap_grant(mdsc, inode, h, snaptrace, snaptrace_len,
inline_version, inline_data, inline_len,
msg->middle, session, cap, issued); msg->middle, session, cap, issued);
goto done_unlocked; goto done_unlocked;
} }
...@@ -3105,8 +3116,9 @@ void ceph_handle_caps(struct ceph_mds_session *session, ...@@ -3105,8 +3116,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,
case CEPH_CAP_OP_GRANT: case CEPH_CAP_OP_GRANT:
__ceph_caps_issued(ci, &issued); __ceph_caps_issued(ci, &issued);
issued |= __ceph_caps_dirty(ci); issued |= __ceph_caps_dirty(ci);
handle_cap_grant(mdsc, inode, h, NULL, 0, msg->middle, handle_cap_grant(mdsc, inode, h, NULL, 0,
session, cap, issued); inline_version, inline_data, inline_len,
msg->middle, session, cap, issued);
goto done_unlocked; goto done_unlocked;
case CEPH_CAP_OP_FLUSH_ACK: case CEPH_CAP_OP_FLUSH_ACK:
......
...@@ -89,6 +89,16 @@ static int parse_reply_info_in(void **p, void *end, ...@@ -89,6 +89,16 @@ static int parse_reply_info_in(void **p, void *end,
ceph_decode_need(p, end, info->xattr_len, bad); ceph_decode_need(p, end, info->xattr_len, bad);
info->xattr_data = *p; info->xattr_data = *p;
*p += info->xattr_len; *p += info->xattr_len;
if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
ceph_decode_64_safe(p, end, info->inline_version, bad);
ceph_decode_32_safe(p, end, info->inline_len, bad);
ceph_decode_need(p, end, info->inline_len, bad);
info->inline_data = *p;
*p += info->inline_len;
} else
info->inline_version = CEPH_INLINE_NONE;
return 0; return 0;
bad: bad:
return err; return err;
......
...@@ -41,6 +41,9 @@ struct ceph_mds_reply_info_in { ...@@ -41,6 +41,9 @@ struct ceph_mds_reply_info_in {
char *symlink; char *symlink;
u32 xattr_len; u32 xattr_len;
char *xattr_data; char *xattr_data;
u64 inline_version;
u32 inline_len;
char *inline_data;
}; };
/* /*
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment