Commit 0da5d703 authored by Sage Weil's avatar Sage Weil

libceph: handle connection reopen race with callbacks

If a connection is closed and/or reopened (ceph_con_close, ceph_con_open)
it can race with a callback.  con_work does various state checks for
closed or reopened sockets at the beginning, but drops con->mutex before
making callbacks.  We need to check for state bit changes after retaking
the lock to ensure we restart con_work and execute those CLOSED/OPENING
tests or else we may end up operating under stale assumptions.

In Jim's case, this was causing 'bad tag' errors.

There are four cases where we re-take the con->mutex inside con_work: catch
them all and return EAGAIN from try_{read,write} so that we can restart
con_work.
Reported-by: default avatarJim Schutt <jaschut@sandia.gov>
Tested-by: default avatarJim Schutt <jaschut@sandia.gov>
Signed-off-by: default avatarSage Weil <sage@newdream.net>
parent 3b663780
...@@ -598,7 +598,7 @@ static void prepare_write_keepalive(struct ceph_connection *con) ...@@ -598,7 +598,7 @@ static void prepare_write_keepalive(struct ceph_connection *con)
* Connection negotiation. * Connection negotiation.
*/ */
static void prepare_connect_authorizer(struct ceph_connection *con) static int prepare_connect_authorizer(struct ceph_connection *con)
{ {
void *auth_buf; void *auth_buf;
int auth_len = 0; int auth_len = 0;
...@@ -612,6 +612,10 @@ static void prepare_connect_authorizer(struct ceph_connection *con) ...@@ -612,6 +612,10 @@ static void prepare_connect_authorizer(struct ceph_connection *con)
con->auth_retry); con->auth_retry);
mutex_lock(&con->mutex); mutex_lock(&con->mutex);
if (test_bit(CLOSED, &con->state) ||
test_bit(OPENING, &con->state))
return -EAGAIN;
con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol); con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol);
con->out_connect.authorizer_len = cpu_to_le32(auth_len); con->out_connect.authorizer_len = cpu_to_le32(auth_len);
...@@ -619,6 +623,8 @@ static void prepare_connect_authorizer(struct ceph_connection *con) ...@@ -619,6 +623,8 @@ static void prepare_connect_authorizer(struct ceph_connection *con)
con->out_kvec[con->out_kvec_left].iov_len = auth_len; con->out_kvec[con->out_kvec_left].iov_len = auth_len;
con->out_kvec_left++; con->out_kvec_left++;
con->out_kvec_bytes += auth_len; con->out_kvec_bytes += auth_len;
return 0;
} }
/* /*
...@@ -640,9 +646,9 @@ static void prepare_write_banner(struct ceph_messenger *msgr, ...@@ -640,9 +646,9 @@ static void prepare_write_banner(struct ceph_messenger *msgr,
set_bit(WRITE_PENDING, &con->state); set_bit(WRITE_PENDING, &con->state);
} }
static void prepare_write_connect(struct ceph_messenger *msgr, static int prepare_write_connect(struct ceph_messenger *msgr,
struct ceph_connection *con, struct ceph_connection *con,
int after_banner) int after_banner)
{ {
unsigned global_seq = get_global_seq(con->msgr, 0); unsigned global_seq = get_global_seq(con->msgr, 0);
int proto; int proto;
...@@ -683,7 +689,7 @@ static void prepare_write_connect(struct ceph_messenger *msgr, ...@@ -683,7 +689,7 @@ static void prepare_write_connect(struct ceph_messenger *msgr,
con->out_more = 0; con->out_more = 0;
set_bit(WRITE_PENDING, &con->state); set_bit(WRITE_PENDING, &con->state);
prepare_connect_authorizer(con); return prepare_connect_authorizer(con);
} }
...@@ -1216,6 +1222,7 @@ static int process_connect(struct ceph_connection *con) ...@@ -1216,6 +1222,7 @@ static int process_connect(struct ceph_connection *con)
u64 sup_feat = con->msgr->supported_features; u64 sup_feat = con->msgr->supported_features;
u64 req_feat = con->msgr->required_features; u64 req_feat = con->msgr->required_features;
u64 server_feat = le64_to_cpu(con->in_reply.features); u64 server_feat = le64_to_cpu(con->in_reply.features);
int ret;
dout("process_connect on %p tag %d\n", con, (int)con->in_tag); dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
...@@ -1250,7 +1257,9 @@ static int process_connect(struct ceph_connection *con) ...@@ -1250,7 +1257,9 @@ static int process_connect(struct ceph_connection *con)
return -1; return -1;
} }
con->auth_retry = 1; con->auth_retry = 1;
prepare_write_connect(con->msgr, con, 0); ret = prepare_write_connect(con->msgr, con, 0);
if (ret < 0)
return ret;
prepare_read_connect(con); prepare_read_connect(con);
break; break;
...@@ -1277,6 +1286,9 @@ static int process_connect(struct ceph_connection *con) ...@@ -1277,6 +1286,9 @@ static int process_connect(struct ceph_connection *con)
if (con->ops->peer_reset) if (con->ops->peer_reset)
con->ops->peer_reset(con); con->ops->peer_reset(con);
mutex_lock(&con->mutex); mutex_lock(&con->mutex);
if (test_bit(CLOSED, &con->state) ||
test_bit(OPENING, &con->state))
return -EAGAIN;
break; break;
case CEPH_MSGR_TAG_RETRY_SESSION: case CEPH_MSGR_TAG_RETRY_SESSION:
...@@ -1810,6 +1822,17 @@ static int try_read(struct ceph_connection *con) ...@@ -1810,6 +1822,17 @@ static int try_read(struct ceph_connection *con)
more: more:
dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
con->in_base_pos); con->in_base_pos);
/*
* process_connect and process_message drop and re-take
* con->mutex. make sure we handle a racing close or reopen.
*/
if (test_bit(CLOSED, &con->state) ||
test_bit(OPENING, &con->state)) {
ret = -EAGAIN;
goto out;
}
if (test_bit(CONNECTING, &con->state)) { if (test_bit(CONNECTING, &con->state)) {
if (!test_bit(NEGOTIATING, &con->state)) { if (!test_bit(NEGOTIATING, &con->state)) {
dout("try_read connecting\n"); dout("try_read connecting\n");
...@@ -1938,8 +1961,10 @@ static void con_work(struct work_struct *work) ...@@ -1938,8 +1961,10 @@ static void con_work(struct work_struct *work)
{ {
struct ceph_connection *con = container_of(work, struct ceph_connection, struct ceph_connection *con = container_of(work, struct ceph_connection,
work.work); work.work);
int ret;
mutex_lock(&con->mutex); mutex_lock(&con->mutex);
restart:
if (test_and_clear_bit(BACKOFF, &con->state)) { if (test_and_clear_bit(BACKOFF, &con->state)) {
dout("con_work %p backing off\n", con); dout("con_work %p backing off\n", con);
if (queue_delayed_work(ceph_msgr_wq, &con->work, if (queue_delayed_work(ceph_msgr_wq, &con->work,
...@@ -1969,18 +1994,31 @@ static void con_work(struct work_struct *work) ...@@ -1969,18 +1994,31 @@ static void con_work(struct work_struct *work)
con_close_socket(con); con_close_socket(con);
} }
if (test_and_clear_bit(SOCK_CLOSED, &con->state) || if (test_and_clear_bit(SOCK_CLOSED, &con->state))
try_read(con) < 0 || goto fault;
try_write(con) < 0) {
mutex_unlock(&con->mutex); ret = try_read(con);
ceph_fault(con); /* error/fault path */ if (ret == -EAGAIN)
goto done_unlocked; goto restart;
} if (ret < 0)
goto fault;
ret = try_write(con);
if (ret == -EAGAIN)
goto restart;
if (ret < 0)
goto fault;
done: done:
mutex_unlock(&con->mutex); mutex_unlock(&con->mutex);
done_unlocked: done_unlocked:
con->ops->put(con); con->ops->put(con);
return;
fault:
mutex_unlock(&con->mutex);
ceph_fault(con); /* error/fault path */
goto done_unlocked;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment