Commit 51eebe52 authored by Bob Van Landuyt

Merge branch 'add-limited-capacity-worker' into 'master'

Add limited capacity worker concern

See merge request gitlab-org/gitlab!42237
parents f90202d1 f31de303
# frozen_string_literal: true

module LimitedCapacity
  class JobTracker # rubocop:disable Scalability/IdempotentWorker
    include Gitlab::Utils::StrongMemoize

    def initialize(namespace)
      @namespace = namespace
    end

    def register(jid)
      _added, @count = with_redis_pipeline do |redis|
        register_job_keys(redis, jid)
        get_job_count(redis)
      end
    end

    def remove(jid)
      _removed, @count = with_redis_pipeline do |redis|
        remove_job_keys(redis, jid)
        get_job_count(redis)
      end
    end

    def clean_up
      completed_jids = Gitlab::SidekiqStatus.completed_jids(running_jids)
      return unless completed_jids.any?

      _removed, @count = with_redis_pipeline do |redis|
        remove_job_keys(redis, completed_jids)
        get_job_count(redis)
      end
    end

    def count
      @count ||= with_redis { |redis| get_job_count(redis) }
    end

    def running_jids
      with_redis do |redis|
        redis.smembers(counter_key)
      end
    end

    private

    attr_reader :namespace

    def counter_key
      "worker:#{namespace.to_s.underscore}:running"
    end

    def get_job_count(redis)
      redis.scard(counter_key)
    end

    def register_job_keys(redis, keys)
      redis.sadd(counter_key, keys)
    end

    def remove_job_keys(redis, keys)
      redis.srem(counter_key, keys)
    end

    def with_redis(&block)
      Gitlab::Redis::Queues.with(&block) # rubocop: disable CodeReuse/ActiveRecord
    end

    def with_redis_pipeline(&block)
      with_redis do |redis|
        redis.pipelined(&block)
      end
    end
  end
end
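For reference, a minimal sketch (not part of the merge request) of how the tracker could be exercised directly, assuming GitLab's queues Redis (`Gitlab::Redis::Queues`) is reachable; `'DummyWorker'` is the hypothetical worker name used in the usage docs below, and `'abc123'` stands in for a Sidekiq jid:

# Minimal sketch, assuming a loaded GitLab environment (for example a Rails console).
tracker = LimitedCapacity::JobTracker.new('DummyWorker')

tracker.register('abc123')  # adds the jid to the "worker:dummy_worker:running" set
tracker.count               # => 1, cached from the pipelined SCARD
tracker.running_jids        # => ["abc123"]

tracker.remove('abc123')    # removes the jid and refreshes the cached count
tracker.clean_up            # drops any jids that Gitlab::SidekiqStatus reports as completed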
# frozen_string_literal: true

# Usage:
#
# Worker that performs the tasks:
#
# class DummyWorker
#   include ApplicationWorker
#   include LimitedCapacity::Worker
#
#   # For each job that raises any error, a worker instance will be disabled
#   # until the next schedule-run.
#   # If you wish to get around this, exceptions must be handled by the implementer.
#   #
#   def perform_work(*args)
#   end
#
#   def remaining_work_count(*args)
#     5
#   end
#
#   def max_running_jobs
#     25
#   end
# end
#
# Cron worker to fill the pool of regular workers:
#
# class ScheduleDummyCronWorker
#   include ApplicationWorker
#   include CronjobQueue
#
#   def perform(*args)
#     DummyWorker.perform_with_capacity(*args)
#   end
# end
#
module LimitedCapacity
  module Worker
    extend ActiveSupport::Concern
    include Gitlab::Utils::StrongMemoize

    included do
      # Disable Sidekiq retries, log the error, and send the job to the dead queue.
      # This is done to have only one source that produces jobs and because the slot
      # would be occupied by a job that will be performed in the distant future.
      # We let the cron worker enqueue new jobs; this can be seen as our retry and
      # back-off mechanism, because the job might fail again if executed immediately.
      sidekiq_options retry: 0
      deduplicate :none
    end

    class_methods do
      def perform_with_capacity(*args)
        worker = self.new
        worker.remove_failed_jobs
        worker.report_prometheus_metrics(*args)
        required_jobs_count = worker.required_jobs_count(*args)

        arguments = Array.new(required_jobs_count) { args }
        self.bulk_perform_async(arguments) # rubocop:disable Scalability/BulkPerformWithContext
      end
    end

    # The bare `rescue => exception; raise` only captures the error into a local
    # variable so that the `ensure` block can skip re-enqueueing when the job
    # failed; the exception is re-raised and, with `retry: 0`, the job ends up
    # in the dead queue.
    def perform(*args)
      return unless has_capacity?

      job_tracker.register(jid)
      perform_work(*args)
    rescue => exception
      raise
    ensure
      job_tracker.remove(jid)
      report_prometheus_metrics
      re_enqueue(*args) unless exception
    end

    def perform_work(*args)
      raise NotImplementedError
    end

    def remaining_work_count(*args)
      raise NotImplementedError
    end

    def max_running_jobs
      raise NotImplementedError
    end

    def has_capacity?
      remaining_capacity > 0
    end

    def remaining_capacity
      [
        max_running_jobs - running_jobs_count - self.class.queue_size,
        0
      ].max
    end

    def has_work?(*args)
      remaining_work_count(*args) > 0
    end

    def remove_failed_jobs
      job_tracker.clean_up
    end

    def report_prometheus_metrics(*args)
      running_jobs_gauge.set(prometheus_labels, running_jobs_count)
      remaining_work_gauge.set(prometheus_labels, remaining_work_count(*args))
      max_running_jobs_gauge.set(prometheus_labels, max_running_jobs)
    end

    def required_jobs_count(*args)
      [
        remaining_work_count(*args),
        remaining_capacity
      ].min
    end

    private

    def running_jobs_count
      job_tracker.count
    end

    def job_tracker
      strong_memoize(:job_tracker) do
        JobTracker.new(self.class.name)
      end
    end

    def re_enqueue(*args)
      return unless has_capacity?
      return unless has_work?(*args)

      self.class.perform_async(*args)
    end

    def running_jobs_gauge
      strong_memoize(:running_jobs_gauge) do
        Gitlab::Metrics.gauge(:limited_capacity_worker_running_jobs, 'Number of running jobs')
      end
    end

    def max_running_jobs_gauge
      strong_memoize(:max_running_jobs_gauge) do
        Gitlab::Metrics.gauge(:limited_capacity_worker_max_running_jobs, 'Maximum number of running jobs')
      end
    end

    def remaining_work_gauge
      strong_memoize(:remaining_work_gauge) do
        Gitlab::Metrics.gauge(:limited_capacity_worker_remaining_work_count, 'Number of jobs waiting to be enqueued')
      end
    end

    def prometheus_labels
      { worker: self.class.name }
    end
  end
end
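To make the scheduling arithmetic concrete, here is a small worked example, not part of the merge request; the numbers mirror the `#remaining_capacity` spec further down (a maximum of 10 running jobs, 3 jids in the running set, 5 jobs already queued):

# Worked example of the capacity math used by perform_with_capacity.
max_running_jobs = 10
running_jobs     = 3   # jids currently in the Redis running set
queue_size       = 5   # jobs already enqueued but not yet picked up

remaining_capacity  = [max_running_jobs - running_jobs - queue_size, 0].max # => 2

remaining_work      = 9
required_jobs_count = [remaining_work, remaining_capacity].min              # => 2

# So a cron run would bulk-enqueue only 2 jobs; each job that finds more work
# and spare capacity re-enqueues itself once via re_enqueue.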
@@ -204,6 +204,9 @@ configuration option in `gitlab.yml`. These metrics are served from the
| `geo_snippet_repositories_synced` | Gauge | 13.4 | Number of syncable snippets synced on secondary | `url` |
| `geo_snippet_repositories_failed` | Gauge | 13.4 | Number of syncable snippets failed on secondary | `url` |
| `geo_snippet_repositories_registry` | Gauge | 13.4 | Number of syncable snippets in the registry | `url` |
| `limited_capacity_worker_running_jobs` | Gauge | 13.5 | Number of running jobs | `worker` |
| `limited_capacity_worker_max_running_jobs` | Gauge | 13.5 | Maximum number of running jobs | `worker` |
| `limited_capacity_worker_remaining_work_count` | Gauge | 13.5 | Number of jobs waiting to be enqueued | `worker` |

## Database load balancing metrics **(PREMIUM ONLY)**

...
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe LimitedCapacity::JobTracker, :clean_gitlab_redis_queues do
  let(:job_tracker) do
    described_class.new('namespace')
  end

  describe '#register' do
    it 'adds jid to the set' do
      job_tracker.register('a-job-id')

      expect(job_tracker.running_jids).to contain_exactly('a-job-id')
    end

    it 'updates the counter' do
      expect { job_tracker.register('a-job-id') }
        .to change { job_tracker.count }
        .from(0)
        .to(1)
    end

    it 'does it in only one Redis call' do
      expect(job_tracker).to receive(:with_redis).once.and_call_original

      job_tracker.register('a-job-id')
    end
  end

  describe '#remove' do
    before do
      job_tracker.register(%w[a-job-id other-job-id])
    end

    it 'removes jid from the set' do
      job_tracker.remove('other-job-id')

      expect(job_tracker.running_jids).to contain_exactly('a-job-id')
    end

    it 'updates the counter' do
      expect { job_tracker.remove('other-job-id') }
        .to change { job_tracker.count }
        .from(2)
        .to(1)
    end

    it 'does it in only one Redis call' do
      expect(job_tracker).to receive(:with_redis).once.and_call_original

      job_tracker.remove('other-job-id')
    end
  end

  describe '#clean_up' do
    before do
      job_tracker.register('a-job-id')
    end

    context 'with running jobs' do
      before do
        expect(Gitlab::SidekiqStatus).to receive(:completed_jids)
          .with(%w[a-job-id])
          .and_return([])
      end

      it 'does not remove the jid from the set' do
        expect { job_tracker.clean_up }
          .not_to change { job_tracker.running_jids.include?('a-job-id') }
      end

      it 'does only one Redis call to get the job ids' do
        expect(job_tracker).to receive(:with_redis).once.and_call_original

        job_tracker.clean_up
      end
    end

    context 'with completed jobs' do
      it 'removes the jid from the set' do
        expect { job_tracker.clean_up }
          .to change { job_tracker.running_jids.include?('a-job-id') }
      end

      it 'updates the counter' do
        expect { job_tracker.clean_up }
          .to change { job_tracker.count }
          .from(1)
          .to(0)
      end

      it 'gets the job ids, removes them, and updates the counter with only two Redis calls' do
        expect(job_tracker).to receive(:with_redis).twice.and_call_original

        job_tracker.clean_up
      end
    end
  end
end
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe LimitedCapacity::Worker, :clean_gitlab_redis_queues, :aggregate_failures do
  let(:worker_class) do
    Class.new do
      def self.name
        'DummyWorker'
      end

      include ApplicationWorker
      include LimitedCapacity::Worker
    end
  end

  let(:worker) { worker_class.new }

  let(:job_tracker) do
    LimitedCapacity::JobTracker.new(worker_class.name)
  end

  before do
    worker.jid = 'my-jid'
    allow(worker).to receive(:job_tracker).and_return(job_tracker)
  end

  describe 'required methods' do
    it { expect { worker.perform_work }.to raise_error(NotImplementedError) }
    it { expect { worker.remaining_work_count }.to raise_error(NotImplementedError) }
    it { expect { worker.max_running_jobs }.to raise_error(NotImplementedError) }
  end

  describe 'Sidekiq options' do
    it 'does not retry failed jobs' do
      expect(worker_class.sidekiq_options['retry']).to eq(0)
    end

    it 'does not deduplicate jobs' do
      expect(worker_class.get_deduplicate_strategy).to eq(:none)
    end
  end

  describe '.perform_with_capacity' do
    subject(:perform_with_capacity) { worker_class.perform_with_capacity(:arg) }

    before do
      expect_next_instance_of(worker_class) do |instance|
        expect(instance).to receive(:remove_failed_jobs)
        expect(instance).to receive(:report_prometheus_metrics)

        allow(instance).to receive(:remaining_work_count).and_return(remaining_work_count)
        allow(instance).to receive(:remaining_capacity).and_return(remaining_capacity)
      end
    end

    context 'when capacity is larger than work' do
      let(:remaining_work_count) { 2 }
      let(:remaining_capacity) { 3 }

      it 'enqueues jobs for remaining work' do
        expect(worker_class)
          .to receive(:bulk_perform_async)
          .with([[:arg], [:arg]])

        perform_with_capacity
      end
    end

    context 'when capacity is lower than work' do
      let(:remaining_work_count) { 5 }
      let(:remaining_capacity) { 3 }

      it 'enqueues jobs for remaining work' do
        expect(worker_class)
          .to receive(:bulk_perform_async)
          .with([[:arg], [:arg], [:arg]])

        perform_with_capacity
      end
    end
  end

  describe '#perform' do
    subject(:perform) { worker.perform(:arg) }

    context 'with capacity' do
      before do
        allow(worker).to receive(:max_running_jobs).and_return(10)
        allow(worker).to receive(:running_jobs_count).and_return(0)
        allow(worker).to receive(:remaining_work_count).and_return(0)
      end

      it 'calls perform_work' do
        expect(worker).to receive(:perform_work).with(:arg)

        perform
      end

      it 're-enqueues itself' do
        allow(worker).to receive(:perform_work)
        expect(worker).to receive(:re_enqueue).with(:arg)

        perform
      end

      it 'registers itself in the running set' do
        allow(worker).to receive(:perform_work)
        expect(job_tracker).to receive(:register).with('my-jid')

        perform
      end

      it 'removes itself from the running set' do
        expect(job_tracker).to receive(:remove).with('my-jid')
        allow(worker).to receive(:perform_work)

        perform
      end

      it 'reports prometheus metrics' do
        allow(worker).to receive(:perform_work)
        expect(worker).to receive(:report_prometheus_metrics)

        perform
      end
    end

    context 'with capacity and without work' do
      before do
        allow(worker).to receive(:max_running_jobs).and_return(10)
        allow(worker).to receive(:running_jobs_count).and_return(0)
        allow(worker).to receive(:remaining_work_count).and_return(0)
        allow(worker).to receive(:perform_work)
      end

      it 'does not re-enqueue itself' do
        expect(worker_class).not_to receive(:perform_async)

        perform
      end
    end

    context 'without capacity' do
      before do
        allow(worker).to receive(:max_running_jobs).and_return(10)
        allow(worker).to receive(:running_jobs_count).and_return(15)
        allow(worker).to receive(:remaining_work_count).and_return(10)
      end

      it 'does not call perform_work' do
        expect(worker).not_to receive(:perform_work)

        perform
      end

      it 'does not re-enqueue itself' do
        expect(worker_class).not_to receive(:perform_async)

        perform
      end

      it 'does not register in the running set' do
        expect(job_tracker).not_to receive(:register)

        perform
      end

      it 'removes itself from the running set' do
        expect(job_tracker).to receive(:remove).with('my-jid')

        perform
      end

      it 'reports prometheus metrics' do
        expect(worker).to receive(:report_prometheus_metrics)

        perform
      end
    end

    context 'when perform_work fails' do
      it 'does not re-enqueue itself' do
        expect(worker).not_to receive(:re_enqueue)

        expect { perform }.to raise_error(NotImplementedError)
      end

      it 'removes itself from the running set' do
        expect(job_tracker).to receive(:remove)

        expect { perform }.to raise_error(NotImplementedError)
      end

      it 'reports prometheus metrics' do
        expect(worker).to receive(:report_prometheus_metrics)

        expect { perform }.to raise_error(NotImplementedError)
      end
    end
  end

  describe '#remaining_capacity' do
    subject(:remaining_capacity) { worker.remaining_capacity }

    before do
      expect(worker).to receive(:max_running_jobs).and_return(max_capacity)
    end

    context 'when changing the capacity to a lower value' do
      let(:max_capacity) { -1 }

      it { expect(remaining_capacity).to eq(0) }
    end

    context 'when registering new jobs' do
      let(:max_capacity) { 2 }

      before do
        job_tracker.register('a-job-id')
      end

      it { expect(remaining_capacity).to eq(1) }
    end

    context 'with jobs in the queue' do
      let(:max_capacity) { 2 }

      before do
        expect(worker_class).to receive(:queue_size).and_return(1)
      end

      it { expect(remaining_capacity).to eq(1) }
    end

    context 'with both running jobs and queued jobs' do
      let(:max_capacity) { 10 }

      before do
        expect(worker_class).to receive(:queue_size).and_return(5)
        expect(worker).to receive(:running_jobs_count).and_return(3)
      end

      it { expect(remaining_capacity).to eq(2) }
    end
  end

  describe '#remove_failed_jobs' do
    subject(:remove_failed_jobs) { worker.remove_failed_jobs }

    before do
      job_tracker.register('a-job-id')
      allow(worker).to receive(:max_running_jobs).and_return(2)

      expect(job_tracker).to receive(:clean_up).and_call_original
    end

    context 'with failed jobs' do
      it 'updates the available capacity' do
        expect { remove_failed_jobs }.to change { worker.remaining_capacity }.by(1)
      end
    end
  end

  describe '#report_prometheus_metrics' do
    subject(:report_prometheus_metrics) { worker.report_prometheus_metrics }

    before do
      allow(worker).to receive(:running_jobs_count).and_return(5)
      allow(worker).to receive(:max_running_jobs).and_return(7)
      allow(worker).to receive(:remaining_work_count).and_return(9)
    end

    it 'reports number of running jobs' do
      labels = { worker: 'DummyWorker' }

      report_prometheus_metrics

      expect(Gitlab::Metrics.registry.get(:limited_capacity_worker_running_jobs).get(labels)).to eq(5)
      expect(Gitlab::Metrics.registry.get(:limited_capacity_worker_max_running_jobs).get(labels)).to eq(7)
      expect(Gitlab::Metrics.registry.get(:limited_capacity_worker_remaining_work_count).get(labels)).to eq(9)
    end
  end
end