Commit f31de303 authored by Marius Bobin's avatar Marius Bobin Committed by Bob Van Landuyt

Add limited capacity worker concern

Add limited capacity worker concern to help implement workers that pull
their own work from the database
parent c9e9bb32
# frozen_string_literal: true
module LimitedCapacity
class JobTracker # rubocop:disable Scalability/IdempotentWorker
include Gitlab::Utils::StrongMemoize
def initialize(namespace)
@namespace = namespace
end
def register(jid)
_added, @count = with_redis_pipeline do |redis|
register_job_keys(redis, jid)
get_job_count(redis)
end
end
def remove(jid)
_removed, @count = with_redis_pipeline do |redis|
remove_job_keys(redis, jid)
get_job_count(redis)
end
end
def clean_up
completed_jids = Gitlab::SidekiqStatus.completed_jids(running_jids)
return unless completed_jids.any?
_removed, @count = with_redis_pipeline do |redis|
remove_job_keys(redis, completed_jids)
get_job_count(redis)
end
end
def count
@count ||= with_redis { |redis| get_job_count(redis) }
end
def running_jids
with_redis do |redis|
redis.smembers(counter_key)
end
end
private
attr_reader :namespace
def counter_key
"worker:#{namespace.to_s.underscore}:running"
end
def get_job_count(redis)
redis.scard(counter_key)
end
def register_job_keys(redis, keys)
redis.sadd(counter_key, keys)
end
def remove_job_keys(redis, keys)
redis.srem(counter_key, keys)
end
def with_redis(&block)
Gitlab::Redis::Queues.with(&block) # rubocop: disable CodeReuse/ActiveRecord
end
def with_redis_pipeline(&block)
with_redis do |redis|
redis.pipelined(&block)
end
end
end
end
# frozen_string_literal: true
# Usage:
#
# Worker that performs the tasks:
#
# class DummyWorker
# include ApplicationWorker
# include LimitedCapacity::Worker
#
# # For each job that raises any error, a worker instance will be disabled
# # until the next schedule-run.
# # If you wish to get around this, exceptions must by handled by the implementer.
# #
# def perform_work(*args)
# end
#
# def remaining_work_count(*args)
# 5
# end
#
# def max_running_jobs
# 25
# end
# end
#
# Cron worker to fill the pool of regular workers:
#
# class ScheduleDummyCronWorker
# include ApplicationWorker
# include CronjobQueue
#
# def perform(*args)
# DummyWorker.perform_with_capacity(*args)
# end
# end
#
module LimitedCapacity
module Worker
extend ActiveSupport::Concern
include Gitlab::Utils::StrongMemoize
included do
# Disable Sidekiq retries, log the error, and send the job to the dead queue.
# This is done to have only one source that produces jobs and because the slot
# would be occupied by a job that will be performed in the distant future.
# We let the cron worker enqueue new jobs, this could be seen as our retry and
# back off mechanism because the job might fail again if executed immediately.
sidekiq_options retry: 0
deduplicate :none
end
class_methods do
def perform_with_capacity(*args)
worker = self.new
worker.remove_failed_jobs
worker.report_prometheus_metrics(*args)
required_jobs_count = worker.required_jobs_count(*args)
arguments = Array.new(required_jobs_count) { args }
self.bulk_perform_async(arguments) # rubocop:disable Scalability/BulkPerformWithContext
end
end
def perform(*args)
return unless has_capacity?
job_tracker.register(jid)
perform_work(*args)
rescue => exception
raise
ensure
job_tracker.remove(jid)
report_prometheus_metrics
re_enqueue(*args) unless exception
end
def perform_work(*args)
raise NotImplementedError
end
def remaining_work_count(*args)
raise NotImplementedError
end
def max_running_jobs
raise NotImplementedError
end
def has_capacity?
remaining_capacity > 0
end
def remaining_capacity
[
max_running_jobs - running_jobs_count - self.class.queue_size,
0
].max
end
def has_work?(*args)
remaining_work_count(*args) > 0
end
def remove_failed_jobs
job_tracker.clean_up
end
def report_prometheus_metrics(*args)
running_jobs_gauge.set(prometheus_labels, running_jobs_count)
remaining_work_gauge.set(prometheus_labels, remaining_work_count(*args))
max_running_jobs_gauge.set(prometheus_labels, max_running_jobs)
end
def required_jobs_count(*args)
[
remaining_work_count(*args),
remaining_capacity
].min
end
private
def running_jobs_count
job_tracker.count
end
def job_tracker
strong_memoize(:job_tracker) do
JobTracker.new(self.class.name)
end
end
def re_enqueue(*args)
return unless has_capacity?
return unless has_work?(*args)
self.class.perform_async(*args)
end
def running_jobs_gauge
strong_memoize(:running_jobs_gauge) do
Gitlab::Metrics.gauge(:limited_capacity_worker_running_jobs, 'Number of running jobs')
end
end
def max_running_jobs_gauge
strong_memoize(:max_running_jobs_gauge) do
Gitlab::Metrics.gauge(:limited_capacity_worker_max_running_jobs, 'Maximum number of running jobs')
end
end
def remaining_work_gauge
strong_memoize(:remaining_work_gauge) do
Gitlab::Metrics.gauge(:limited_capacity_worker_remaining_work_count, 'Number of jobs waiting to be enqueued')
end
end
def prometheus_labels
{ worker: self.class.name }
end
end
end
......@@ -204,6 +204,9 @@ configuration option in `gitlab.yml`. These metrics are served from the
| `geo_snippet_repositories_synced` | Gauge | 13.4 | Number of syncable snippets synced on secondary | `url` |
| `geo_snippet_repositories_failed` | Gauge | 13.4 | Number of syncable snippets failed on secondary | `url` |
| `geo_snippet_repositories_registry` | Gauge | 13.4 | Number of syncable snippets in the registry | `url` |
| `limited_capacity_worker_running_jobs` | Gauge | 13.5 | Number of running jobs | `worker` |
| `limited_capacity_worker_max_running_jobs` | Gauge | 13.5 | Maximum number of running jobs | `worker` |
| `limited_capacity_worker_remaining_work_count` | Gauge | 13.5 | Number of jobs waiting to be enqueued | `worker` |
## Database load balancing metrics **(PREMIUM ONLY)**
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe LimitedCapacity::JobTracker, :clean_gitlab_redis_queues do
let(:job_tracker) do
described_class.new('namespace')
end
describe '#register' do
it 'adds jid to the set' do
job_tracker.register('a-job-id')
expect(job_tracker.running_jids).to contain_exactly('a-job-id')
end
it 'updates the counter' do
expect { job_tracker.register('a-job-id') }
.to change { job_tracker.count }
.from(0)
.to(1)
end
it 'does it in only one Redis call' do
expect(job_tracker).to receive(:with_redis).once.and_call_original
job_tracker.register('a-job-id')
end
end
describe '#remove' do
before do
job_tracker.register(%w[a-job-id other-job-id])
end
it 'removes jid from the set' do
job_tracker.remove('other-job-id')
expect(job_tracker.running_jids).to contain_exactly('a-job-id')
end
it 'updates the counter' do
expect { job_tracker.remove('other-job-id') }
.to change { job_tracker.count }
.from(2)
.to(1)
end
it 'does it in only one Redis call' do
expect(job_tracker).to receive(:with_redis).once.and_call_original
job_tracker.remove('other-job-id')
end
end
describe '#clean_up' do
before do
job_tracker.register('a-job-id')
end
context 'with running jobs' do
before do
expect(Gitlab::SidekiqStatus).to receive(:completed_jids)
.with(%w[a-job-id])
.and_return([])
end
it 'does not remove the jid from the set' do
expect { job_tracker.clean_up }
.not_to change { job_tracker.running_jids.include?('a-job-id') }
end
it 'does only one Redis call to get the job ids' do
expect(job_tracker).to receive(:with_redis).once.and_call_original
job_tracker.clean_up
end
end
context 'with completed jobs' do
it 'removes the jid from the set' do
expect { job_tracker.clean_up }
.to change { job_tracker.running_jids.include?('a-job-id') }
end
it 'updates the counter' do
expect { job_tracker.clean_up }
.to change { job_tracker.count }
.from(1)
.to(0)
end
it 'gets the job ids, removes them, and updates the counter with only two Redis calls' do
expect(job_tracker).to receive(:with_redis).twice.and_call_original
job_tracker.clean_up
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe LimitedCapacity::Worker, :clean_gitlab_redis_queues, :aggregate_failures do
let(:worker_class) do
Class.new do
def self.name
'DummyWorker'
end
include ApplicationWorker
include LimitedCapacity::Worker
end
end
let(:worker) { worker_class.new }
let(:job_tracker) do
LimitedCapacity::JobTracker.new(worker_class.name)
end
before do
worker.jid = 'my-jid'
allow(worker).to receive(:job_tracker).and_return(job_tracker)
end
describe 'required methods' do
it { expect { worker.perform_work }.to raise_error(NotImplementedError) }
it { expect { worker.remaining_work_count }.to raise_error(NotImplementedError) }
it { expect { worker.max_running_jobs }.to raise_error(NotImplementedError) }
end
describe 'Sidekiq options' do
it 'does not retry failed jobs' do
expect(worker_class.sidekiq_options['retry']).to eq(0)
end
it 'does not deduplicate jobs' do
expect(worker_class.get_deduplicate_strategy).to eq(:none)
end
end
describe '.perform_with_capacity' do
subject(:perform_with_capacity) { worker_class.perform_with_capacity(:arg) }
before do
expect_next_instance_of(worker_class) do |instance|
expect(instance).to receive(:remove_failed_jobs)
expect(instance).to receive(:report_prometheus_metrics)
allow(instance).to receive(:remaining_work_count).and_return(remaining_work_count)
allow(instance).to receive(:remaining_capacity).and_return(remaining_capacity)
end
end
context 'when capacity is larger than work' do
let(:remaining_work_count) { 2 }
let(:remaining_capacity) { 3 }
it 'enqueues jobs for remaining work' do
expect(worker_class)
.to receive(:bulk_perform_async)
.with([[:arg], [:arg]])
perform_with_capacity
end
end
context 'when capacity is lower than work' do
let(:remaining_work_count) { 5 }
let(:remaining_capacity) { 3 }
it 'enqueues jobs for remaining work' do
expect(worker_class)
.to receive(:bulk_perform_async)
.with([[:arg], [:arg], [:arg]])
perform_with_capacity
end
end
end
describe '#perform' do
subject(:perform) { worker.perform(:arg) }
context 'with capacity' do
before do
allow(worker).to receive(:max_running_jobs).and_return(10)
allow(worker).to receive(:running_jobs_count).and_return(0)
allow(worker).to receive(:remaining_work_count).and_return(0)
end
it 'calls perform_work' do
expect(worker).to receive(:perform_work).with(:arg)
perform
end
it 're-enqueues itself' do
allow(worker).to receive(:perform_work)
expect(worker).to receive(:re_enqueue).with(:arg)
perform
end
it 'registers itself in the running set' do
allow(worker).to receive(:perform_work)
expect(job_tracker).to receive(:register).with('my-jid')
perform
end
it 'removes itself from the running set' do
expect(job_tracker).to receive(:remove).with('my-jid')
allow(worker).to receive(:perform_work)
perform
end
it 'reports prometheus metrics' do
allow(worker).to receive(:perform_work)
expect(worker).to receive(:report_prometheus_metrics)
perform
end
end
context 'with capacity and without work' do
before do
allow(worker).to receive(:max_running_jobs).and_return(10)
allow(worker).to receive(:running_jobs_count).and_return(0)
allow(worker).to receive(:remaining_work_count).and_return(0)
allow(worker).to receive(:perform_work)
end
it 'does not re-enqueue itself' do
expect(worker_class).not_to receive(:perform_async)
perform
end
end
context 'without capacity' do
before do
allow(worker).to receive(:max_running_jobs).and_return(10)
allow(worker).to receive(:running_jobs_count).and_return(15)
allow(worker).to receive(:remaining_work_count).and_return(10)
end
it 'does not call perform_work' do
expect(worker).not_to receive(:perform_work)
perform
end
it 'does not re-enqueue itself' do
expect(worker_class).not_to receive(:perform_async)
perform
end
it 'does not register in the running set' do
expect(job_tracker).not_to receive(:register)
perform
end
it 'removes itself from the running set' do
expect(job_tracker).to receive(:remove).with('my-jid')
perform
end
it 'reports prometheus metrics' do
expect(worker).to receive(:report_prometheus_metrics)
perform
end
end
context 'when perform_work fails' do
it 'does not re-enqueue itself' do
expect(worker).not_to receive(:re_enqueue)
expect { perform }.to raise_error(NotImplementedError)
end
it 'removes itself from the running set' do
expect(job_tracker).to receive(:remove)
expect { perform }.to raise_error(NotImplementedError)
end
it 'reports prometheus metrics' do
expect(worker).to receive(:report_prometheus_metrics)
expect { perform }.to raise_error(NotImplementedError)
end
end
end
describe '#remaining_capacity' do
subject(:remaining_capacity) { worker.remaining_capacity }
before do
expect(worker).to receive(:max_running_jobs).and_return(max_capacity)
end
context 'when changing the capacity to a lower value' do
let(:max_capacity) { -1 }
it { expect(remaining_capacity).to eq(0) }
end
context 'when registering new jobs' do
let(:max_capacity) { 2 }
before do
job_tracker.register('a-job-id')
end
it { expect(remaining_capacity).to eq(1) }
end
context 'with jobs in the queue' do
let(:max_capacity) { 2 }
before do
expect(worker_class).to receive(:queue_size).and_return(1)
end
it { expect(remaining_capacity).to eq(1) }
end
context 'with both running jobs and queued jobs' do
let(:max_capacity) { 10 }
before do
expect(worker_class).to receive(:queue_size).and_return(5)
expect(worker).to receive(:running_jobs_count).and_return(3)
end
it { expect(remaining_capacity).to eq(2) }
end
end
describe '#remove_failed_jobs' do
subject(:remove_failed_jobs) { worker.remove_failed_jobs }
before do
job_tracker.register('a-job-id')
allow(worker).to receive(:max_running_jobs).and_return(2)
expect(job_tracker).to receive(:clean_up).and_call_original
end
context 'with failed jobs' do
it 'update the available capacity' do
expect { remove_failed_jobs }.to change { worker.remaining_capacity }.by(1)
end
end
end
describe '#report_prometheus_metrics' do
subject(:report_prometheus_metrics) { worker.report_prometheus_metrics }
before do
allow(worker).to receive(:running_jobs_count).and_return(5)
allow(worker).to receive(:max_running_jobs).and_return(7)
allow(worker).to receive(:remaining_work_count).and_return(9)
end
it 'reports number of running jobs' do
labels = { worker: 'DummyWorker' }
report_prometheus_metrics
expect(Gitlab::Metrics.registry.get(:limited_capacity_worker_running_jobs).get(labels)).to eq(5)
expect(Gitlab::Metrics.registry.get(:limited_capacity_worker_max_running_jobs).get(labels)).to eq(7)
expect(Gitlab::Metrics.registry.get(:limited_capacity_worker_remaining_work_count).get(labels)).to eq(9)
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment