Commit 58890c06 authored by Linus Torvalds's avatar Linus Torvalds

Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

Pull Ceph fixes from Sage Weil:
 "Two of Alex's patches deal with a race when reseting server
  connections for open RBD images, one demotes some non-fatal BUGs to
  WARNs, and my patch fixes a protocol feature bit failure path."

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
  libceph: fix protocol feature mismatch failure path
  libceph: WARN, don't BUG on unexpected connection states
  libceph: always reset osds when kicking
  libceph: move linger requests sooner in kick_requests()
parents 42288fe3 0fa6ebc6
...@@ -506,6 +506,7 @@ static void reset_connection(struct ceph_connection *con) ...@@ -506,6 +506,7 @@ static void reset_connection(struct ceph_connection *con)
{ {
/* reset connection, out_queue, msg_ and connect_seq */ /* reset connection, out_queue, msg_ and connect_seq */
/* discard existing out_queue and msg_seq */ /* discard existing out_queue and msg_seq */
dout("reset_connection %p\n", con);
ceph_msg_remove_list(&con->out_queue); ceph_msg_remove_list(&con->out_queue);
ceph_msg_remove_list(&con->out_sent); ceph_msg_remove_list(&con->out_sent);
...@@ -561,7 +562,7 @@ void ceph_con_open(struct ceph_connection *con, ...@@ -561,7 +562,7 @@ void ceph_con_open(struct ceph_connection *con,
mutex_lock(&con->mutex); mutex_lock(&con->mutex);
dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr)); dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr));
BUG_ON(con->state != CON_STATE_CLOSED); WARN_ON(con->state != CON_STATE_CLOSED);
con->state = CON_STATE_PREOPEN; con->state = CON_STATE_PREOPEN;
con->peer_name.type = (__u8) entity_type; con->peer_name.type = (__u8) entity_type;
...@@ -1506,13 +1507,6 @@ static int process_banner(struct ceph_connection *con) ...@@ -1506,13 +1507,6 @@ static int process_banner(struct ceph_connection *con)
return 0; return 0;
} }
static void fail_protocol(struct ceph_connection *con)
{
reset_connection(con);
BUG_ON(con->state != CON_STATE_NEGOTIATING);
con->state = CON_STATE_CLOSED;
}
static int process_connect(struct ceph_connection *con) static int process_connect(struct ceph_connection *con)
{ {
u64 sup_feat = con->msgr->supported_features; u64 sup_feat = con->msgr->supported_features;
...@@ -1530,7 +1524,7 @@ static int process_connect(struct ceph_connection *con) ...@@ -1530,7 +1524,7 @@ static int process_connect(struct ceph_connection *con)
ceph_pr_addr(&con->peer_addr.in_addr), ceph_pr_addr(&con->peer_addr.in_addr),
sup_feat, server_feat, server_feat & ~sup_feat); sup_feat, server_feat, server_feat & ~sup_feat);
con->error_msg = "missing required protocol features"; con->error_msg = "missing required protocol features";
fail_protocol(con); reset_connection(con);
return -1; return -1;
case CEPH_MSGR_TAG_BADPROTOVER: case CEPH_MSGR_TAG_BADPROTOVER:
...@@ -1541,7 +1535,7 @@ static int process_connect(struct ceph_connection *con) ...@@ -1541,7 +1535,7 @@ static int process_connect(struct ceph_connection *con)
le32_to_cpu(con->out_connect.protocol_version), le32_to_cpu(con->out_connect.protocol_version),
le32_to_cpu(con->in_reply.protocol_version)); le32_to_cpu(con->in_reply.protocol_version));
con->error_msg = "protocol version mismatch"; con->error_msg = "protocol version mismatch";
fail_protocol(con); reset_connection(con);
return -1; return -1;
case CEPH_MSGR_TAG_BADAUTHORIZER: case CEPH_MSGR_TAG_BADAUTHORIZER:
...@@ -1631,11 +1625,11 @@ static int process_connect(struct ceph_connection *con) ...@@ -1631,11 +1625,11 @@ static int process_connect(struct ceph_connection *con)
ceph_pr_addr(&con->peer_addr.in_addr), ceph_pr_addr(&con->peer_addr.in_addr),
req_feat, server_feat, req_feat & ~server_feat); req_feat, server_feat, req_feat & ~server_feat);
con->error_msg = "missing required protocol features"; con->error_msg = "missing required protocol features";
fail_protocol(con); reset_connection(con);
return -1; return -1;
} }
BUG_ON(con->state != CON_STATE_NEGOTIATING); WARN_ON(con->state != CON_STATE_NEGOTIATING);
con->state = CON_STATE_OPEN; con->state = CON_STATE_OPEN;
con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
...@@ -2132,7 +2126,6 @@ static int try_read(struct ceph_connection *con) ...@@ -2132,7 +2126,6 @@ static int try_read(struct ceph_connection *con)
if (ret < 0) if (ret < 0)
goto out; goto out;
BUG_ON(con->state != CON_STATE_CONNECTING);
con->state = CON_STATE_NEGOTIATING; con->state = CON_STATE_NEGOTIATING;
/* /*
...@@ -2160,7 +2153,7 @@ static int try_read(struct ceph_connection *con) ...@@ -2160,7 +2153,7 @@ static int try_read(struct ceph_connection *con)
goto more; goto more;
} }
BUG_ON(con->state != CON_STATE_OPEN); WARN_ON(con->state != CON_STATE_OPEN);
if (con->in_base_pos < 0) { if (con->in_base_pos < 0) {
/* /*
...@@ -2382,7 +2375,7 @@ static void ceph_fault(struct ceph_connection *con) ...@@ -2382,7 +2375,7 @@ static void ceph_fault(struct ceph_connection *con)
dout("fault %p state %lu to peer %s\n", dout("fault %p state %lu to peer %s\n",
con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));
BUG_ON(con->state != CON_STATE_CONNECTING && WARN_ON(con->state != CON_STATE_CONNECTING &&
con->state != CON_STATE_NEGOTIATING && con->state != CON_STATE_NEGOTIATING &&
con->state != CON_STATE_OPEN); con->state != CON_STATE_OPEN);
......
...@@ -1270,7 +1270,7 @@ static void reset_changed_osds(struct ceph_osd_client *osdc) ...@@ -1270,7 +1270,7 @@ static void reset_changed_osds(struct ceph_osd_client *osdc)
* Requeue requests whose mapping to an OSD has changed. If requests map to * Requeue requests whose mapping to an OSD has changed. If requests map to
* no osd, request a new map. * no osd, request a new map.
* *
* Caller should hold map_sem for read and request_mutex. * Caller should hold map_sem for read.
*/ */
static void kick_requests(struct ceph_osd_client *osdc, int force_resend) static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
{ {
...@@ -1284,6 +1284,24 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend) ...@@ -1284,6 +1284,24 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
for (p = rb_first(&osdc->requests); p; ) { for (p = rb_first(&osdc->requests); p; ) {
req = rb_entry(p, struct ceph_osd_request, r_node); req = rb_entry(p, struct ceph_osd_request, r_node);
p = rb_next(p); p = rb_next(p);
/*
* For linger requests that have not yet been
* registered, move them to the linger list; they'll
* be sent to the osd in the loop below. Unregister
* the request before re-registering it as a linger
* request to ensure the __map_request() below
* will decide it needs to be sent.
*/
if (req->r_linger && list_empty(&req->r_linger_item)) {
dout("%p tid %llu restart on osd%d\n",
req, req->r_tid,
req->r_osd ? req->r_osd->o_osd : -1);
__unregister_request(osdc, req);
__register_linger_request(osdc, req);
continue;
}
err = __map_request(osdc, req, force_resend); err = __map_request(osdc, req, force_resend);
if (err < 0) if (err < 0)
continue; /* error */ continue; /* error */
...@@ -1298,17 +1316,6 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend) ...@@ -1298,17 +1316,6 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
req->r_flags |= CEPH_OSD_FLAG_RETRY; req->r_flags |= CEPH_OSD_FLAG_RETRY;
} }
} }
if (req->r_linger && list_empty(&req->r_linger_item)) {
/*
* register as a linger so that we will
* re-submit below and get a new tid
*/
dout("%p tid %llu restart on osd%d\n",
req, req->r_tid,
req->r_osd ? req->r_osd->o_osd : -1);
__register_linger_request(osdc, req);
__unregister_request(osdc, req);
}
} }
list_for_each_entry_safe(req, nreq, &osdc->req_linger, list_for_each_entry_safe(req, nreq, &osdc->req_linger,
...@@ -1316,6 +1323,7 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend) ...@@ -1316,6 +1323,7 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
dout("linger req=%p req->r_osd=%p\n", req, req->r_osd); dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
err = __map_request(osdc, req, force_resend); err = __map_request(osdc, req, force_resend);
dout("__map_request returned %d\n", err);
if (err == 0) if (err == 0)
continue; /* no change and no osd was specified */ continue; /* no change and no osd was specified */
if (err < 0) if (err < 0)
...@@ -1337,6 +1345,7 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend) ...@@ -1337,6 +1345,7 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
dout("%d requests for down osds, need new map\n", needmap); dout("%d requests for down osds, need new map\n", needmap);
ceph_monc_request_next_osdmap(&osdc->client->monc); ceph_monc_request_next_osdmap(&osdc->client->monc);
} }
reset_changed_osds(osdc);
} }
...@@ -1393,7 +1402,6 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) ...@@ -1393,7 +1402,6 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
osdc->osdmap = newmap; osdc->osdmap = newmap;
} }
kick_requests(osdc, 0); kick_requests(osdc, 0);
reset_changed_osds(osdc);
} else { } else {
dout("ignoring incremental map %u len %d\n", dout("ignoring incremental map %u len %d\n",
epoch, maplen); epoch, maplen);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment