Commit 42c1b124 authored by Ilya Dryomov's avatar Ilya Dryomov

libceph: handle_one_map()

Separate osdmap handling from decoding and iterating over a bag of maps
in a fresh MOSDMap message.  This sets up the scene for the updated OSD
client.

Of particular importance here is the addition of pi->was_full, which
can be used to answer "did this pool go full -> not-full in this map?".
This is the key bit for supporting pool quotas.

We won't be able to downgrade map_sem for much longer, so drop
downgrade_write().
Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
parent e5253a7b
...@@ -115,6 +115,7 @@ extern const char *ceph_sub_str[]; ...@@ -115,6 +115,7 @@ extern const char *ceph_sub_str[];
bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch, bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
bool continuous); bool continuous);
void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch); void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch);
void ceph_monc_renew_subs(struct ceph_mon_client *monc);
extern void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc); extern void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc);
extern int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch, extern int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
......
...@@ -45,6 +45,8 @@ struct ceph_pg_pool_info { ...@@ -45,6 +45,8 @@ struct ceph_pg_pool_info {
s64 write_tier; /* wins for read+write ops */ s64 write_tier; /* wins for read+write ops */
u64 flags; /* CEPH_POOL_FLAG_* */ u64 flags; /* CEPH_POOL_FLAG_* */
char *name; char *name;
bool was_full; /* for handle_one_map() */
}; };
static inline bool ceph_can_shift_osds(struct ceph_pg_pool_info *pool) static inline bool ceph_can_shift_osds(struct ceph_pg_pool_info *pool)
......
...@@ -376,6 +376,14 @@ void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch) ...@@ -376,6 +376,14 @@ void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
} }
EXPORT_SYMBOL(ceph_monc_got_map); EXPORT_SYMBOL(ceph_monc_got_map);
void ceph_monc_renew_subs(struct ceph_mon_client *monc)
{
mutex_lock(&monc->mutex);
__send_subscribe(monc);
mutex_unlock(&monc->mutex);
}
EXPORT_SYMBOL(ceph_monc_renew_subs);
/* /*
* Register interest in the next osdmap * Register interest in the next osdmap
*/ */
......
...@@ -1245,6 +1245,21 @@ static bool __pool_full(struct ceph_pg_pool_info *pi) ...@@ -1245,6 +1245,21 @@ static bool __pool_full(struct ceph_pg_pool_info *pi)
return pi->flags & CEPH_POOL_FLAG_FULL; return pi->flags & CEPH_POOL_FLAG_FULL;
} }
static bool have_pool_full(struct ceph_osd_client *osdc)
{
struct rb_node *n;
for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
struct ceph_pg_pool_info *pi =
rb_entry(n, struct ceph_pg_pool_info, node);
if (__pool_full(pi))
return true;
}
return false;
}
/* /*
* Returns whether a request should be blocked from being sent * Returns whether a request should be blocked from being sent
* based on the current osdmap and osd_client settings. * based on the current osdmap and osd_client settings.
...@@ -1639,6 +1654,26 @@ static void __send_queued(struct ceph_osd_client *osdc) ...@@ -1639,6 +1654,26 @@ static void __send_queued(struct ceph_osd_client *osdc)
} }
} }
static void maybe_request_map(struct ceph_osd_client *osdc)
{
bool continuous = false;
WARN_ON(!osdc->osdmap->epoch);
if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD) ||
ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR)) {
dout("%s osdc %p continuous\n", __func__, osdc);
continuous = true;
} else {
dout("%s osdc %p onetime\n", __func__, osdc);
}
if (ceph_monc_want_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
osdc->osdmap->epoch + 1, continuous))
ceph_monc_renew_subs(&osdc->client->monc);
}
/* /*
* Caller should hold map_sem for read and request_mutex. * Caller should hold map_sem for read and request_mutex.
*/ */
...@@ -2119,6 +2154,18 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) ...@@ -2119,6 +2154,18 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
up_read(&osdc->map_sem); up_read(&osdc->map_sem);
} }
static void set_pool_was_full(struct ceph_osd_client *osdc)
{
struct rb_node *n;
for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
struct ceph_pg_pool_info *pi =
rb_entry(n, struct ceph_pg_pool_info, node);
pi->was_full = __pool_full(pi);
}
}
static void reset_changed_osds(struct ceph_osd_client *osdc) static void reset_changed_osds(struct ceph_osd_client *osdc)
{ {
struct rb_node *p, *n; struct rb_node *p, *n;
...@@ -2237,6 +2284,57 @@ static void kick_requests(struct ceph_osd_client *osdc, bool force_resend, ...@@ -2237,6 +2284,57 @@ static void kick_requests(struct ceph_osd_client *osdc, bool force_resend,
} }
} }
static int handle_one_map(struct ceph_osd_client *osdc,
void *p, void *end, bool incremental)
{
struct ceph_osdmap *newmap;
struct rb_node *n;
bool skipped_map = false;
bool was_full;
was_full = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
set_pool_was_full(osdc);
if (incremental)
newmap = osdmap_apply_incremental(&p, end, osdc->osdmap);
else
newmap = ceph_osdmap_decode(&p, end);
if (IS_ERR(newmap))
return PTR_ERR(newmap);
if (newmap != osdc->osdmap) {
/*
* Preserve ->was_full before destroying the old map.
* For pools that weren't in the old map, ->was_full
* should be false.
*/
for (n = rb_first(&newmap->pg_pools); n; n = rb_next(n)) {
struct ceph_pg_pool_info *pi =
rb_entry(n, struct ceph_pg_pool_info, node);
struct ceph_pg_pool_info *old_pi;
old_pi = ceph_pg_pool_by_id(osdc->osdmap, pi->id);
if (old_pi)
pi->was_full = old_pi->was_full;
else
WARN_ON(pi->was_full);
}
if (osdc->osdmap->epoch &&
osdc->osdmap->epoch + 1 < newmap->epoch) {
WARN_ON(incremental);
skipped_map = true;
}
ceph_osdmap_destroy(osdc->osdmap);
osdc->osdmap = newmap;
}
was_full &= !ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
kick_requests(osdc, skipped_map, was_full);
return 0;
}
/* /*
* Process updated osd map. * Process updated osd map.
...@@ -2247,27 +2345,29 @@ static void kick_requests(struct ceph_osd_client *osdc, bool force_resend, ...@@ -2247,27 +2345,29 @@ static void kick_requests(struct ceph_osd_client *osdc, bool force_resend,
*/ */
void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
{ {
void *p, *end, *next; void *p = msg->front.iov_base;
void *const end = p + msg->front.iov_len;
u32 nr_maps, maplen; u32 nr_maps, maplen;
u32 epoch; u32 epoch;
struct ceph_osdmap *newmap = NULL, *oldmap;
int err;
struct ceph_fsid fsid; struct ceph_fsid fsid;
bool was_full; bool handled_incremental = false;
bool was_pauserd, was_pausewr;
bool pauserd, pausewr;
int err;
dout("handle_map have %u\n", osdc->osdmap->epoch); dout("%s have %u\n", __func__, osdc->osdmap->epoch);
p = msg->front.iov_base; down_write(&osdc->map_sem);
end = p + msg->front.iov_len;
/* verify fsid */ /* verify fsid */
ceph_decode_need(&p, end, sizeof(fsid), bad); ceph_decode_need(&p, end, sizeof(fsid), bad);
ceph_decode_copy(&p, &fsid, sizeof(fsid)); ceph_decode_copy(&p, &fsid, sizeof(fsid));
if (ceph_check_fsid(osdc->client, &fsid) < 0) if (ceph_check_fsid(osdc->client, &fsid) < 0)
return; goto bad;
down_write(&osdc->map_sem);
was_full = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL); was_pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD);
was_pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) ||
ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
have_pool_full(osdc);
/* incremental maps */ /* incremental maps */
ceph_decode_32_safe(&p, end, nr_maps, bad); ceph_decode_32_safe(&p, end, nr_maps, bad);
...@@ -2277,33 +2377,22 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) ...@@ -2277,33 +2377,22 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
epoch = ceph_decode_32(&p); epoch = ceph_decode_32(&p);
maplen = ceph_decode_32(&p); maplen = ceph_decode_32(&p);
ceph_decode_need(&p, end, maplen, bad); ceph_decode_need(&p, end, maplen, bad);
next = p + maplen; if (osdc->osdmap->epoch &&
if (osdc->osdmap->epoch+1 == epoch) { osdc->osdmap->epoch + 1 == epoch) {
dout("applying incremental map %u len %d\n", dout("applying incremental map %u len %d\n",
epoch, maplen); epoch, maplen);
newmap = osdmap_apply_incremental(&p, next, err = handle_one_map(osdc, p, p + maplen, true);
osdc->osdmap); if (err)
if (IS_ERR(newmap)) {
err = PTR_ERR(newmap);
goto bad; goto bad;
} handled_incremental = true;
BUG_ON(!newmap);
if (newmap != osdc->osdmap) {
ceph_osdmap_destroy(osdc->osdmap);
osdc->osdmap = newmap;
}
was_full = was_full ||
ceph_osdmap_flag(osdc->osdmap,
CEPH_OSDMAP_FULL);
kick_requests(osdc, 0, was_full);
} else { } else {
dout("ignoring incremental map %u len %d\n", dout("ignoring incremental map %u len %d\n",
epoch, maplen); epoch, maplen);
} }
p = next; p += maplen;
nr_maps--; nr_maps--;
} }
if (newmap) if (handled_incremental)
goto done; goto done;
/* full maps */ /* full maps */
...@@ -2322,50 +2411,35 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) ...@@ -2322,50 +2411,35 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
"older than our %u\n", epoch, maplen, "older than our %u\n", epoch, maplen,
osdc->osdmap->epoch); osdc->osdmap->epoch);
} else { } else {
int skipped_map = 0;
dout("taking full map %u len %d\n", epoch, maplen); dout("taking full map %u len %d\n", epoch, maplen);
newmap = ceph_osdmap_decode(&p, p+maplen); err = handle_one_map(osdc, p, p + maplen, false);
if (IS_ERR(newmap)) { if (err)
err = PTR_ERR(newmap);
goto bad; goto bad;
} }
BUG_ON(!newmap);
oldmap = osdc->osdmap;
osdc->osdmap = newmap;
if (oldmap) {
if (oldmap->epoch + 1 < newmap->epoch)
skipped_map = 1;
ceph_osdmap_destroy(oldmap);
}
was_full = was_full ||
ceph_osdmap_flag(osdc->osdmap,
CEPH_OSDMAP_FULL);
kick_requests(osdc, skipped_map, was_full);
}
p += maplen; p += maplen;
nr_maps--; nr_maps--;
} }
done: done:
downgrade_write(&osdc->map_sem);
ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
osdc->osdmap->epoch);
/* /*
* subscribe to subsequent osdmap updates if full to ensure * subscribe to subsequent osdmap updates if full to ensure
* we find out when we are no longer full and stop returning * we find out when we are no longer full and stop returning
* ENOSPC. * ENOSPC.
*/ */
if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) || pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD);
ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD) || pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) ||
ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR)) ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
ceph_monc_request_next_osdmap(&osdc->client->monc); have_pool_full(osdc);
if (was_pauserd || was_pausewr || pauserd || pausewr)
maybe_request_map(osdc);
mutex_lock(&osdc->request_mutex); mutex_lock(&osdc->request_mutex);
__send_queued(osdc); __send_queued(osdc);
mutex_unlock(&osdc->request_mutex); mutex_unlock(&osdc->request_mutex);
up_read(&osdc->map_sem);
ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
osdc->osdmap->epoch);
up_write(&osdc->map_sem);
wake_up_all(&osdc->client->auth_wq); wake_up_all(&osdc->client->auth_wq);
return; return;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment