Commit 82dcabad authored by Ilya Dryomov's avatar Ilya Dryomov

libceph: revamp subs code, switch to SUBSCRIBE2 protocol

It is currently hard-coded in the mon_client that mdsmap and monmap
subs are continuous, while osdmap sub is always "onetime".  To better
handle full clusters/pools in the osd_client, we need to be able to
issue continuous osdmap subs.  Revamp subs code to allow us to specify
for each sub whether it should be continuous or not.

Although not strictly required for the above, switch to SUBSCRIBE2
protocol while at it, eliminating the ambiguity between a request for
"every map since X" and a request for "just the latest" when we don't
have a map yet (i.e. have epoch 0).  SUBSCRIBE2 feature bit is now
required - it's been supported since pre-argonaut (2010).

Move "got mdsmap" call to the end of ceph_mdsc_handle_map() - calling
in before we validate the epoch and successfully install the new map
can mess up mon_client sub state.
Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
parent 0f9af169
...@@ -3764,7 +3764,6 @@ void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg) ...@@ -3764,7 +3764,6 @@ void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
dout("handle_map epoch %u len %d\n", epoch, (int)maplen); dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
/* do we need it? */ /* do we need it? */
ceph_monc_got_mdsmap(&mdsc->fsc->client->monc, epoch);
mutex_lock(&mdsc->mutex); mutex_lock(&mdsc->mutex);
if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) { if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
dout("handle_map epoch %u <= our %u\n", dout("handle_map epoch %u <= our %u\n",
...@@ -3791,6 +3790,8 @@ void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg) ...@@ -3791,6 +3790,8 @@ void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size; mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
__wake_requests(mdsc, &mdsc->waiting_for_map); __wake_requests(mdsc, &mdsc->waiting_for_map);
ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
mdsc->mdsmap->m_epoch);
mutex_unlock(&mdsc->mutex); mutex_unlock(&mdsc->mutex);
schedule_delayed(mdsc); schedule_delayed(mdsc);
......
...@@ -530,7 +530,7 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt, ...@@ -530,7 +530,7 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
goto fail; goto fail;
} }
fsc->client->extra_mon_dispatch = extra_mon_dispatch; fsc->client->extra_mon_dispatch = extra_mon_dispatch;
fsc->client->monc.want_mdsmap = 1; ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 0, true);
fsc->mount_options = fsopt; fsc->mount_options = fsopt;
......
...@@ -105,6 +105,7 @@ static inline u64 ceph_sanitize_features(u64 features) ...@@ -105,6 +105,7 @@ static inline u64 ceph_sanitize_features(u64 features)
*/ */
#define CEPH_FEATURES_SUPPORTED_DEFAULT \ #define CEPH_FEATURES_SUPPORTED_DEFAULT \
(CEPH_FEATURE_NOSRCADDR | \ (CEPH_FEATURE_NOSRCADDR | \
CEPH_FEATURE_SUBSCRIBE2 | \
CEPH_FEATURE_RECONNECT_SEQ | \ CEPH_FEATURE_RECONNECT_SEQ | \
CEPH_FEATURE_PGID64 | \ CEPH_FEATURE_PGID64 | \
CEPH_FEATURE_PGPOOL3 | \ CEPH_FEATURE_PGPOOL3 | \
...@@ -127,6 +128,7 @@ static inline u64 ceph_sanitize_features(u64 features) ...@@ -127,6 +128,7 @@ static inline u64 ceph_sanitize_features(u64 features)
#define CEPH_FEATURES_REQUIRED_DEFAULT \ #define CEPH_FEATURES_REQUIRED_DEFAULT \
(CEPH_FEATURE_NOSRCADDR | \ (CEPH_FEATURE_NOSRCADDR | \
CEPH_FEATURE_SUBSCRIBE2 | \
CEPH_FEATURE_RECONNECT_SEQ | \ CEPH_FEATURE_RECONNECT_SEQ | \
CEPH_FEATURE_PGID64 | \ CEPH_FEATURE_PGID64 | \
CEPH_FEATURE_PGPOOL3 | \ CEPH_FEATURE_PGPOOL3 | \
......
...@@ -198,8 +198,8 @@ struct ceph_client_mount { ...@@ -198,8 +198,8 @@ struct ceph_client_mount {
#define CEPH_SUBSCRIBE_ONETIME 1 /* i want only 1 update after have */ #define CEPH_SUBSCRIBE_ONETIME 1 /* i want only 1 update after have */
struct ceph_mon_subscribe_item { struct ceph_mon_subscribe_item {
__le64 have_version; __le64 have; __le64 start;
__u8 onetime; __u8 flags;
} __attribute__ ((packed)); } __attribute__ ((packed));
struct ceph_mon_subscribe_ack { struct ceph_mon_subscribe_ack {
......
...@@ -68,7 +68,8 @@ struct ceph_mon_client { ...@@ -68,7 +68,8 @@ struct ceph_mon_client {
bool hunting; bool hunting;
int cur_mon; /* last monitor i contacted */ int cur_mon; /* last monitor i contacted */
unsigned long sub_sent, sub_renew_after; unsigned long sub_renew_after;
unsigned long sub_renew_sent;
struct ceph_connection con; struct ceph_connection con;
/* pending generic requests */ /* pending generic requests */
...@@ -76,10 +77,12 @@ struct ceph_mon_client { ...@@ -76,10 +77,12 @@ struct ceph_mon_client {
int num_generic_requests; int num_generic_requests;
u64 last_tid; u64 last_tid;
/* mds/osd map */ /* subs, indexed with CEPH_SUB_* */
int want_mdsmap; struct {
int want_next_osdmap; /* 1 = want, 2 = want+asked */ struct ceph_mon_subscribe_item item;
u32 have_osdmap, have_mdsmap; bool want;
u32 have; /* epoch */
} subs[3];
#ifdef CONFIG_DEBUG_FS #ifdef CONFIG_DEBUG_FS
struct dentry *debugfs_file; struct dentry *debugfs_file;
...@@ -93,14 +96,23 @@ extern int ceph_monmap_contains(struct ceph_monmap *m, ...@@ -93,14 +96,23 @@ extern int ceph_monmap_contains(struct ceph_monmap *m,
extern int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl); extern int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl);
extern void ceph_monc_stop(struct ceph_mon_client *monc); extern void ceph_monc_stop(struct ceph_mon_client *monc);
enum {
CEPH_SUB_MDSMAP = 0,
CEPH_SUB_MONMAP,
CEPH_SUB_OSDMAP,
};
extern const char *ceph_sub_str[];
/* /*
* The model here is to indicate that we need a new map of at least * The model here is to indicate that we need a new map of at least
* epoch @want, and also call in when we receive a map. We will * epoch @epoch, and also call in when we receive a map. We will
* periodically rerequest the map from the monitor cluster until we * periodically rerequest the map from the monitor cluster until we
* get what we want. * get what we want.
*/ */
extern int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 have); bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
extern int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 have); bool continuous);
void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch);
extern void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc); extern void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc);
extern int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch, extern int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
......
...@@ -112,15 +112,20 @@ static int monc_show(struct seq_file *s, void *p) ...@@ -112,15 +112,20 @@ static int monc_show(struct seq_file *s, void *p)
struct ceph_mon_generic_request *req; struct ceph_mon_generic_request *req;
struct ceph_mon_client *monc = &client->monc; struct ceph_mon_client *monc = &client->monc;
struct rb_node *rp; struct rb_node *rp;
int i;
mutex_lock(&monc->mutex); mutex_lock(&monc->mutex);
if (monc->have_mdsmap) for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
seq_printf(s, "have mdsmap %u\n", (unsigned int)monc->have_mdsmap); seq_printf(s, "have %s %u", ceph_sub_str[i],
if (monc->have_osdmap) monc->subs[i].have);
seq_printf(s, "have osdmap %u\n", (unsigned int)monc->have_osdmap); if (monc->subs[i].want)
if (monc->want_next_osdmap) seq_printf(s, " want %llu%s",
seq_printf(s, "want next osdmap\n"); le64_to_cpu(monc->subs[i].item.start),
(monc->subs[i].item.flags &
CEPH_SUBSCRIBE_ONETIME ? "" : "+"));
seq_putc(s, '\n');
}
for (rp = rb_first(&monc->generic_request_tree); rp; rp = rb_next(rp)) { for (rp = rb_first(&monc->generic_request_tree); rp; rp = rb_next(rp)) {
__u16 op; __u16 op;
......
...@@ -140,9 +140,8 @@ static int __open_session(struct ceph_mon_client *monc) ...@@ -140,9 +140,8 @@ static int __open_session(struct ceph_mon_client *monc)
monc->cur_mon = r % monc->monmap->num_mon; monc->cur_mon = r % monc->monmap->num_mon;
dout("open_session num=%d r=%d -> mon%d\n", dout("open_session num=%d r=%d -> mon%d\n",
monc->monmap->num_mon, r, monc->cur_mon); monc->monmap->num_mon, r, monc->cur_mon);
monc->sub_sent = 0;
monc->sub_renew_after = jiffies; /* i.e., expired */ monc->sub_renew_after = jiffies; /* i.e., expired */
monc->want_next_osdmap = !!monc->want_next_osdmap; monc->sub_renew_sent = 0;
dout("open_session mon%d opening\n", monc->cur_mon); dout("open_session mon%d opening\n", monc->cur_mon);
ceph_con_open(&monc->con, ceph_con_open(&monc->con,
...@@ -189,59 +188,58 @@ static void __schedule_delayed(struct ceph_mon_client *monc) ...@@ -189,59 +188,58 @@ static void __schedule_delayed(struct ceph_mon_client *monc)
round_jiffies_relative(delay)); round_jiffies_relative(delay));
} }
const char *ceph_sub_str[] = {
[CEPH_SUB_MDSMAP] = "mdsmap",
[CEPH_SUB_MONMAP] = "monmap",
[CEPH_SUB_OSDMAP] = "osdmap",
};
/* /*
* Send subscribe request for mdsmap and/or osdmap. * Send subscribe request for one or more maps, according to
* monc->subs.
*/ */
static void __send_subscribe(struct ceph_mon_client *monc) static void __send_subscribe(struct ceph_mon_client *monc)
{ {
dout("__send_subscribe sub_sent=%u exp=%u want_osd=%d\n", struct ceph_msg *msg = monc->m_subscribe;
(unsigned int)monc->sub_sent, __sub_expired(monc), void *p = msg->front.iov_base;
monc->want_next_osdmap); void *const end = p + msg->front_alloc_len;
if ((__sub_expired(monc) && !monc->sub_sent) || int num = 0;
monc->want_next_osdmap == 1) { int i;
struct ceph_msg *msg = monc->m_subscribe;
struct ceph_mon_subscribe_item *i; dout("%s sent %lu\n", __func__, monc->sub_renew_sent);
void *p, *end;
int num; BUG_ON(monc->cur_mon < 0);
p = msg->front.iov_base; if (!monc->sub_renew_sent)
end = p + msg->front_alloc_len; monc->sub_renew_sent = jiffies | 1; /* never 0 */
num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap; msg->hdr.version = cpu_to_le16(2);
ceph_encode_32(&p, num);
for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
if (monc->want_next_osdmap) { if (monc->subs[i].want)
dout("__send_subscribe to 'osdmap' %u\n", num++;
(unsigned int)monc->have_osdmap);
ceph_encode_string(&p, end, "osdmap", 6);
i = p;
i->have = cpu_to_le64(monc->have_osdmap);
i->onetime = 1;
p += sizeof(*i);
monc->want_next_osdmap = 2; /* requested */
}
if (monc->want_mdsmap) {
dout("__send_subscribe to 'mdsmap' %u+\n",
(unsigned int)monc->have_mdsmap);
ceph_encode_string(&p, end, "mdsmap", 6);
i = p;
i->have = cpu_to_le64(monc->have_mdsmap);
i->onetime = 0;
p += sizeof(*i);
}
ceph_encode_string(&p, end, "monmap", 6);
i = p;
i->have = 0;
i->onetime = 0;
p += sizeof(*i);
msg->front.iov_len = p - msg->front.iov_base;
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
ceph_msg_revoke(msg);
ceph_con_send(&monc->con, ceph_msg_get(msg));
monc->sub_sent = jiffies | 1; /* never 0 */
} }
BUG_ON(num < 1); /* monmap sub is always there */
ceph_encode_32(&p, num);
for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
const char *s = ceph_sub_str[i];
if (!monc->subs[i].want)
continue;
dout("%s %s start %llu flags 0x%x\n", __func__, s,
le64_to_cpu(monc->subs[i].item.start),
monc->subs[i].item.flags);
ceph_encode_string(&p, end, s, strlen(s));
memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
p += sizeof(monc->subs[i].item);
}
BUG_ON(p != (end - 35 - (ARRAY_SIZE(monc->subs) - num) * 19));
msg->front.iov_len = p - msg->front.iov_base;
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
ceph_msg_revoke(msg);
ceph_con_send(&monc->con, ceph_msg_get(msg));
} }
static void handle_subscribe_ack(struct ceph_mon_client *monc, static void handle_subscribe_ack(struct ceph_mon_client *monc,
...@@ -255,9 +253,16 @@ static void handle_subscribe_ack(struct ceph_mon_client *monc, ...@@ -255,9 +253,16 @@ static void handle_subscribe_ack(struct ceph_mon_client *monc,
seconds = le32_to_cpu(h->duration); seconds = le32_to_cpu(h->duration);
mutex_lock(&monc->mutex); mutex_lock(&monc->mutex);
dout("handle_subscribe_ack after %d seconds\n", seconds); if (monc->sub_renew_sent) {
monc->sub_renew_after = monc->sub_sent + (seconds >> 1)*HZ - 1; monc->sub_renew_after = monc->sub_renew_sent +
monc->sub_sent = 0; (seconds >> 1) * HZ - 1;
dout("%s sent %lu duration %d renew after %lu\n", __func__,
monc->sub_renew_sent, seconds, monc->sub_renew_after);
monc->sub_renew_sent = 0;
} else {
dout("%s sent %lu renew after %lu, ignoring\n", __func__,
monc->sub_renew_sent, monc->sub_renew_after);
}
mutex_unlock(&monc->mutex); mutex_unlock(&monc->mutex);
return; return;
bad: bad:
...@@ -266,36 +271,82 @@ static void handle_subscribe_ack(struct ceph_mon_client *monc, ...@@ -266,36 +271,82 @@ static void handle_subscribe_ack(struct ceph_mon_client *monc,
} }
/* /*
* Keep track of which maps we have * Register interest in a map
*
* @sub: one of CEPH_SUB_*
* @epoch: X for "every map since X", or 0 for "just the latest"
*/ */
int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got) static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
u32 epoch, bool continuous)
{
__le64 start = cpu_to_le64(epoch);
u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;
dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
epoch, continuous);
if (monc->subs[sub].want &&
monc->subs[sub].item.start == start &&
monc->subs[sub].item.flags == flags)
return false;
monc->subs[sub].item.start = start;
monc->subs[sub].item.flags = flags;
monc->subs[sub].want = true;
return true;
}
bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
bool continuous)
{ {
bool need_request;
mutex_lock(&monc->mutex); mutex_lock(&monc->mutex);
monc->have_mdsmap = got; need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
mutex_unlock(&monc->mutex); mutex_unlock(&monc->mutex);
return 0;
return need_request;
} }
EXPORT_SYMBOL(ceph_monc_got_mdsmap); EXPORT_SYMBOL(ceph_monc_want_map);
int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got) /*
* Keep track of which maps we have
*
* @sub: one of CEPH_SUB_*
*/
static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
u32 epoch)
{
dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);
if (monc->subs[sub].want) {
if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
monc->subs[sub].want = false;
else
monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
}
monc->subs[sub].have = epoch;
}
void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
{ {
mutex_lock(&monc->mutex); mutex_lock(&monc->mutex);
monc->have_osdmap = got; __ceph_monc_got_map(monc, sub, epoch);
monc->want_next_osdmap = 0;
mutex_unlock(&monc->mutex); mutex_unlock(&monc->mutex);
return 0;
} }
EXPORT_SYMBOL(ceph_monc_got_map);
/* /*
* Register interest in the next osdmap * Register interest in the next osdmap
*/ */
void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc) void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
{ {
dout("request_next_osdmap have %u\n", monc->have_osdmap); dout("%s have %u\n", __func__, monc->subs[CEPH_SUB_OSDMAP].have);
mutex_lock(&monc->mutex); mutex_lock(&monc->mutex);
if (!monc->want_next_osdmap) if (__ceph_monc_want_map(monc, CEPH_SUB_OSDMAP,
monc->want_next_osdmap = 1; monc->subs[CEPH_SUB_OSDMAP].have + 1, false))
if (monc->want_next_osdmap < 2)
__send_subscribe(monc); __send_subscribe(monc);
mutex_unlock(&monc->mutex); mutex_unlock(&monc->mutex);
} }
...@@ -314,15 +365,15 @@ int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch, ...@@ -314,15 +365,15 @@ int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
long ret; long ret;
mutex_lock(&monc->mutex); mutex_lock(&monc->mutex);
while (monc->have_osdmap < epoch) { while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
mutex_unlock(&monc->mutex); mutex_unlock(&monc->mutex);
if (timeout && time_after_eq(jiffies, started + timeout)) if (timeout && time_after_eq(jiffies, started + timeout))
return -ETIMEDOUT; return -ETIMEDOUT;
ret = wait_event_interruptible_timeout(monc->client->auth_wq, ret = wait_event_interruptible_timeout(monc->client->auth_wq,
monc->have_osdmap >= epoch, monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
ceph_timeout_jiffies(timeout)); ceph_timeout_jiffies(timeout));
if (ret < 0) if (ret < 0)
return ret; return ret;
...@@ -335,11 +386,14 @@ int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch, ...@@ -335,11 +386,14 @@ int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
EXPORT_SYMBOL(ceph_monc_wait_osdmap); EXPORT_SYMBOL(ceph_monc_wait_osdmap);
/* /*
* * Open a session with a random monitor. Request monmap and osdmap,
* which are waited upon in __ceph_open_session().
*/ */
int ceph_monc_open_session(struct ceph_mon_client *monc) int ceph_monc_open_session(struct ceph_mon_client *monc)
{ {
mutex_lock(&monc->mutex); mutex_lock(&monc->mutex);
__ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
__ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
__open_session(monc); __open_session(monc);
__schedule_delayed(monc); __schedule_delayed(monc);
mutex_unlock(&monc->mutex); mutex_unlock(&monc->mutex);
...@@ -375,6 +429,7 @@ static void ceph_monc_handle_map(struct ceph_mon_client *monc, ...@@ -375,6 +429,7 @@ static void ceph_monc_handle_map(struct ceph_mon_client *monc,
client->monc.monmap = monmap; client->monc.monmap = monmap;
kfree(old); kfree(old);
__ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
client->have_fsid = true; client->have_fsid = true;
out: out:
...@@ -725,8 +780,14 @@ static void delayed_work(struct work_struct *work) ...@@ -725,8 +780,14 @@ static void delayed_work(struct work_struct *work)
__validate_auth(monc); __validate_auth(monc);
} }
if (is_auth) if (is_auth) {
__send_subscribe(monc); unsigned long now = jiffies;
dout("%s renew subs? now %lu renew after %lu\n",
__func__, now, monc->sub_renew_after);
if (time_after_eq(now, monc->sub_renew_after))
__send_subscribe(monc);
}
} }
__schedule_delayed(monc); __schedule_delayed(monc);
mutex_unlock(&monc->mutex); mutex_unlock(&monc->mutex);
...@@ -815,16 +876,13 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) ...@@ -815,16 +876,13 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
monc->cur_mon = -1; monc->cur_mon = -1;
monc->hunting = true; monc->hunting = true;
monc->sub_renew_after = jiffies; monc->sub_renew_after = jiffies;
monc->sub_sent = 0; monc->sub_renew_sent = 0;
INIT_DELAYED_WORK(&monc->delayed_work, delayed_work); INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
monc->generic_request_tree = RB_ROOT; monc->generic_request_tree = RB_ROOT;
monc->num_generic_requests = 0; monc->num_generic_requests = 0;
monc->last_tid = 0; monc->last_tid = 0;
monc->have_mdsmap = 0;
monc->have_osdmap = 0;
monc->want_next_osdmap = 1;
return 0; return 0;
out_auth_reply: out_auth_reply:
......
...@@ -2187,7 +2187,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) ...@@ -2187,7 +2187,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
goto bad; goto bad;
done: done:
downgrade_write(&osdc->map_sem); downgrade_write(&osdc->map_sem);
ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch); ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
osdc->osdmap->epoch);
/* /*
* subscribe to subsequent osdmap updates if full to ensure * subscribe to subsequent osdmap updates if full to ensure
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment