Commit 9d521470 authored by Ilya Dryomov's avatar Ilya Dryomov Committed by Yan, Zheng

libceph: a per-osdc crush scratch buffer

With the addition of erasure coding support in the future, scratch
variable-length array in crush_do_rule_ary() is going to grow to at
least 200 bytes on average, on top of another 128 bytes consumed by
rawosd/osd arrays in the call chain.  Replace it with a buffer inside
struct osdmap and a mutex.  This shouldn't result in any contention,
because all osd requests were already serialized by request_mutex at
that point; the only unlocked caller was ceph_ioctl_get_dataloc().
Signed-off-by: default avatarIlya Dryomov <ilya.dryomov@inktank.com>
Reviewed-by: default avatarSage Weil <sage@inktank.com>
parent 455c6fdb
...@@ -84,6 +84,9 @@ struct ceph_osdmap { ...@@ -84,6 +84,9 @@ struct ceph_osdmap {
/* the CRUSH map specifies the mapping of placement groups to /* the CRUSH map specifies the mapping of placement groups to
* the list of osds that store+replicate them. */ * the list of osds that store+replicate them. */
struct crush_map *crush; struct crush_map *crush;
struct mutex crush_scratch_mutex;
int crush_scratch_ary[CEPH_PG_MAX_SIZE * 3];
}; };
static inline void ceph_oid_set_name(struct ceph_object_id *oid, static inline void ceph_oid_set_name(struct ceph_object_id *oid,
......
...@@ -698,7 +698,9 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) ...@@ -698,7 +698,9 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
map = kzalloc(sizeof(*map), GFP_NOFS); map = kzalloc(sizeof(*map), GFP_NOFS);
if (map == NULL) if (map == NULL)
return ERR_PTR(-ENOMEM); return ERR_PTR(-ENOMEM);
map->pg_temp = RB_ROOT; map->pg_temp = RB_ROOT;
mutex_init(&map->crush_scratch_mutex);
ceph_decode_16_safe(p, end, version, bad); ceph_decode_16_safe(p, end, version, bad);
if (version > 6) { if (version > 6) {
...@@ -1142,14 +1144,20 @@ int ceph_oloc_oid_to_pg(struct ceph_osdmap *osdmap, ...@@ -1142,14 +1144,20 @@ int ceph_oloc_oid_to_pg(struct ceph_osdmap *osdmap,
} }
EXPORT_SYMBOL(ceph_oloc_oid_to_pg); EXPORT_SYMBOL(ceph_oloc_oid_to_pg);
static int crush_do_rule_ary(const struct crush_map *map, int ruleno, int x, static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
int *result, int result_max, int *result, int result_max,
const __u32 *weight, int weight_max) const __u32 *weight, int weight_max)
{ {
int scratch[result_max * 3]; int r;
BUG_ON(result_max > CEPH_PG_MAX_SIZE);
mutex_lock(&map->crush_scratch_mutex);
r = crush_do_rule(map->crush, ruleno, x, result, result_max,
weight, weight_max, map->crush_scratch_ary);
mutex_unlock(&map->crush_scratch_mutex);
return crush_do_rule(map, ruleno, x, result, result_max, return r;
weight, weight_max, scratch);
} }
/* /*
...@@ -1205,8 +1213,7 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, ...@@ -1205,8 +1213,7 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
pool->pgp_num_mask) + pool->pgp_num_mask) +
(unsigned)pgid.pool; (unsigned)pgid.pool;
} }
r = crush_do_rule_ary(osdmap->crush, ruleno, pps, r = do_crush(osdmap, ruleno, pps, osds, min_t(int, pool->size, *num),
osds, min_t(int, pool->size, *num),
osdmap->osd_weight, osdmap->max_osd); osdmap->osd_weight, osdmap->max_osd);
if (r < 0) { if (r < 0) {
pr_err("error %d from crush rule: pool %lld ruleset %d type %d" pr_err("error %d from crush rule: pool %lld ruleset %d type %d"
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment