Commit 85745b72 authored by Heinrich Lee Yu

Merge branch 'jv-limited-capacity-worker' into 'master'

Decouple LimitedCapacity::Worker from Sidekiq queue depth

See merge request gitlab-org/gitlab!60415
parents 6ad3569d 9b02b9f6
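The substance of the change is in `LimitedCapacity::JobTracker#register` below: capacity is no longer derived on the Ruby side from running-job counts and Sidekiq queue depth; instead Redis atomically adds a jid to the running set only while the set holds fewer than `max_elements` members. A minimal standalone sketch of that primitive, assuming a local Redis and the `redis` gem (the `demo:running` key and the jids are illustrative, not from the MR):

```ruby
require 'redis'

# Same check-and-add as LUA_REGISTER_SCRIPT in the diff below: SADD the
# element only while the set holds fewer than max_elements members.
# Running it as a script keeps the check and the add atomic, so two
# concurrent workers cannot both slip past the limit.
REGISTER = <<~LUA
  local set_key, element, max_elements = KEYS[1], ARGV[1], ARGV[2]
  if redis.call("scard", set_key) < tonumber(max_elements) then
    redis.call("sadd", set_key, element)
    return true
  end
  return false
LUA

redis = Redis.new

3.times do |i|
  # Lua true comes back as 1 and Lua false as nil, which is why the
  # tracker checks the result with .present? on the Ruby side.
  added = redis.eval(REGISTER, keys: ['demo:running'], argv: ["jid-#{i}", 2])
  puts "jid-#{i} registered? #{!added.nil?}" # the third attempt is rejected
end
```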
@@ -3,21 +3,30 @@ module LimitedCapacity
class JobTracker # rubocop:disable Scalability/IdempotentWorker
include Gitlab::Utils::StrongMemoize
LUA_REGISTER_SCRIPT = <<~EOS
local set_key, element, max_elements = KEYS[1], ARGV[1], ARGV[2]
if redis.call("scard", set_key) < tonumber(max_elements) then
redis.call("sadd", set_key, element)
return true
end
return false
EOS
def initialize(namespace)
@namespace = namespace
end
def register(jid)
_added, @count = with_redis_pipeline do |redis|
register_job_keys(redis, jid)
get_job_count(redis)
end
def register(jid, max_jids)
with_redis do |redis|
redis.eval(LUA_REGISTER_SCRIPT, keys: [counter_key], argv: [jid, max_jids])
end.present?
end
def remove(jid)
_removed, @count = with_redis_pipeline do |redis|
with_redis do |redis|
remove_job_keys(redis, jid)
get_job_count(redis)
end
end
@@ -25,14 +34,13 @@ module LimitedCapacity
completed_jids = Gitlab::SidekiqStatus.completed_jids(running_jids)
return unless completed_jids.any?
_removed, @count = with_redis_pipeline do |redis|
with_redis do |redis|
remove_job_keys(redis, completed_jids)
get_job_count(redis)
end
end
def count
@count ||= with_redis { |redis| get_job_count(redis) }
with_redis { |redis| redis.scard(counter_key) }
end
def running_jids
@@ -49,14 +57,6 @@ module LimitedCapacity
"worker:#{namespace.to_s.underscore}:running"
end
def get_job_count(redis)
redis.scard(counter_key)
end
def register_job_keys(redis, keys)
redis.sadd(counter_key, keys)
end
def remove_job_keys(redis, keys)
redis.srem(counter_key, keys)
end
@@ -64,11 +64,5 @@ module LimitedCapacity
def with_redis(&block)
Gitlab::Redis::Queues.with(&block) # rubocop: disable CodeReuse/ActiveRecord
end
def with_redis_pipeline(&block)
with_redis do |redis|
redis.pipelined(&block)
end
end
end
end
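After this file, the tracker no longer memoizes a job count: `count` always issues `SCARD`, and the capacity decision lives entirely inside `register(jid, max_jids)`, which now returns a boolean. A simplified in-memory model of the new contract (the class is illustrative only; the real tracker is Redis-backed and atomic):

```ruby
require 'set'

# Illustrative stand-in for the reworked JobTracker interface.
class InMemoryJobTracker
  def initialize
    @running = Set.new
  end

  # Mirrors register(jid, max_jids): check and add in a single step,
  # returning whether the jid made it into the running set.
  def register(jid, max_jids)
    return false unless @running.size < max_jids

    @running.add(jid)
    true
  end

  def remove(jid)
    @running.delete(jid)
  end

  def running_jids
    @running.to_a
  end

  # No memoized @count any more; always reflects the current set size.
  def count
    @running.size
  end
end

tracker = InMemoryJobTracker.new
tracker.register('jid-1', 1) # => true
tracker.register('jid-2', 1) # => false, at capacity
tracker.remove('jid-1')
tracker.register('jid-2', 1) # => true again
```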
@@ -55,26 +55,14 @@ module LimitedCapacity
def perform_with_capacity(*args)
worker = self.new
worker.remove_failed_jobs
worker.report_prometheus_metrics(*args)
required_jobs_count = worker.required_jobs_count(*args)
arguments = Array.new(required_jobs_count) { args }
arguments = Array.new(worker.max_running_jobs) { args }
self.bulk_perform_async(arguments) # rubocop:disable Scalability/BulkPerformWithContext
end
end
def perform(*args)
return unless has_capacity?
job_tracker.register(jid)
report_running_jobs_metrics
perform_work(*args)
rescue StandardError => exception
raise
ensure
job_tracker.remove(jid)
report_prometheus_metrics(*args)
re_enqueue(*args) unless exception
perform_registered(*args) if job_tracker.register(jid, max_running_jobs)
end
def perform_work(*args)
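With this hunk, `#perform` no longer computes remaining capacity from the running-jobs count and the Sidekiq queue size; it simply tries to register and only does work when registration succeeds. A condensed, runnable sketch of the resulting flow (the `StubTracker` and `SketchWorker` names are stand-ins for the real collaborators, and metrics reporting is omitted):

```ruby
require 'set'
require 'securerandom'

# Stand-in for LimitedCapacity::JobTracker, just enough for the flow below.
class StubTracker
  def initialize
    @running = Set.new
  end

  def register(jid, max_jids)
    return false unless @running.size < max_jids

    @running.add(jid)
    true
  end

  def remove(jid)
    @running.delete(jid)
  end
end

class SketchWorker
  MAX_RUNNING_JOBS = 2

  def initialize(tracker)
    @tracker = tracker
    @jid = SecureRandom.hex(6) # Sidekiq supplies the real jid
  end

  # New flow: one register call both enforces capacity and records the job.
  def perform(*args)
    perform_registered(*args) if @tracker.register(@jid, MAX_RUNNING_JOBS)
  end

  private

  def perform_registered(*args)
    perform_work(*args)
  rescue StandardError => exception
    raise
  ensure
    # Always leave the running set; re-enqueue only after a clean run.
    @tracker.remove(@jid)
    re_enqueue(*args) unless exception
  end

  def perform_work(*args)
    puts "performing work with #{args.inspect}"
  end

  def re_enqueue(*args)
    puts 're-enqueuing for remaining work'
  end
end

tracker = StubTracker.new
SketchWorker.new(tracker).perform(:arg) # registers, works, removes, re-enqueues

# With the running set already full, perform is a silent no-op.
2.times { |i| tracker.register("busy-#{i}", SketchWorker::MAX_RUNNING_JOBS) }
SketchWorker.new(tracker).perform(:arg)
```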
@@ -89,43 +77,32 @@ module LimitedCapacity
raise NotImplementedError
end
def has_capacity?
remaining_capacity > 0
end
def remaining_capacity
[
max_running_jobs - running_jobs_count - self.class.queue_size,
0
].max
end
def has_work?(*args)
remaining_work_count(*args) > 0
end
def remove_failed_jobs
job_tracker.clean_up
end
def report_prometheus_metrics(*args)
report_running_jobs_metrics
remaining_work_gauge.set(prometheus_labels, remaining_work_count(*args))
max_running_jobs_gauge.set(prometheus_labels, max_running_jobs)
set_metric(:remaining_work_gauge, remaining_work_count(*args))
set_metric(:max_running_jobs_gauge, max_running_jobs)
end
def report_running_jobs_metrics
running_jobs_gauge.set(prometheus_labels, running_jobs_count)
end
private
def required_jobs_count(*args)
[
remaining_work_count(*args),
remaining_capacity
].min
def perform_registered(*args)
report_running_jobs_metrics
perform_work(*args)
rescue StandardError => exception
raise
ensure
job_tracker.remove(jid)
report_prometheus_metrics(*args)
re_enqueue(*args) unless exception
end
private
def report_running_jobs_metrics
set_metric(:running_jobs_gauge, running_jobs_count)
end
def running_jobs_count
job_tracker.count
@@ -138,32 +115,21 @@ module LimitedCapacity
end
def re_enqueue(*args)
return unless has_capacity?
return unless has_work?(*args)
return unless remaining_work_count(*args) > 0
self.class.perform_async(*args)
end
def running_jobs_gauge
strong_memoize(:running_jobs_gauge) do
Gitlab::Metrics.gauge(:limited_capacity_worker_running_jobs, 'Number of running jobs')
end
end
def max_running_jobs_gauge
strong_memoize(:max_running_jobs_gauge) do
Gitlab::Metrics.gauge(:limited_capacity_worker_max_running_jobs, 'Maximum number of running jobs')
end
end
def remaining_work_gauge
strong_memoize(:remaining_work_gauge) do
Gitlab::Metrics.gauge(:limited_capacity_worker_remaining_work_count, 'Number of jobs waiting to be enqueued')
end
def set_metric(name, value)
metrics = strong_memoize(:metrics) do
{
running_jobs_gauge: Gitlab::Metrics.gauge(:limited_capacity_worker_running_jobs, 'Number of running jobs'),
max_running_jobs_gauge: Gitlab::Metrics.gauge(:limited_capacity_worker_max_running_jobs, 'Maximum number of running jobs'),
remaining_work_gauge: Gitlab::Metrics.gauge(:limited_capacity_worker_remaining_work_count, 'Number of jobs waiting to be enqueued')
}
end
def prometheus_labels
{ worker: self.class.name }
metrics[name].set({ worker: self.class.name }, value)
end
end
end
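The last hunk of this file collapses the three memoized gauge readers into one memoized hash keyed by gauge name, with the worker label applied in a single place. A small sketch of the pattern; `StubGauge` stands in for what `Gitlab::Metrics.gauge` returns so the example runs on its own:

```ruby
# Stand-in for the gauge objects returned by Gitlab::Metrics.gauge.
class StubGauge
  def initialize(name)
    @name = name
  end

  def set(labels, value)
    puts "#{@name}#{labels.inspect} = #{value}"
  end
end

class MetricsSketch
  # One memoized hash replaces three memoized gauge methods; set_metric
  # looks the gauge up by name and applies the shared worker label.
  def set_metric(name, value)
    @metrics ||= {
      running_jobs_gauge: StubGauge.new(:limited_capacity_worker_running_jobs),
      max_running_jobs_gauge: StubGauge.new(:limited_capacity_worker_max_running_jobs),
      remaining_work_gauge: StubGauge.new(:limited_capacity_worker_remaining_work_count)
    }

    @metrics[name].set({ worker: self.class.name }, value)
  end
end

sketch = MetricsSketch.new
sketch.set_metric(:running_jobs_gauge, 3)
sketch.set_metric(:max_running_jobs_gauge, 10)
```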
@@ -46,7 +46,7 @@ module Geo
# Secondaries don't need to run this since they will receive an event for each
# rechecksummed resource: https://gitlab.com/gitlab-org/gitlab/-/issues/13842
::Geo::ReverificationBatchWorker.perform_async(replicable_name) if ::Gitlab::Geo.primary?
::Geo::ReverificationBatchWorker.perform_with_capacity(replicable_name) if ::Gitlab::Geo.primary?
end
# Called by VerificationBatchWorker.
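The call-site change above is the visible effect of the worker rework: `perform_async` enqueued a single job, while `perform_with_capacity` cleans up failed jids and fans out `max_running_jobs` copies of the arguments, leaving the per-job capacity check to `register`. A condensed sketch of that class-side fan-out (Sidekiq plumbing stubbed; the worker name and argument are illustrative):

```ruby
# Roughly the class-side flow of perform_with_capacity after this MR.
class FanOutSketch
  MAX_RUNNING_JOBS = 3

  def self.perform_with_capacity(*args)
    worker = new
    worker.remove_failed_jobs

    # Always enqueue max_running_jobs copies of the job; each copy decides
    # at run time whether it can register against the running set.
    arguments = Array.new(MAX_RUNNING_JOBS) { args }
    bulk_perform_async(arguments)
  end

  def self.bulk_perform_async(arguments)
    puts "enqueueing #{arguments.size} jobs: #{arguments.inspect}"
  end

  def remove_failed_jobs
    # The real worker delegates to JobTracker#clean_up here.
  end
end

FanOutSketch.perform_with_capacity('package_file') # illustrative replicable name
```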
@@ -28,7 +28,7 @@ module Geo
replicator_class = replicator_class_for(replicable_name)
@remaining_work_count ||= replicator_class
.remaining_reverification_batch_count(max_batch_count: remaining_capacity)
.remaining_reverification_batch_count(max_batch_count: max_running_jobs)
end
def max_running_jobs
@@ -28,7 +28,7 @@ module Geo
replicator_class = replicator_class_for(replicable_name)
@remaining_work_count ||= replicator_class
.remaining_verification_batch_count(max_batch_count: remaining_capacity)
.remaining_verification_batch_count(max_batch_count: max_running_jobs)
end
def replicator_class_for(replicable_name)
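These two Geo batch workers were the consumers of the removed `remaining_capacity`: the number of (re)verification batches requested per run is now capped by `max_running_jobs` directly. A tiny, hypothetical illustration of what passing `max_running_jobs` as `max_batch_count` means (the numbers and the clamping are assumptions for the example, not taken from the replicator code):

```ruby
# Hypothetical figures: 250 pending resources verified in batches of 100,
# with a worker concurrency limit of 10.
pending_resources = 250
batch_size        = 100
max_running_jobs  = 10

needed_batches = (pending_resources / batch_size.to_f).ceil # => 3

# max_batch_count now comes straight from max_running_jobs, so the batches
# requested per run can never exceed the worker's own concurrency limit.
remaining_work_count = [needed_batches, max_running_jobs].min

puts remaining_work_count # => 3
```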
@@ -148,7 +148,7 @@ RSpec.shared_examples 'a verifiable replicator' do
it 'does not enqueue ReverificationBatchWorker' do
stub_secondary_node
expect(::Geo::ReverificationBatchWorker).not_to receive(:perform_async)
expect(::Geo::ReverificationBatchWorker).not_to receive(:perform_with_capacity)
described_class.trigger_background_verification
end
@@ -158,7 +158,7 @@ RSpec.shared_examples 'a verifiable replicator' do
it 'enqueues ReverificationBatchWorker' do
stub_primary_node
expect(::Geo::ReverificationBatchWorker).to receive(:perform_async).with(described_class.replicable_name)
expect(::Geo::ReverificationBatchWorker).to receive(:perform_with_capacity).with(described_class.replicable_name)
described_class.trigger_background_verification
end
@@ -38,7 +38,7 @@ RSpec.describe Geo::VerificationBatchWorker, :geo do
it 'returns remaining_verification_batch_count' do
expected = 7
args = { max_batch_count: 95 }
allow(job).to receive(:remaining_capacity).and_return(args[:max_batch_count])
allow(job).to receive(:max_running_jobs).and_return(args[:max_batch_count])
allow(::Gitlab::Geo::Replicator).to receive(:for_replicable_name).with(replicable_name).and_return(replicator_class)
expect(replicator_class).to receive(:remaining_verification_batch_count).with(args).and_return(expected)
@@ -7,30 +7,30 @@ RSpec.describe LimitedCapacity::JobTracker, :clean_gitlab_redis_queues do
described_class.new('namespace')
end
let(:max_jids) { 10 }
describe '#register' do
it 'adds jid to the set' do
job_tracker.register('a-job-id')
expect(job_tracker.register('a-job-id', max_jids)).to be true
expect(job_tracker.running_jids).to contain_exactly('a-job-id')
end
it 'updates the counter' do
expect { job_tracker.register('a-job-id') }
.to change { job_tracker.count }
.from(0)
.to(1)
it 'returns false if the jid was not added' do
max_jids = 2
%w[jid1 jid2].each do |jid|
expect(job_tracker.register(jid, max_jids)).to be true
end
it 'does it in only one Redis call' do
expect(job_tracker).to receive(:with_redis).once.and_call_original
job_tracker.register('a-job-id')
expect(job_tracker.register('jid3', max_jids)).to be false
expect(job_tracker.running_jids).to contain_exactly(*%w[jid1 jid2])
end
end
describe '#remove' do
before do
job_tracker.register(%w[a-job-id other-job-id])
%w[a-job-id other-job-id].each do |jid|
job_tracker.register(jid, max_jids)
end
end
it 'removes jid from the set' do
@@ -38,24 +38,11 @@ RSpec.describe LimitedCapacity::JobTracker, :clean_gitlab_redis_queues do
expect(job_tracker.running_jids).to contain_exactly('a-job-id')
end
it 'updates the counter' do
expect { job_tracker.remove('other-job-id') }
.to change { job_tracker.count }
.from(2)
.to(1)
end
it 'does it in only one Redis call' do
expect(job_tracker).to receive(:with_redis).once.and_call_original
job_tracker.remove('other-job-id')
end
end
describe '#clean_up' do
before do
job_tracker.register('a-job-id')
job_tracker.register('a-job-id', max_jids)
end
context 'with running jobs' do
@@ -83,13 +70,6 @@ RSpec.describe LimitedCapacity::JobTracker, :clean_gitlab_redis_queues do
.to change { job_tracker.running_jids.include?('a-job-id') }
end
it 'updates the counter' do
expect { job_tracker.clean_up }
.to change { job_tracker.count }
.from(1)
.to(0)
end
it 'gets the job ids, removes them, and updates the counter with only two Redis calls' do
expect(job_tracker).to receive(:with_redis).twice.and_call_original
@@ -44,34 +44,17 @@ RSpec.describe LimitedCapacity::Worker, :clean_gitlab_redis_queues, :aggregate_f
describe '.perform_with_capacity' do
subject(:perform_with_capacity) { worker_class.perform_with_capacity(:arg) }
let(:max_running_jobs) { 3 }
before do
expect_next_instance_of(worker_class) do |instance|
expect(instance).to receive(:remove_failed_jobs)
expect(instance).to receive(:report_prometheus_metrics)
allow(instance).to receive(:remaining_work_count).and_return(remaining_work_count)
allow(instance).to receive(:remaining_capacity).and_return(remaining_capacity)
end
end
context 'when capacity is larger than work' do
let(:remaining_work_count) { 2 }
let(:remaining_capacity) { 3 }
it 'enqueues jobs for remaining work' do
expect(worker_class)
.to receive(:bulk_perform_async)
.with([[:arg], [:arg]])
perform_with_capacity
allow(instance).to receive(:max_running_jobs).and_return(max_running_jobs)
end
end
context 'when capacity is lower than work' do
let(:remaining_work_count) { 5 }
let(:remaining_capacity) { 3 }
it 'enqueues jobs for remaining work' do
it 'enqueues jobs' do
expect(worker_class)
.to receive(:bulk_perform_async)
.with([[:arg], [:arg], [:arg]])
@@ -79,7 +62,6 @@ RSpec.describe LimitedCapacity::Worker, :clean_gitlab_redis_queues, :aggregate_f
perform_with_capacity
end
end
end
describe '#perform' do
subject(:perform) { worker.perform(:arg) }
@@ -104,34 +86,27 @@ RSpec.describe LimitedCapacity::Worker, :clean_gitlab_redis_queues, :aggregate_f
perform
end
it 'registers itself in the running set' do
it 'reports prometheus metrics' do
allow(worker).to receive(:perform_work)
expect(job_tracker).to receive(:register).with('my-jid')
expect(worker).to receive(:report_prometheus_metrics).once.and_call_original
expect(worker).to receive(:report_running_jobs_metrics).twice.and_call_original
perform
end
it 'removes itself from the running set' do
expect(job_tracker).to receive(:remove).with('my-jid')
it 'updates the running set' do
expect(job_tracker.running_jids).to be_empty
allow(worker).to receive(:perform_work)
perform
end
it 'reports prometheus metrics' do
allow(worker).to receive(:perform_work)
expect(worker).to receive(:report_prometheus_metrics).once.and_call_original
expect(worker).to receive(:report_running_jobs_metrics).twice.and_call_original
perform
expect(job_tracker.running_jids).to be_empty
end
end
context 'with capacity and without work' do
before do
allow(worker).to receive(:max_running_jobs).and_return(10)
allow(worker).to receive(:running_jobs_count).and_return(0)
allow(worker).to receive(:remaining_work_count).and_return(0)
allow(worker).to receive(:perform_work)
end
@@ -146,7 +121,7 @@ RSpec.describe LimitedCapacity::Worker, :clean_gitlab_redis_queues, :aggregate_f
context 'without capacity' do
before do
allow(worker).to receive(:max_running_jobs).and_return(10)
allow(worker).to receive(:running_jobs_count).and_return(15)
allow(job_tracker).to receive(:register).and_return(false)
allow(worker).to receive(:remaining_work_count).and_return(10)
end
@@ -161,27 +136,14 @@ RSpec.describe LimitedCapacity::Worker, :clean_gitlab_redis_queues, :aggregate_f
perform
end
it 'does not register in the running set' do
expect(job_tracker).not_to receive(:register)
perform
end
it 'removes itself from the running set' do
expect(job_tracker).to receive(:remove).with('my-jid')
perform
end
it 'reports prometheus metrics' do
expect(worker).to receive(:report_prometheus_metrics)
perform
end
context 'when perform_work fails' do
before do
allow(worker).to receive(:max_running_jobs).and_return(10)
allow(job_tracker).to receive(:register).and_return(true)
end
context 'when perform_work fails' do
it 'does not re-enqueue itself' do
expect(worker).not_to receive(:re_enqueue)
@@ -189,7 +151,7 @@ RSpec.describe LimitedCapacity::Worker, :clean_gitlab_redis_queues, :aggregate_f
end
it 'removes itself from the running set' do
expect(job_tracker).to receive(:remove)
expect(job_tracker).to receive(:remove).with('my-jid')
expect { perform }.to raise_error(NotImplementedError)
end
@@ -202,65 +164,14 @@ RSpec.describe LimitedCapacity::Worker, :clean_gitlab_redis_queues, :aggregate_f
end
end
describe '#remaining_capacity' do
subject(:remaining_capacity) { worker.remaining_capacity }
before do
expect(worker).to receive(:max_running_jobs).and_return(max_capacity)
end
context 'when changing the capacity to a lower value' do
let(:max_capacity) { -1 }
it { expect(remaining_capacity).to eq(0) }
end
context 'when registering new jobs' do
let(:max_capacity) { 2 }
before do
job_tracker.register('a-job-id')
end
it { expect(remaining_capacity).to eq(1) }
end
context 'with jobs in the queue' do
let(:max_capacity) { 2 }
before do
expect(worker_class).to receive(:queue_size).and_return(1)
end
it { expect(remaining_capacity).to eq(1) }
end
context 'with both running jobs and queued jobs' do
let(:max_capacity) { 10 }
before do
expect(worker_class).to receive(:queue_size).and_return(5)
expect(worker).to receive(:running_jobs_count).and_return(3)
end
it { expect(remaining_capacity).to eq(2) }
end
end
describe '#remove_failed_jobs' do
subject(:remove_failed_jobs) { worker.remove_failed_jobs }
before do
job_tracker.register('a-job-id')
allow(worker).to receive(:max_running_jobs).and_return(2)
it 'removes failed jobs' do
job_tracker.register('a-job-id', 10)
expect(job_tracker).to receive(:clean_up).and_call_original
end
context 'with failed jobs' do
it 'update the available capacity' do
expect { remove_failed_jobs }.to change { worker.remaining_capacity }.by(1)
end
expect { remove_failed_jobs }.to change { job_tracker.running_jids.size }.by(-1)
end
end