Commit b1849ee2 authored by Douwe Maan's avatar Douwe Maan

Use a dedicated queue for each worker

parent d673628d
---
- cronjob:admin_email
- cronjob:expire_build_artifacts
- cronjob:gitlab_usage_ping
- cronjob:import_export_project_cleanup
- cronjob:pipeline_schedule
- cronjob:prune_old_events
- cronjob:remove_expired_group_links
- cronjob:remove_expired_members
- cronjob:remove_old_web_hook_logs
- cronjob:remove_unreferenced_lfs_objects
- cronjob:repository_archive_cache
- cronjob:repository_check_batch
- cronjob:requests_profiles
- cronjob:schedule_update_user_activity
- cronjob:stuck_ci_jobs
- cronjob:stuck_import_jobs
- cronjob:stuck_merge_jobs
- cronjob:trending_projects
- gcp_cluster:cluster_install_app
- gcp_cluster:cluster_provision
- gcp_cluster:cluster_wait_for_app_installation
- gcp_cluster:wait_for_cluster_creation
- github_import_advance_stage
- github_importer:github_import_import_diff_note
- github_importer:github_import_import_issue
- github_importer:github_import_import_note
- github_importer:github_import_import_pull_request
- github_importer:github_import_refresh_import_jid
- github_importer:github_import_stage_finish_import
- github_importer:github_import_stage_import_base_data
- github_importer:github_import_stage_import_issues_and_diff_notes
- github_importer:github_import_stage_import_notes
- github_importer:github_import_stage_import_pull_requests
- github_importer:github_import_stage_import_repository
- pipeline_cache:expire_job_cache
- pipeline_cache:expire_pipeline_cache
- pipeline_creation:create_pipeline
- pipeline_default:build_coverage
- pipeline_default:build_trace_sections
- pipeline_default:pipeline_metrics
- pipeline_default:pipeline_notification
- pipeline_default:update_head_pipeline_for_merge_request
- pipeline_hooks:build_hooks
- pipeline_hooks:pipeline_hooks
- pipeline_processing:build_finished
- pipeline_processing:build_queue
- pipeline_processing:build_success
- pipeline_processing:pipeline_process
- pipeline_processing:pipeline_success
- pipeline_processing:pipeline_update
- pipeline_processing:stage_update
- repository_check:repository_check_clear
- repository_check:repository_check_single_repository
- default
- mailers # ActionMailer::DeliveryJob.queue_name
- authorized_projects
- background_migration
- create_gpg_signature
- delete_merged_branches
- delete_user
- email_receiver
- emails_on_push
- expire_build_instance_artifacts
- git_garbage_collect
- gitlab_shell
- group_destroy
- invalid_gpg_signature_update
- irker
- merge
- namespaceless_project_destroy
- new_issue
- new_merge_request
- new_note
- pages
- post_receive
- process_commit
- project_cache
- project_destroy
- project_export
- project_migrate_hashed_storage
- project_service
- propagate_service_template
- reactive_caching
- repository_fork
- repository_import
- storage_migrator
- system_hook_push
- update_merge_requests
- update_user_activity
- upload_checksum
- web_hook
......@@ -2,7 +2,7 @@ class BuildFinishedWorker
include ApplicationWorker
include PipelineQueue
enqueue_in group: :processing
queue_namespace :pipeline_processing
def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build|
......
......@@ -2,7 +2,7 @@ class BuildHooksWorker
include ApplicationWorker
include PipelineQueue
enqueue_in group: :hooks
queue_namespace :pipeline_hooks
def perform(build_id)
Ci::Build.find_by(id: build_id)
......
......@@ -2,7 +2,7 @@ class BuildQueueWorker
include ApplicationWorker
include PipelineQueue
enqueue_in group: :processing
queue_namespace :pipeline_processing
def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build|
......
......@@ -2,7 +2,7 @@ class BuildSuccessWorker
include ApplicationWorker
include PipelineQueue
enqueue_in group: :processing
queue_namespace :pipeline_processing
def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build|
......
......@@ -6,10 +6,20 @@ module ApplicationWorker
include Sidekiq::Worker
included do
sidekiq_options queue: base_queue_name
set_queue
end
module ClassMethods
def inherited(subclass)
subclass.set_queue
end
def set_queue
queue_name = [queue_namespace, base_queue_name].compact.join(':')
sidekiq_options queue: queue_name
end
def base_queue_name
name
.sub(/\AGitlab::/, '')
......@@ -18,6 +28,16 @@ module ApplicationWorker
.tr('/', '_')
end
def queue_namespace(new_namespace = nil)
if new_namespace
sidekiq_options queue_namespace: new_namespace
set_queue
else
get_sidekiq_options['queue_namespace']&.to_s
end
end
def queue
get_sidekiq_options['queue'].to_s
end
......
......@@ -5,6 +5,6 @@ module ClusterQueue
extend ActiveSupport::Concern
included do
sidekiq_options queue: :gcp_cluster
queue_namespace :gcp_cluster
end
end
......@@ -4,6 +4,7 @@ module CronjobQueue
extend ActiveSupport::Concern
included do
sidekiq_options queue: :cronjob, retry: false
queue_namespace :cronjob
sidekiq_options retry: false
end
end
......@@ -4,12 +4,14 @@ module Gitlab
extend ActiveSupport::Concern
included do
queue_namespace :github_importer
# If a job produces an error it may block a stage from advancing
# forever. To prevent this from happening we prevent jobs from going to
# the dead queue. This does mean some resources may not be imported, but
# this is better than a project being stuck in the "import" state
# forever.
sidekiq_options queue: 'github_importer', dead: false, retry: 5
sidekiq_options dead: false, retry: 5
end
end
end
......
......@@ -5,14 +5,6 @@ module PipelineQueue
extend ActiveSupport::Concern
included do
sidekiq_options queue: 'pipeline_default'
end
class_methods do
def enqueue_in(group:)
raise ArgumentError, 'Unspecified queue group!' if group.empty?
sidekiq_options queue: "pipeline_#{group}"
end
queue_namespace :pipeline_default
end
end
......@@ -3,6 +3,8 @@ module RepositoryCheckQueue
extend ActiveSupport::Concern
included do
sidekiq_options queue: :repository_check, retry: false
queue_namespace :repository_check
sidekiq_options retry: false
end
end
......@@ -2,7 +2,7 @@ class CreatePipelineWorker
include ApplicationWorker
include PipelineQueue
enqueue_in group: :creation
queue_namespace :pipeline_creation
def perform(project_id, user_id, ref, source, params = {})
project = Project.find(project_id)
......
......@@ -2,7 +2,7 @@ class ExpireJobCacheWorker
include ApplicationWorker
include PipelineQueue
enqueue_in group: :cache
queue_namespace :pipeline_cache
def perform(job_id)
job = CommitStatus.joins(:pipeline, :project).find_by(id: job_id)
......
......@@ -2,7 +2,7 @@ class ExpirePipelineCacheWorker
include ApplicationWorker
include PipelineQueue
enqueue_in group: :cache
queue_namespace :pipeline_cache
def perform(pipeline_id)
pipeline = Ci::Pipeline.find_by(id: pipeline_id)
......
......@@ -9,7 +9,7 @@ module Gitlab
class AdvanceStageWorker
include ApplicationWorker
sidekiq_options queue: 'github_importer_advance_stage', dead: false
sidekiq_options dead: false
INTERVAL = 30.seconds.to_i
......
class PagesWorker
include ApplicationWorker
sidekiq_options queue: :pages, retry: false
sidekiq_options retry: false
def perform(action, *arg)
send(action, *arg) # rubocop:disable GitlabSecurity/PublicSend
......
......@@ -2,7 +2,7 @@ class PipelineHooksWorker
include ApplicationWorker
include PipelineQueue
enqueue_in group: :hooks
queue_namespace :pipeline_hooks
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id)
......
......@@ -2,7 +2,7 @@ class PipelineProcessWorker
include ApplicationWorker
include PipelineQueue
enqueue_in group: :processing
queue_namespace :pipeline_processing
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id)
......
......@@ -2,7 +2,7 @@ class PipelineSuccessWorker
include ApplicationWorker
include PipelineQueue
enqueue_in group: :processing
queue_namespace :pipeline_processing
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id).try do |pipeline|
......
......@@ -2,7 +2,7 @@ class PipelineUpdateWorker
include ApplicationWorker
include PipelineQueue
enqueue_in group: :processing
queue_namespace :pipeline_processing
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id)
......
......@@ -2,7 +2,7 @@ class StageUpdateWorker
include ApplicationWorker
include PipelineQueue
enqueue_in group: :processing
queue_namespace :pipeline_processing
def perform(stage_id)
Ci::Stage.find_by(id: stage_id).try do |stage|
......
class UpdateHeadPipelineForMergeRequestWorker
include ApplicationWorker
sidekiq_options queue: 'pipeline_default'
include PipelineQueue
def perform(merge_request_id)
merge_request = MergeRequest.find(merge_request_id)
......
......@@ -42,6 +42,8 @@ Sidekiq.configure_server do |config|
Gitlab::SidekiqThrottler.execute!
Gitlab::SidekiqVersioning.install!
config = Gitlab::Database.config ||
Rails.application.config.database_configuration[Rails.env]
config['pool'] = Sidekiq.options[:concurrency]
......@@ -60,19 +62,3 @@ Sidekiq.configure_client do |config|
chain.add Gitlab::SidekiqStatus::ClientMiddleware
end
end
# The Sidekiq client API always adds the queue to the Sidekiq queue
# list, but mail_room and gitlab-shell do not. This is only necessary
# for monitoring.
begin
queues = Gitlab::SidekiqConfig.worker_queues
Sidekiq.redis do |conn|
conn.pipelined do
queues.each do |queue|
conn.sadd('queues', queue)
end
end
end
rescue Redis::BaseError, SocketError, Errno::ENOENT, Errno::EADDRNOTAVAIL, Errno::EAFNOSUPPORT, Errno::ECONNRESET, Errno::ECONNREFUSED
end
......@@ -25,8 +25,8 @@
- [new_note, 2]
- [new_issue, 2]
- [new_merge_request, 2]
- [build, 2]
- [pipeline, 2]
- [build, 2] # Replaced by pipeline
- [pipeline, 2] # Replaced by pipeline_*
- [pipeline_processing, 5]
- [pipeline_creation, 4]
- [pipeline_default, 3]
......@@ -38,11 +38,13 @@
- [mailers, 2]
- [invalid_gpg_signature_update, 2]
- [create_gpg_signature, 2]
- [rebase, 2]
- [upload_checksum, 1]
- [repository_fork, 1]
- [repository_import, 1]
- [github_importer, 1]
- [github_importer_advance_stage, 1]
- [github_importer_advance_stage, 1] # Replaced by github_import_advance_stage
- [github_import_advance_stage, 1]
- [project_service, 1]
- [delete_user, 1]
- [delete_merged_branches, 1]
......
require 'yaml'
require 'set'
module Gitlab
module SidekiqConfig
def self.redis_queues
@redis_queues ||= Sidekiq::Queue.all.map(&:name)
# This method is called by `bin/sidekiq-cluster` in EE, which runs outside
# of bundler/Rails context, so we cannot use any gem or Rails methods.
def self.worker_queues(rails_path = Rails.root.to_s)
@worker_queues ||= {}
@worker_queues[rails_path] ||= YAML.load_file(File.join(rails_path, 'app/workers/all_queues.yml'))
end
# This method is called by `bin/sidekiq-cluster` in EE, which runs outside
# of bundler/Rails context, so we cannot use any gem or Rails methods.
def self.config_queues(rails_path = Rails.root.to_s)
def self.expand_queues(queues, all_queues = self.worker_queues)
return [] if queues.empty?
queues_set = all_queues.to_set
queues.flat_map do |queue|
[queue, *queues_set.grep(/\A#{queue}:/)]
end
end
def self.redis_queues
# Not memoized, because this can change during the life of the application
Sidekiq::Queue.all.map(&:name)
end
def self.config_queues
@config_queues ||= begin
config = YAML.load_file(File.join(rails_path, 'config', 'sidekiq_queues.yml'))
config = YAML.load_file(Rails.root.join('config/sidekiq_queues.yml'))
config[:queues].map(&:first)
end
end
......@@ -23,14 +42,6 @@ module Gitlab
@workers ||= find_workers(Rails.root.join('app', 'workers'))
end
def self.default_queues
[ActionMailer::DeliveryJob.queue_name, 'default']
end
def self.worker_queues
@worker_queues ||= (workers.map(&:queue) + default_queues).uniq
end
def self.find_workers(root)
concerns = root.join('concerns').to_s
......@@ -43,7 +54,7 @@ module Gitlab
ns.camelize.constantize
end
# Skip concerns
# Skip things that aren't workers
workers.select { |w| w < Sidekiq::Worker }
end
end
......
module Gitlab
module SidekiqVersioning
def self.install!
Sidekiq::Manager.prepend SidekiqVersioning::Manager
# The Sidekiq client API always adds the queue to the Sidekiq queue
# list, but mail_room and gitlab-shell do not. This is only necessary
# for monitoring.
begin
queues = SidekiqConfig.worker_queues
if queues.any?
Sidekiq.redis do |conn|
conn.pipelined do
queues.each do |queue|
conn.sadd('queues', queue)
end
end
end
end
rescue ::Redis::BaseError, SocketError, Errno::ENOENT, Errno::EADDRNOTAVAIL, Errno::EAFNOSUPPORT, Errno::ECONNRESET, Errno::ECONNREFUSED
end
end
end
end
module Gitlab
module SidekiqVersioning
module Manager
def initialize(options = {})
options[:strict] = false
options[:queues] = SidekiqConfig.expand_queues(options[:queues])
Sidekiq.logger.info "Listening on queues #{options[:queues].uniq.sort}"
super
end
end
end
end
......@@ -16,9 +16,30 @@ describe Gitlab::SidekiqConfig do
expect(queues).to include('post_receive')
expect(queues).to include('merge')
expect(queues).to include('cronjob')
expect(queues).to include('cronjob:stuck_import_jobs')
expect(queues).to include('mailers')
expect(queues).to include('default')
end
end
describe '.expand_queues' do
it 'expands queue namespaces to concrete queue names' do
queues = described_class.expand_queues(%w[cronjob])
expect(queues).to include('cronjob:stuck_import_jobs')
expect(queues).to include('cronjob:stuck_merge_jobs')
end
it 'lets concrete queue names pass through' do
queues = described_class.expand_queues(%w[post_receive])
expect(queues).to include('post_receive')
end
it 'lets unknown queues pass through' do
queues = described_class.expand_queues(%w[unknown])
expect(queues).to include('unknown')
end
end
end
require 'spec_helper'
describe Gitlab::SidekiqVersioning::Manager do
before do
Sidekiq::Manager.prepend described_class
end
describe '#initialize' do
it 'listens on all expanded queues' do
manager = Sidekiq::Manager.new(queues: %w[post_receive repository_fork cronjob unknown])
queues = manager.options[:queues]
expect(queues).to include('post_receive')
expect(queues).to include('repository_fork')
expect(queues).to include('cronjob')
expect(queues).to include('cronjob:stuck_import_jobs')
expect(queues).to include('cronjob:stuck_merge_jobs')
expect(queues).to include('unknown')
end
end
end
require 'spec_helper'
describe Gitlab::SidekiqVersioning, :sidekiq, :redis do
let(:foo_worker) do
Class.new do
def self.name
'FooWorker'
end
include ApplicationWorker
end
end
let(:bar_worker) do
Class.new do
def self.name
'BarWorker'
end
include ApplicationWorker
end
end
before do
allow(Gitlab::SidekiqConfig).to receive(:workers).and_return([foo_worker, bar_worker])
allow(Gitlab::SidekiqConfig).to receive(:worker_queues).and_return([foo_worker.queue, bar_worker.queue])
end
describe '.install!' do
it 'prepends SidekiqVersioning::Manager into Sidekiq::Manager' do
described_class.install!
expect(Sidekiq::Manager).to include(Gitlab::SidekiqVersioning::Manager)
end
it 'registers all versionless and versioned queues with Redis' do
described_class.install!
queues = Sidekiq::Queue.all.map(&:name)
expect(queues).to include('foo')
expect(queues).to include('bar')
end
end
end
......@@ -17,6 +17,14 @@ describe ApplicationWorker do
end
end
describe '.queue_namespace' do
it 'sets the queue name based on the class name' do
worker.queue_namespace :some_namespace
expect(worker.queue).to eq('some_namespace:foo_bar_dummy')
end
end
describe '.queue' do
it 'returns the queue name' do
worker.sidekiq_options queue: :some_queue
......
......@@ -14,6 +14,6 @@ describe ClusterQueue do
it 'sets a default pipelines queue automatically' do
expect(worker.sidekiq_options['queue'])
.to eq :gcp_cluster
.to eq 'gcp_cluster:dummy'
end
end
......@@ -13,7 +13,7 @@ describe CronjobQueue do
end
it 'sets the queue name of a worker' do
expect(worker.sidekiq_options['queue'].to_s).to eq('cronjob')
expect(worker.sidekiq_options['queue'].to_s).to eq('cronjob:dummy')
end
it 'disables retrying of failed jobs' do
......
......@@ -11,6 +11,6 @@ describe Gitlab::GithubImport::Queue do
include Gitlab::GithubImport::Queue
end
expect(worker.sidekiq_options['queue']).to eq('github_importer')
expect(worker.sidekiq_options['queue']).to eq('github_importer:dummy')
end
end
......@@ -14,15 +14,6 @@ describe PipelineQueue do
it 'sets a default pipelines queue automatically' do
expect(worker.sidekiq_options['queue'])
.to eq 'pipeline_default'
end
describe '.enqueue_in' do
it 'sets a custom sidekiq queue with prefix and group' do
worker.enqueue_in(group: :processing)
expect(worker.sidekiq_options['queue'])
.to eq 'pipeline_processing'
end
.to eq 'pipeline_default:dummy'
end
end
......@@ -13,7 +13,7 @@ describe RepositoryCheckQueue do
end
it 'sets the queue name of a worker' do
expect(worker.sidekiq_options['queue'].to_s).to eq('repository_check')
expect(worker.sidekiq_options['queue'].to_s).to eq('repository_check:dummy')
end
it 'disables retrying of failed jobs' do
......
......@@ -10,12 +10,31 @@ describe 'Every Sidekiq worker' do
end
it 'uses the cronjob queue when the worker runs as a cronjob' do
expect(Gitlab::SidekiqConfig.cron_workers.map(&:queue)).to all(eq('cronjob'))
expect(Gitlab::SidekiqConfig.cron_workers.map(&:queue)).to all(start_with('cronjob:'))
end
it 'defines the queue in the Sidekiq configuration file' do
config_queue_names = Gitlab::SidekiqConfig.config_queues.to_set
it 'has its queue in app/workers/all_queues.yml', :aggregate_failures do
file_worker_queues = Gitlab::SidekiqConfig.worker_queues.to_set
expect(Gitlab::SidekiqConfig.worker_queues).to all(be_in(config_queue_names))
worker_queues = Gitlab::SidekiqConfig.workers.map(&:queue).to_set
worker_queues << ActionMailer::DeliveryJob.queue_name
worker_queues << 'default'
missing_from_file = worker_queues - file_worker_queues
expect(missing_from_file).to be_empty, "expected #{missing_from_file.to_a.inspect} to be in app/workers/all_queues.yml"
unncessarily_in_file = file_worker_queues - worker_queues
expect(unncessarily_in_file).to be_empty, "expected #{unncessarily_in_file.to_a.inspect} not to be in app/workers/all_queues.yml"
end
it 'has its queue or namespace in config/sidekiq_queues.yml', :aggregate_failures do
config_queues = Gitlab::SidekiqConfig.config_queues.to_set
Gitlab::SidekiqConfig.workers.each do |worker|
queue = worker.queue
queue_namespace = queue.split(':').first
expect(config_queues).to include(queue).or(include(queue_namespace))
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment