Commit e55b71f8 authored by Greg Farnum's avatar Greg Farnum Committed by Sage Weil

ceph: handle ESTALE properly; on receipt send to authority if it wasn't

Signed-off-by: default avatarGreg Farnum <gregf@hq.newdream.net>
Signed-off-by: default avatarSage Weil <sage@newdream.net>
parent 2bc50259
...@@ -1628,6 +1628,15 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc, ...@@ -1628,6 +1628,15 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
req->r_mds = mds; req->r_mds = mds;
req->r_attempts++; req->r_attempts++;
if (req->r_inode) {
struct ceph_cap *cap =
ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
if (cap)
req->r_sent_on_mseq = cap->mseq;
else
req->r_sent_on_mseq = -1;
}
dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req, dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts); req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
...@@ -1962,21 +1971,39 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) ...@@ -1962,21 +1971,39 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
result = le32_to_cpu(head->result); result = le32_to_cpu(head->result);
/* /*
* Tolerate 2 consecutive ESTALEs from the same mds. * Handle an ESTALE
* FIXME: we should be looking at the cap migrate_seq. * if we're not talking to the authority, send to them
* if the authority has changed while we weren't looking,
* send to new authority
* Otherwise we just have to return an ESTALE
*/ */
if (result == -ESTALE) { if (result == -ESTALE) {
req->r_direct_mode = USE_AUTH_MDS; dout("got ESTALE on request %llu", req->r_tid);
req->r_num_stale++; if (!req->r_inode) ; //do nothing; not an authority problem
if (req->r_num_stale <= 2) { else if (req->r_direct_mode != USE_AUTH_MDS) {
dout("not using auth, setting for that now");
req->r_direct_mode = USE_AUTH_MDS;
__do_request(mdsc, req); __do_request(mdsc, req);
mutex_unlock(&mdsc->mutex); mutex_unlock(&mdsc->mutex);
goto out; goto out;
} else {
struct ceph_inode_info *ci = ceph_inode(req->r_inode);
struct ceph_cap *cap =
ceph_get_cap_for_mds(ci, req->r_mds);;
dout("already using auth");
if ((!cap || cap != ci->i_auth_cap) ||
(cap->mseq != req->r_sent_on_mseq)) {
dout("but cap changed, so resending");
__do_request(mdsc, req);
mutex_unlock(&mdsc->mutex);
goto out;
}
} }
} else { dout("have to return ESTALE on request %llu", req->r_tid);
req->r_num_stale = 0;
} }
if (head->safe) { if (head->safe) {
req->r_got_safe = true; req->r_got_safe = true;
__unregister_request(mdsc, req); __unregister_request(mdsc, req);
......
...@@ -208,8 +208,8 @@ struct ceph_mds_request { ...@@ -208,8 +208,8 @@ struct ceph_mds_request {
int r_attempts; /* resend attempts */ int r_attempts; /* resend attempts */
int r_num_fwd; /* number of forward attempts */ int r_num_fwd; /* number of forward attempts */
int r_num_stale;
int r_resend_mds; /* mds to resend to next, if any*/ int r_resend_mds; /* mds to resend to next, if any*/
u32 r_sent_on_mseq; /* cap mseq request was sent at*/
struct kref r_kref; struct kref r_kref;
struct list_head r_wait; struct list_head r_wait;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment