Commit a3275539 authored by Omar Sandoval's avatar Omar Sandoval Committed by Jens Axboe

sbitmap: fix missed wakeups caused by sbitmap_queue_get_shallow()

The sbitmap queue wake batch is calculated such that once allocations
start blocking, all of the bits which are already allocated must be
enough to fulfill the batch counters of all of the waitqueues. However,
the shallow allocation depth can break this invariant, since we block
before our full depth is being utilized. Add
sbitmap_queue_min_shallow_depth(), which saves the minimum shallow depth
the sbq will use, and update sbq_calc_wake_batch() to take it into
account.
Acked-by: default avatarPaolo Valente <paolo.valente@linaro.org>
Signed-off-by: default avatarOmar Sandoval <osandov@fb.com>
Signed-off-by: default avatarJens Axboe <axboe@kernel.dk>
parent bd7d4ef6
...@@ -127,6 +127,12 @@ struct sbitmap_queue { ...@@ -127,6 +127,12 @@ struct sbitmap_queue {
* @round_robin: Allocate bits in strict round-robin order. * @round_robin: Allocate bits in strict round-robin order.
*/ */
bool round_robin; bool round_robin;
/**
* @min_shallow_depth: The minimum shallow depth which may be passed to
* sbitmap_queue_get_shallow() or __sbitmap_queue_get_shallow().
*/
unsigned int min_shallow_depth;
}; };
/** /**
...@@ -390,6 +396,9 @@ int __sbitmap_queue_get(struct sbitmap_queue *sbq); ...@@ -390,6 +396,9 @@ int __sbitmap_queue_get(struct sbitmap_queue *sbq);
* @shallow_depth: The maximum number of bits to allocate from a single word. * @shallow_depth: The maximum number of bits to allocate from a single word.
* See sbitmap_get_shallow(). * See sbitmap_get_shallow().
* *
* If you call this, make sure to call sbitmap_queue_min_shallow_depth() after
* initializing @sbq.
*
* Return: Non-negative allocated bit number if successful, -1 otherwise. * Return: Non-negative allocated bit number if successful, -1 otherwise.
*/ */
int __sbitmap_queue_get_shallow(struct sbitmap_queue *sbq, int __sbitmap_queue_get_shallow(struct sbitmap_queue *sbq,
...@@ -424,6 +433,9 @@ static inline int sbitmap_queue_get(struct sbitmap_queue *sbq, ...@@ -424,6 +433,9 @@ static inline int sbitmap_queue_get(struct sbitmap_queue *sbq,
* @shallow_depth: The maximum number of bits to allocate from a single word. * @shallow_depth: The maximum number of bits to allocate from a single word.
* See sbitmap_get_shallow(). * See sbitmap_get_shallow().
* *
* If you call this, make sure to call sbitmap_queue_min_shallow_depth() after
* initializing @sbq.
*
* Return: Non-negative allocated bit number if successful, -1 otherwise. * Return: Non-negative allocated bit number if successful, -1 otherwise.
*/ */
static inline int sbitmap_queue_get_shallow(struct sbitmap_queue *sbq, static inline int sbitmap_queue_get_shallow(struct sbitmap_queue *sbq,
...@@ -438,6 +450,23 @@ static inline int sbitmap_queue_get_shallow(struct sbitmap_queue *sbq, ...@@ -438,6 +450,23 @@ static inline int sbitmap_queue_get_shallow(struct sbitmap_queue *sbq,
return nr; return nr;
} }
/**
* sbitmap_queue_min_shallow_depth() - Inform a &struct sbitmap_queue of the
* minimum shallow depth that will be used.
* @sbq: Bitmap queue in question.
* @min_shallow_depth: The minimum shallow depth that will be passed to
* sbitmap_queue_get_shallow() or __sbitmap_queue_get_shallow().
*
* sbitmap_queue_clear() batches wakeups as an optimization. The batch size
* depends on the depth of the bitmap. Since the shallow allocation functions
* effectively operate with a different depth, the shallow depth must be taken
* into account when calculating the batch size. This function must be called
* with the minimum shallow depth that will be used. Failure to do so can result
* in missed wakeups.
*/
void sbitmap_queue_min_shallow_depth(struct sbitmap_queue *sbq,
unsigned int min_shallow_depth);
/** /**
* sbitmap_queue_clear() - Free an allocated bit and wake up waiters on a * sbitmap_queue_clear() - Free an allocated bit and wake up waiters on a
* &struct sbitmap_queue. * &struct sbitmap_queue.
......
...@@ -270,18 +270,33 @@ void sbitmap_bitmap_show(struct sbitmap *sb, struct seq_file *m) ...@@ -270,18 +270,33 @@ void sbitmap_bitmap_show(struct sbitmap *sb, struct seq_file *m)
} }
EXPORT_SYMBOL_GPL(sbitmap_bitmap_show); EXPORT_SYMBOL_GPL(sbitmap_bitmap_show);
static unsigned int sbq_calc_wake_batch(unsigned int depth) static unsigned int sbq_calc_wake_batch(struct sbitmap_queue *sbq,
unsigned int depth)
{ {
unsigned int wake_batch; unsigned int wake_batch;
unsigned int shallow_depth;
/* /*
* For each batch, we wake up one queue. We need to make sure that our * For each batch, we wake up one queue. We need to make sure that our
* batch size is small enough that the full depth of the bitmap is * batch size is small enough that the full depth of the bitmap,
* enough to wake up all of the queues. * potentially limited by a shallow depth, is enough to wake up all of
* the queues.
*
* Each full word of the bitmap has bits_per_word bits, and there might
* be a partial word. There are depth / bits_per_word full words and
* depth % bits_per_word bits left over. In bitwise arithmetic:
*
* bits_per_word = 1 << shift
* depth / bits_per_word = depth >> shift
* depth % bits_per_word = depth & ((1 << shift) - 1)
*
* Each word can be limited to sbq->min_shallow_depth bits.
*/ */
wake_batch = SBQ_WAKE_BATCH; shallow_depth = min(1U << sbq->sb.shift, sbq->min_shallow_depth);
if (wake_batch > depth / SBQ_WAIT_QUEUES) depth = ((depth >> sbq->sb.shift) * shallow_depth +
wake_batch = max(1U, depth / SBQ_WAIT_QUEUES); min(depth & ((1U << sbq->sb.shift) - 1), shallow_depth));
wake_batch = clamp_t(unsigned int, depth / SBQ_WAIT_QUEUES, 1,
SBQ_WAKE_BATCH);
return wake_batch; return wake_batch;
} }
...@@ -307,7 +322,8 @@ int sbitmap_queue_init_node(struct sbitmap_queue *sbq, unsigned int depth, ...@@ -307,7 +322,8 @@ int sbitmap_queue_init_node(struct sbitmap_queue *sbq, unsigned int depth,
*per_cpu_ptr(sbq->alloc_hint, i) = prandom_u32() % depth; *per_cpu_ptr(sbq->alloc_hint, i) = prandom_u32() % depth;
} }
sbq->wake_batch = sbq_calc_wake_batch(depth); sbq->min_shallow_depth = UINT_MAX;
sbq->wake_batch = sbq_calc_wake_batch(sbq, depth);
atomic_set(&sbq->wake_index, 0); atomic_set(&sbq->wake_index, 0);
sbq->ws = kzalloc_node(SBQ_WAIT_QUEUES * sizeof(*sbq->ws), flags, node); sbq->ws = kzalloc_node(SBQ_WAIT_QUEUES * sizeof(*sbq->ws), flags, node);
...@@ -327,9 +343,10 @@ int sbitmap_queue_init_node(struct sbitmap_queue *sbq, unsigned int depth, ...@@ -327,9 +343,10 @@ int sbitmap_queue_init_node(struct sbitmap_queue *sbq, unsigned int depth,
} }
EXPORT_SYMBOL_GPL(sbitmap_queue_init_node); EXPORT_SYMBOL_GPL(sbitmap_queue_init_node);
void sbitmap_queue_resize(struct sbitmap_queue *sbq, unsigned int depth) static void sbitmap_queue_update_wake_batch(struct sbitmap_queue *sbq,
unsigned int depth)
{ {
unsigned int wake_batch = sbq_calc_wake_batch(depth); unsigned int wake_batch = sbq_calc_wake_batch(sbq, depth);
int i; int i;
if (sbq->wake_batch != wake_batch) { if (sbq->wake_batch != wake_batch) {
...@@ -342,6 +359,11 @@ void sbitmap_queue_resize(struct sbitmap_queue *sbq, unsigned int depth) ...@@ -342,6 +359,11 @@ void sbitmap_queue_resize(struct sbitmap_queue *sbq, unsigned int depth)
for (i = 0; i < SBQ_WAIT_QUEUES; i++) for (i = 0; i < SBQ_WAIT_QUEUES; i++)
atomic_set(&sbq->ws[i].wait_cnt, 1); atomic_set(&sbq->ws[i].wait_cnt, 1);
} }
}
void sbitmap_queue_resize(struct sbitmap_queue *sbq, unsigned int depth)
{
sbitmap_queue_update_wake_batch(sbq, depth);
sbitmap_resize(&sbq->sb, depth); sbitmap_resize(&sbq->sb, depth);
} }
EXPORT_SYMBOL_GPL(sbitmap_queue_resize); EXPORT_SYMBOL_GPL(sbitmap_queue_resize);
...@@ -403,6 +425,14 @@ int __sbitmap_queue_get_shallow(struct sbitmap_queue *sbq, ...@@ -403,6 +425,14 @@ int __sbitmap_queue_get_shallow(struct sbitmap_queue *sbq,
} }
EXPORT_SYMBOL_GPL(__sbitmap_queue_get_shallow); EXPORT_SYMBOL_GPL(__sbitmap_queue_get_shallow);
void sbitmap_queue_min_shallow_depth(struct sbitmap_queue *sbq,
unsigned int min_shallow_depth)
{
sbq->min_shallow_depth = min_shallow_depth;
sbitmap_queue_update_wake_batch(sbq, sbq->sb.depth);
}
EXPORT_SYMBOL_GPL(sbitmap_queue_min_shallow_depth);
static struct sbq_wait_state *sbq_wake_ptr(struct sbitmap_queue *sbq) static struct sbq_wait_state *sbq_wake_ptr(struct sbitmap_queue *sbq)
{ {
int i, wake_index; int i, wake_index;
...@@ -528,5 +558,6 @@ void sbitmap_queue_show(struct sbitmap_queue *sbq, struct seq_file *m) ...@@ -528,5 +558,6 @@ void sbitmap_queue_show(struct sbitmap_queue *sbq, struct seq_file *m)
seq_puts(m, "}\n"); seq_puts(m, "}\n");
seq_printf(m, "round_robin=%d\n", sbq->round_robin); seq_printf(m, "round_robin=%d\n", sbq->round_robin);
seq_printf(m, "min_shallow_depth=%u\n", sbq->min_shallow_depth);
} }
EXPORT_SYMBOL_GPL(sbitmap_queue_show); EXPORT_SYMBOL_GPL(sbitmap_queue_show);
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment