Commit 6e8d0b78 authored by Nick Thomas's avatar Nick Thomas

Use event-based waiting in Gitlab::JobWaiter

parent 6509833c
......@@ -4,18 +4,25 @@ class AuthorizedProjectsWorker
# Schedules multiple jobs and waits for them to be completed.
def self.bulk_perform_and_wait(args_list)
job_ids = bulk_perform_async(args_list)
waiter = Gitlab::JobWaiter.new(args_list.size)
Gitlab::JobWaiter.new(job_ids).wait
# Point all the bulk jobs at the same JobWaiter. Converts, [[1], [2], [3]]
# into [[1, "key"], [2, "key"], [3, "key"]]
waiting_args_list = args_list.map { |args| args << waiter.key }
bulk_perform_async(waiting_args_list)
waiter.wait
end
def self.bulk_perform_async(args_list)
Sidekiq::Client.push_bulk('class' => self, 'queue' => sidekiq_options['queue'], 'args' => args_list)
end
def perform(user_id)
def perform(user_id, notify_key = nil)
user = User.find_by(id: user_id)
user&.refresh_authorized_projects
ensure
Gitlab::JobWaiter.notify(notify_key, jid) if notify_key
end
end
module Gitlab
# JobWaiter can be used to wait for a number of Sidekiq jobs to complete.
#
# Its use requires the cooperation of the sidekiq jobs themselves. Set up the
# waiter, then start the jobs, passing them its `key`. Their `perform` methods
# should look like:
#
# def perform(args, notify_key)
# # do work
# ensure
# ::Gitlab::JobWaiter.notify(notify_key, jid)
# end
#
# The JobWaiter blocks popping items from a Redis array. All the sidekiq jobs
# push to that array when done. Once the waiter has popped `count` items, it
# knows all the jobs are done.
class JobWaiter
# The sleep interval between checking keys, in seconds.
INTERVAL = 0.1
def self.notify(key, jid)
Gitlab::Redis::SharedState.with { |redis| redis.lpush(key, jid) }
end
attr_reader :key, :jobs_remaining, :finished
# jobs - The job IDs to wait for.
def initialize(jobs)
@jobs = jobs
# jobs_remaining - the number of jobs left to wait for
def initialize(jobs_remaining)
@key = "gitlab:job_waiter:#{SecureRandom.uuid}"
@jobs_remaining = jobs_remaining
@finished = []
end
# Waits for all the jobs to be completed.
......@@ -15,13 +34,33 @@ module Gitlab
# ensures we don't indefinitely block a caller in case a job takes
# long to process, or is never processed.
def wait(timeout = 10)
start = Time.current
deadline = Time.now.utc + timeout
Gitlab::Redis::SharedState.with do |redis|
# Fallback key expiry: allow a long grace period to reduce the chance of
# a job pushing to an expired key and recreating it
redis.expire(key, [timeout * 2, 10.minutes.to_i].max)
while jobs_remaining > 0
# Redis will not take fractional seconds. Prefer waiting too long over
# not waiting long enough
seconds_left = (deadline - Time.now.utc).ceil
while (Time.current - start) <= timeout
break if SidekiqStatus.all_completed?(@jobs)
# Redis interprets 0 as "wait forever", so skip the final `blpop` call
break if seconds_left <= 0
sleep(INTERVAL) # to not overload Redis too much.
list, jid = redis.blpop(key, timeout: seconds_left)
break unless list && jid # timed out
@finished << jid
@jobs_remaining -= 1
end
# All jobs have finished, so expire the key immediately
redis.expire(key, 0) if jobs_remaining == 0
end
finished
end
end
end
require 'spec_helper'
describe Gitlab::JobWaiter do
describe '#wait' do
let(:waiter) { described_class.new(%w(a)) }
it 'returns when all jobs have been completed' do
expect(Gitlab::SidekiqStatus).to receive(:all_completed?).with(%w(a))
.and_return(true)
describe '.notify' do
it 'pushes the jid to the named queue' do
key = 'gitlab:job_waiter:foo'
jid = 1
expect(waiter).not_to receive(:sleep)
redis = double('redis')
expect(Gitlab::Redis::SharedState).to receive(:with).and_yield(redis)
expect(redis).to receive(:lpush).with(key, jid)
waiter.wait
described_class.notify(key, jid)
end
end
describe '#wait' do
let(:waiter) { described_class.new(2) }
it 'sleeps between checking the job statuses' do
expect(Gitlab::SidekiqStatus).to receive(:all_completed?)
.with(%w(a))
.and_return(false, true)
it 'returns when all jobs have been completed' do
described_class.notify(waiter.key, 'a')
described_class.notify(waiter.key, 'b')
expect(waiter).to receive(:sleep).with(described_class::INTERVAL)
result = nil
expect { Timeout.timeout(1) { result = waiter.wait(2) } }.not_to raise_error
waiter.wait
expect(result).to contain_exactly('a', 'b')
end
it 'returns when timing out' do
expect(waiter).not_to receive(:sleep)
waiter.wait(0)
it 'times out if not all jobs complete' do
described_class.notify(waiter.key, 'a')
result = nil
expect { Timeout.timeout(2) { result = waiter.wait(1) } }.not_to raise_error
expect(result).to contain_exactly('a')
end
end
end
......@@ -29,21 +29,27 @@ describe AuthorizedProjectsWorker do
end
describe '#perform' do
subject { described_class.new }
let(:user) { create(:user) }
it "refreshes user's authorized projects" do
user = create(:user)
subject(:job) { described_class.new }
it "refreshes user's authorized projects" do
expect_any_instance_of(User).to receive(:refresh_authorized_projects)
subject.perform(user.id)
job.perform(user.id)
end
it 'notifies the JobWaiter when done if the key is provided' do
expect(Gitlab::JobWaiter).to receive(:notify).with('notify-key', job.jid)
job.perform(user.id, 'notify-key')
end
context "when the user is not found" do
it "does nothing" do
expect_any_instance_of(User).not_to receive(:refresh_authorized_projects)
subject.perform(-1)
job.perform(-1)
end
end
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment