Commit 3f1af42a authored by Ilya Dryomov's avatar Ilya Dryomov

libceph: enable large, variable-sized OSD requests

Turn r_ops into a flexible array member to enable large, consisting of
up to 16 ops, OSD requests.  The use case is scattered writeback in
cephfs and, as far as the kernel client is concerned, 16 is just a made
up number.

r_ops had size 3 for copyup+hint+write, but copyup is really a special
case - it can only happen once.  ceph_osd_request_cache is therefore
stuffed with num_ops=2 requests, anything bigger than that is allocated
with kmalloc().  req_mempool is backed by ceph_osd_request_cache, which
means either num_ops=1 or num_ops=2 for use_mempool=true - all existing
users (ceph_writepages_start(), ceph_osdc_writepages()) are fine with
that.
Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
parent 9e767adb
...@@ -1847,8 +1847,6 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req, ...@@ -1847,8 +1847,6 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
if (osd_req->r_result < 0) if (osd_req->r_result < 0)
obj_request->result = osd_req->r_result; obj_request->result = osd_req->r_result;
rbd_assert(osd_req->r_num_ops <= CEPH_OSD_MAX_OP);
/* /*
* We support a 64-bit length, but ultimately it has to be * We support a 64-bit length, but ultimately it has to be
* passed to the block layer, which just supports a 32-bit * passed to the block layer, which just supports a 32-bit
......
...@@ -43,7 +43,8 @@ struct ceph_osd { ...@@ -43,7 +43,8 @@ struct ceph_osd {
}; };
#define CEPH_OSD_MAX_OP 3 #define CEPH_OSD_SLAB_OPS 2
#define CEPH_OSD_MAX_OPS 16
enum ceph_osd_data_type { enum ceph_osd_data_type {
CEPH_OSD_DATA_TYPE_NONE = 0, CEPH_OSD_DATA_TYPE_NONE = 0,
...@@ -139,7 +140,6 @@ struct ceph_osd_request { ...@@ -139,7 +140,6 @@ struct ceph_osd_request {
/* request osd ops array */ /* request osd ops array */
unsigned int r_num_ops; unsigned int r_num_ops;
struct ceph_osd_req_op r_ops[CEPH_OSD_MAX_OP];
/* these are updated on each send */ /* these are updated on each send */
__le32 *r_request_osdmap_epoch; __le32 *r_request_osdmap_epoch;
...@@ -175,6 +175,8 @@ struct ceph_osd_request { ...@@ -175,6 +175,8 @@ struct ceph_osd_request {
unsigned long r_stamp; /* send OR check time */ unsigned long r_stamp; /* send OR check time */
struct ceph_snap_context *r_snapc; /* snap context for writes */ struct ceph_snap_context *r_snapc; /* snap context for writes */
struct ceph_osd_req_op r_ops[];
}; };
struct ceph_request_redirect { struct ceph_request_redirect {
......
...@@ -338,9 +338,10 @@ static void ceph_osdc_release_request(struct kref *kref) ...@@ -338,9 +338,10 @@ static void ceph_osdc_release_request(struct kref *kref)
ceph_put_snap_context(req->r_snapc); ceph_put_snap_context(req->r_snapc);
if (req->r_mempool) if (req->r_mempool)
mempool_free(req, req->r_osdc->req_mempool); mempool_free(req, req->r_osdc->req_mempool);
else else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS)
kmem_cache_free(ceph_osd_request_cache, req); kmem_cache_free(ceph_osd_request_cache, req);
else
kfree(req);
} }
void ceph_osdc_get_request(struct ceph_osd_request *req) void ceph_osdc_get_request(struct ceph_osd_request *req)
...@@ -369,18 +370,22 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, ...@@ -369,18 +370,22 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
struct ceph_msg *msg; struct ceph_msg *msg;
size_t msg_size; size_t msg_size;
BUILD_BUG_ON(CEPH_OSD_MAX_OP > U16_MAX);
BUG_ON(num_ops > CEPH_OSD_MAX_OP);
if (use_mempool) { if (use_mempool) {
BUG_ON(num_ops > CEPH_OSD_SLAB_OPS);
req = mempool_alloc(osdc->req_mempool, gfp_flags); req = mempool_alloc(osdc->req_mempool, gfp_flags);
memset(req, 0, sizeof(*req)); } else if (num_ops <= CEPH_OSD_SLAB_OPS) {
req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags);
} else { } else {
req = kmem_cache_zalloc(ceph_osd_request_cache, gfp_flags); BUG_ON(num_ops > CEPH_OSD_MAX_OPS);
req = kmalloc(sizeof(*req) + num_ops * sizeof(req->r_ops[0]),
gfp_flags);
} }
if (req == NULL) if (unlikely(!req))
return NULL; return NULL;
/* req only, each op is zeroed in _osd_req_op_init() */
memset(req, 0, sizeof(*req));
req->r_osdc = osdc; req->r_osdc = osdc;
req->r_mempool = use_mempool; req->r_mempool = use_mempool;
req->r_num_ops = num_ops; req->r_num_ops = num_ops;
...@@ -398,12 +403,19 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, ...@@ -398,12 +403,19 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
req->r_base_oloc.pool = -1; req->r_base_oloc.pool = -1;
req->r_target_oloc.pool = -1; req->r_target_oloc.pool = -1;
msg_size = OSD_OPREPLY_FRONT_LEN;
if (num_ops > CEPH_OSD_SLAB_OPS) {
/* ceph_osd_op and rval */
msg_size += (num_ops - CEPH_OSD_SLAB_OPS) *
(sizeof(struct ceph_osd_op) + 4);
}
/* create reply message */ /* create reply message */
if (use_mempool) if (use_mempool)
msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
else else
msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size,
OSD_OPREPLY_FRONT_LEN, gfp_flags, true); gfp_flags, true);
if (!msg) { if (!msg) {
ceph_osdc_put_request(req); ceph_osdc_put_request(req);
return NULL; return NULL;
...@@ -1811,7 +1823,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) ...@@ -1811,7 +1823,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
ceph_decode_need(&p, end, 4, bad_put); ceph_decode_need(&p, end, 4, bad_put);
numops = ceph_decode_32(&p); numops = ceph_decode_32(&p);
if (numops > CEPH_OSD_MAX_OP) if (numops > CEPH_OSD_MAX_OPS)
goto bad_put; goto bad_put;
if (numops != req->r_num_ops) if (numops != req->r_num_ops)
goto bad_put; goto bad_put;
...@@ -2784,11 +2796,12 @@ EXPORT_SYMBOL(ceph_osdc_writepages); ...@@ -2784,11 +2796,12 @@ EXPORT_SYMBOL(ceph_osdc_writepages);
int ceph_osdc_setup(void) int ceph_osdc_setup(void)
{ {
size_t size = sizeof(struct ceph_osd_request) +
CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op);
BUG_ON(ceph_osd_request_cache); BUG_ON(ceph_osd_request_cache);
ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", size,
sizeof (struct ceph_osd_request), 0, 0, NULL);
__alignof__(struct ceph_osd_request),
0, NULL);
return ceph_osd_request_cache ? 0 : -ENOMEM; return ceph_osd_request_cache ? 0 : -ENOMEM;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment