Commit 39fc1bb3 authored by Douglas Barbosa Alexandre's avatar Douglas Barbosa Alexandre

Merge branch 'add-weights-to-sidekiq-workers' into 'master'

Move Sidekiq worker weights to application code

See merge request gitlab-org/gitlab!23253
parents a8c080b4 a77f8992
...@@ -6,6 +6,7 @@ class AuthorizedProjectsWorker ...@@ -6,6 +6,7 @@ class AuthorizedProjectsWorker
feature_category :authentication_and_authorization feature_category :authentication_and_authorization
latency_sensitive_worker! latency_sensitive_worker!
weight 2
# This is a workaround for a Ruby 2.3.7 bug. rspec-mocks cannot restore the # This is a workaround for a Ruby 2.3.7 bug. rspec-mocks cannot restore the
# visibility of prepended modules. See https://github.com/rspec/rspec-mocks/issues/1231 # visibility of prepended modules. See https://github.com/rspec/rspec-mocks/issues/1231
......
...@@ -8,6 +8,8 @@ class ChatNotificationWorker ...@@ -8,6 +8,8 @@ class ChatNotificationWorker
sidekiq_options retry: false sidekiq_options retry: false
feature_category :chatops feature_category :chatops
latency_sensitive_worker! latency_sensitive_worker!
weight 2
# TODO: break this into multiple jobs # TODO: break this into multiple jobs
# as the `responder` uses external dependencies # as the `responder` uses external dependencies
# See https://gitlab.com/gitlab-com/gl-infra/scalability/issues/34 # See https://gitlab.com/gitlab-com/gl-infra/scalability/issues/34
......
...@@ -9,6 +9,7 @@ module SelfMonitoringProjectWorker ...@@ -9,6 +9,7 @@ module SelfMonitoringProjectWorker
# Other Functionality. Metrics seems to be the closest feature_category for # Other Functionality. Metrics seems to be the closest feature_category for
# this worker. # this worker.
feature_category :metrics feature_category :metrics
weight 2
end end
LEASE_TIMEOUT = 15.minutes.to_i LEASE_TIMEOUT = 15.minutes.to_i
......
...@@ -7,6 +7,24 @@ module WorkerAttributes ...@@ -7,6 +7,24 @@ module WorkerAttributes
# `worker_resource_boundary` attribute # `worker_resource_boundary` attribute
VALID_RESOURCE_BOUNDARIES = [:memory, :cpu, :unknown].freeze VALID_RESOURCE_BOUNDARIES = [:memory, :cpu, :unknown].freeze
NAMESPACE_WEIGHTS = {
auto_devops: 2,
auto_merge: 3,
chaos: 2,
deployment: 3,
mail_scheduler: 2,
notifications: 2,
pipeline_cache: 3,
pipeline_creation: 4,
pipeline_default: 3,
pipeline_hooks: 2,
pipeline_processing: 5,
# EE-specific
epics: 2,
incident_management: 2
}.stringify_keys.freeze
class_methods do class_methods do
def feature_category(value) def feature_category(value)
raise "Invalid category. Use `feature_category_not_owned!` to mark a worker as not owned" if value == :not_owned raise "Invalid category. Use `feature_category_not_owned!` to mark a worker as not owned" if value == :not_owned
...@@ -70,6 +88,16 @@ module WorkerAttributes ...@@ -70,6 +88,16 @@ module WorkerAttributes
worker_attributes[:resource_boundary] || :unknown worker_attributes[:resource_boundary] || :unknown
end end
def weight(value)
worker_attributes[:weight] = value
end
def get_weight
worker_attributes[:weight] ||
NAMESPACE_WEIGHTS[queue_namespace] ||
1
end
protected protected
# Returns a worker attribute declared on this class or its parent class. # Returns a worker attribute declared on this class or its parent class.
......
...@@ -4,6 +4,7 @@ class CreateEvidenceWorker ...@@ -4,6 +4,7 @@ class CreateEvidenceWorker
include ApplicationWorker include ApplicationWorker
feature_category :release_governance feature_category :release_governance
weight 2
def perform(release_id) def perform(release_id)
release = Release.find_by_id(release_id) release = Release.find_by_id(release_id)
......
...@@ -4,6 +4,7 @@ class CreateGpgSignatureWorker ...@@ -4,6 +4,7 @@ class CreateGpgSignatureWorker
include ApplicationWorker include ApplicationWorker
feature_category :source_code_management feature_category :source_code_management
weight 2
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def perform(commit_shas, project_id) def perform(commit_shas, project_id)
......
...@@ -5,6 +5,7 @@ class EmailReceiverWorker ...@@ -5,6 +5,7 @@ class EmailReceiverWorker
feature_category :issue_tracking feature_category :issue_tracking
latency_sensitive_worker! latency_sensitive_worker!
weight 2
def perform(raw) def perform(raw)
return unless Gitlab::IncomingEmail.enabled? return unless Gitlab::IncomingEmail.enabled?
......
...@@ -8,6 +8,7 @@ class EmailsOnPushWorker ...@@ -8,6 +8,7 @@ class EmailsOnPushWorker
feature_category :source_code_management feature_category :source_code_management
latency_sensitive_worker! latency_sensitive_worker!
worker_resource_boundary :cpu worker_resource_boundary :cpu
weight 2
def perform(project_id, recipients, push_data, options = {}) def perform(project_id, recipients, push_data, options = {})
options.symbolize_keys! options.symbolize_keys!
......
...@@ -6,6 +6,7 @@ class GitlabShellWorker ...@@ -6,6 +6,7 @@ class GitlabShellWorker
feature_category :source_code_management feature_category :source_code_management
latency_sensitive_worker! latency_sensitive_worker!
weight 2
def perform(action, *arg) def perform(action, *arg)
Gitlab::GitalyClient::NamespaceService.allow do Gitlab::GitalyClient::NamespaceService.allow do
......
...@@ -5,6 +5,7 @@ class ImportIssuesCsvWorker ...@@ -5,6 +5,7 @@ class ImportIssuesCsvWorker
feature_category :issue_tracking feature_category :issue_tracking
worker_resource_boundary :cpu worker_resource_boundary :cpu
weight 2
sidekiq_retries_exhausted do |job| sidekiq_retries_exhausted do |job|
Upload.find(job['args'][2]).destroy Upload.find(job['args'][2]).destroy
......
...@@ -4,6 +4,7 @@ class InvalidGpgSignatureUpdateWorker ...@@ -4,6 +4,7 @@ class InvalidGpgSignatureUpdateWorker
include ApplicationWorker include ApplicationWorker
feature_category :source_code_management feature_category :source_code_management
weight 2
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def perform(gpg_key_id) def perform(gpg_key_id)
......
...@@ -5,6 +5,7 @@ class MergeWorker ...@@ -5,6 +5,7 @@ class MergeWorker
feature_category :source_code_management feature_category :source_code_management
latency_sensitive_worker! latency_sensitive_worker!
weight 5
def perform(merge_request_id, current_user_id, params) def perform(merge_request_id, current_user_id, params)
params = params.with_indifferent_access params = params.with_indifferent_access
......
...@@ -7,6 +7,7 @@ class NewIssueWorker ...@@ -7,6 +7,7 @@ class NewIssueWorker
feature_category :issue_tracking feature_category :issue_tracking
latency_sensitive_worker! latency_sensitive_worker!
worker_resource_boundary :cpu worker_resource_boundary :cpu
weight 2
def perform(issue_id, user_id) def perform(issue_id, user_id)
return unless objects_found?(issue_id, user_id) return unless objects_found?(issue_id, user_id)
......
...@@ -7,6 +7,7 @@ class NewMergeRequestWorker ...@@ -7,6 +7,7 @@ class NewMergeRequestWorker
feature_category :source_code_management feature_category :source_code_management
latency_sensitive_worker! latency_sensitive_worker!
worker_resource_boundary :cpu worker_resource_boundary :cpu
weight 2
def perform(merge_request_id, user_id) def perform(merge_request_id, user_id)
return unless objects_found?(merge_request_id, user_id) return unless objects_found?(merge_request_id, user_id)
......
...@@ -6,6 +6,7 @@ class NewNoteWorker ...@@ -6,6 +6,7 @@ class NewNoteWorker
feature_category :issue_tracking feature_category :issue_tracking
latency_sensitive_worker! latency_sensitive_worker!
worker_resource_boundary :cpu worker_resource_boundary :cpu
weight 2
# Keep extra parameter to preserve backwards compatibility with # Keep extra parameter to preserve backwards compatibility with
# old `NewNoteWorker` jobs (can remove later) # old `NewNoteWorker` jobs (can remove later)
......
...@@ -5,6 +5,7 @@ class NewReleaseWorker ...@@ -5,6 +5,7 @@ class NewReleaseWorker
queue_namespace :notifications queue_namespace :notifications
feature_category :release_orchestration feature_category :release_orchestration
weight 2
def perform(release_id) def perform(release_id)
release = Release.preloaded.find_by_id(release_id) release = Release.preloaded.find_by_id(release_id)
......
...@@ -6,6 +6,7 @@ class PostReceive ...@@ -6,6 +6,7 @@ class PostReceive
feature_category :source_code_management feature_category :source_code_management
latency_sensitive_worker! latency_sensitive_worker!
worker_resource_boundary :cpu worker_resource_boundary :cpu
weight 5
def perform(gl_repository, identifier, changes, push_options = {}) def perform(gl_repository, identifier, changes, push_options = {})
project, repo_type = Gitlab::GlRepository.parse(gl_repository) project, repo_type = Gitlab::GlRepository.parse(gl_repository)
......
...@@ -12,6 +12,7 @@ class ProcessCommitWorker ...@@ -12,6 +12,7 @@ class ProcessCommitWorker
feature_category :source_code_management feature_category :source_code_management
latency_sensitive_worker! latency_sensitive_worker!
weight 3
# project_id - The ID of the project this commit belongs to. # project_id - The ID of the project this commit belongs to.
# user_id - The ID of the user that pushed the commit. # user_id - The ID of the user that pushed the commit.
......
...@@ -6,6 +6,7 @@ class RebaseWorker ...@@ -6,6 +6,7 @@ class RebaseWorker
include ApplicationWorker include ApplicationWorker
feature_category :source_code_management feature_category :source_code_management
weight 2
def perform(merge_request_id, current_user_id, skip_ci = false) def perform(merge_request_id, current_user_id, skip_ci = false)
current_user = User.find(current_user_id) current_user = User.find(current_user_id)
......
...@@ -4,6 +4,7 @@ class RemoteMirrorNotificationWorker ...@@ -4,6 +4,7 @@ class RemoteMirrorNotificationWorker
include ApplicationWorker include ApplicationWorker
feature_category :source_code_management feature_category :source_code_management
weight 2
def perform(remote_mirror_id) def perform(remote_mirror_id)
remote_mirror = RemoteMirror.find_by_id(remote_mirror_id) remote_mirror = RemoteMirror.find_by_id(remote_mirror_id)
......
...@@ -4,6 +4,7 @@ class UpdateExternalPullRequestsWorker ...@@ -4,6 +4,7 @@ class UpdateExternalPullRequestsWorker
include ApplicationWorker include ApplicationWorker
feature_category :source_code_management feature_category :source_code_management
weight 3
def perform(project_id, user_id, ref) def perform(project_id, user_id, ref)
project = Project.find_by_id(project_id) project = Project.find_by_id(project_id)
......
...@@ -6,6 +6,7 @@ class UpdateMergeRequestsWorker ...@@ -6,6 +6,7 @@ class UpdateMergeRequestsWorker
feature_category :source_code_management feature_category :source_code_management
latency_sensitive_worker! latency_sensitive_worker!
worker_resource_boundary :cpu worker_resource_boundary :cpu
weight 3
LOG_TIME_THRESHOLD = 90 # seconds LOG_TIME_THRESHOLD = 90 # seconds
......
# This file is generated automatically by
# bin/rake gitlab:sidekiq:sidekiq_queues_yml:generate
#
# Do not edit it manually!
#
# This configuration file should be exclusively used to set queue settings for # This configuration file should be exclusively used to set queue settings for
# Sidekiq. Any other setting should be specified using the Sidekiq CLI or the # Sidekiq. Any other setting should be specified using the Sidekiq CLI or the
# Sidekiq Ruby API (see config/initializers/sidekiq.rb). # Sidekiq Ruby API (see config/initializers/sidekiq.rb).
--- #
# All the queues to process and their weights. Every queue _must_ have a weight # All the queues to process and their weights. Every queue _must_ have a weight
# defined. # defined.
# #
...@@ -17,116 +22,217 @@ ...@@ -17,116 +22,217 @@
# to perform) is: # to perform) is:
# #
# chance = (queue weight / total weight of all queues) * 100 # chance = (queue weight / total weight of all queues) * 100
---
:queues: :queues:
- [post_receive, 5] - - adjourned_project_deletion
- [merge, 5] - 1
- [update_merge_requests, 3] - - admin_emails
- [process_commit, 3] - 1
- [new_note, 2] - - authorized_projects
- [new_issue, 2] - 2
- [notifications, 2] - - auto_devops
- [new_merge_request, 2] - 2
- [pipeline_processing, 5] - - auto_merge
- [pipeline_creation, 4] - 3
- [pipeline_default, 3] - - background_migration
- [pipeline_cache, 3] - 1
- [deployment, 3] - - chaos
- [auto_merge, 3] - 2
- [pipeline_hooks, 2] - - chat_notification
- [gitlab_shell, 2] - 2
- [email_receiver, 2] - - container_repository
- [emails_on_push, 2] - 1
- [mailers, 2] - - create_evidence
- [mail_scheduler, 2] - 2
- [invalid_gpg_signature_update, 2] - - create_github_webhook
- [create_gpg_signature, 2] - 2
- [rebase, 2] - - create_gpg_signature
- [upload_checksum, 1] - 2
- [repository_fork, 1] - - create_note_diff_file
- [repository_import, 1] - 1
- [github_importer, 1] - - cronjob
- [github_import_advance_stage, 1] - 1
- [project_service, 1] - - default
- [delete_user, 1] - 1
- [todos_destroyer, 1] - - delete_diff_files
- [delete_merged_branches, 1] - 1
- [authorized_projects, 2] - - delete_merged_branches
- [expire_build_instance_artifacts, 1] - 1
- [group_destroy, 1] - - delete_stored_files
- [irker, 1] - 1
- [namespaceless_project_destroy, 1] - - delete_user
- [project_cache, 1] - 1
- [project_destroy, 1] - - deployment
- [project_export, 1] - 3
- [web_hook, 1] - - design_management_new_version
- [repository_check, 1] - 1
- [git_garbage_collect, 1] - - detect_repository_languages
- [reactive_caching, 1] - 1
- [cronjob, 1] - - elastic_batch_project_indexer
- [default, 1] - 1
- [pages, 1] - - elastic_commit_indexer
- [system_hook_push, 1] - 1
- [propagate_service_template, 1] - - elastic_full_index
- [background_migration, 1] - 1
- [gcp_cluster, 1] - - elastic_indexer
- [project_migrate_hashed_storage, 1] - 1
- [project_rollback_hashed_storage, 1] - - elastic_namespace_indexer
- [hashed_storage, 1] - 1
- [pages_domain_verification, 1] - - elastic_namespace_rollout
- [pages_domain_ssl_renewal, 1] - 1
- [object_storage_upload, 1] - - email_receiver
- [object_storage, 1] - 2
- [file_hook, 1] - - emails_on_push
- [pipeline_background, 1] - 2
- [repository_update_remote_mirror, 1] - - epics
- [repository_remove_remote, 1] - 2
- [create_note_diff_file, 1] - - error_tracking_issue_link
- [delete_diff_files, 1] - 1
- [detect_repository_languages, 1] - - expire_build_instance_artifacts
- [auto_devops, 2] - 1
- [container_repository, 1] - - export_csv
- [object_pool, 1] - 1
- [repository_cleanup, 1] - - file_hook
- [delete_stored_files, 1] - 1
- [remote_mirror_notification, 2] - - gcp_cluster
- [project_daily_statistics, 1] - 1
- [import_issues_csv, 2] - - geo
- [chat_notification, 2] - 1
- [migrate_external_diffs, 1] - - git_garbage_collect
- [update_project_statistics, 1] - 1
- [phabricator_import_import_tasks, 1] - - github_import_advance_stage
- [update_namespace_statistics, 1] - 1
- [chaos, 2] - - github_importer
- [create_evidence, 2] - 1
- [group_export, 1] - - gitlab_shell
- [self_monitoring_project_create, 2] - 2
- [self_monitoring_project_delete, 2] - - group_destroy
- [error_tracking_issue_link, 2] - 1
- [merge_request_mergeability_check, 5] - - group_export
- 1
# EE-specific queues - - hashed_storage
- [analytics, 1] - 1
- [ldap_group_sync, 2] - - import_issues_csv
- [create_github_webhook, 2] - 2
- [geo, 1] - - incident_management
- [repository_update_mirror, 1] - 2
- [repository_push_audit_event, 1] - - invalid_gpg_signature_update
- [new_epic, 2] - 2
- [project_import_schedule, 1] - - irker
- [project_update_repository_storage, 1] - 1
- [admin_emails, 1] - - jira_connect
- [elastic_batch_project_indexer, 1] - 1
- [elastic_indexer, 1] - - ldap_group_sync
- [elastic_full_index, 1] - 2
- [elastic_commit_indexer, 1] - - mail_scheduler
- [elastic_namespace_indexer, 1] - 2
- [elastic_namespace_rollout, 1] - - mailers
- [export_csv, 1] - 2
- [incident_management, 2] - - merge
- [jira_connect, 1] - 5
- [update_external_pull_requests, 3] - - merge_request_mergeability_check
- [refresh_license_compliance_checks, 2] - 1
- [design_management_new_version, 1] - - migrate_external_diffs
- [epics, 2] - 1
- [personal_access_tokens, 1] - - namespaceless_project_destroy
- [adjourned_project_deletion, 1] - 1
- - new_epic
- 2
- - new_issue
- 2
- - new_merge_request
- 2
- - new_note
- 2
- - notifications
- 2
- - object_pool
- 1
- - object_storage
- 1
- - pages
- 1
- - pages_domain_ssl_renewal
- 1
- - pages_domain_verification
- 1
- - personal_access_tokens
- 1
- - phabricator_import_import_tasks
- 1
- - pipeline_background
- 1
- - pipeline_cache
- 3
- - pipeline_creation
- 4
- - pipeline_default
- 3
- - pipeline_hooks
- 2
- - pipeline_processing
- 5
- - post_receive
- 5
- - process_commit
- 3
- - project_cache
- 1
- - project_daily_statistics
- 1
- - project_destroy
- 1
- - project_export
- 1
- - project_import_schedule
- 1
- - project_service
- 1
- - project_update_repository_storage
- 1
- - propagate_service_template
- 1
- - reactive_caching
- 1
- - rebase
- 2
- - refresh_license_compliance_checks
- 2
- - remote_mirror_notification
- 2
- - repository_check
- 1
- - repository_cleanup
- 1
- - repository_fork
- 1
- - repository_import
- 1
- - repository_push_audit_event
- 1
- - repository_remove_remote
- 1
- - repository_update_mirror
- 1
- - repository_update_remote_mirror
- 1
- - self_monitoring_project_create
- 2
- - self_monitoring_project_delete
- 2
- - system_hook_push
- 1
- - todos_destroyer
- 1
- - update_external_pull_requests
- 3
- - update_merge_requests
- 3
- - update_namespace_statistics
- 1
- - update_project_statistics
- 1
- - upload_checksum
- 1
- - web_hook
- 1
...@@ -7,6 +7,7 @@ class CreateGithubWebhookWorker ...@@ -7,6 +7,7 @@ class CreateGithubWebhookWorker
feature_category :integrations feature_category :integrations
worker_resource_boundary :cpu worker_resource_boundary :cpu
worker_has_external_dependencies! worker_has_external_dependencies!
weight 2
attr_reader :project attr_reader :project
......
...@@ -5,6 +5,7 @@ class LdapGroupSyncWorker ...@@ -5,6 +5,7 @@ class LdapGroupSyncWorker
feature_category :authentication_and_authorization feature_category :authentication_and_authorization
worker_has_external_dependencies! worker_has_external_dependencies!
weight 2
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def perform(group_ids, provider = nil) def perform(group_ids, provider = nil)
......
...@@ -6,6 +6,7 @@ class NewEpicWorker ...@@ -6,6 +6,7 @@ class NewEpicWorker
feature_category :epics feature_category :epics
worker_resource_boundary :cpu worker_resource_boundary :cpu
weight 2
def perform(epic_id, user_id) def perform(epic_id, user_id)
return unless objects_found?(epic_id, user_id) return unless objects_found?(epic_id, user_id)
......
...@@ -4,6 +4,7 @@ class RefreshLicenseComplianceChecksWorker ...@@ -4,6 +4,7 @@ class RefreshLicenseComplianceChecksWorker
include ApplicationWorker include ApplicationWorker
feature_category :license_compliance feature_category :license_compliance
weight 2
def perform(project_id) def perform(project_id)
project = Project.find(project_id) project = Project.find(project_id)
......
...@@ -6,6 +6,7 @@ module Gitlab ...@@ -6,6 +6,7 @@ module Gitlab
module SidekiqConfig module SidekiqConfig
FOSS_QUEUE_CONFIG_PATH = 'app/workers/all_queues.yml' FOSS_QUEUE_CONFIG_PATH = 'app/workers/all_queues.yml'
EE_QUEUE_CONFIG_PATH = 'ee/app/workers/all_queues.yml' EE_QUEUE_CONFIG_PATH = 'ee/app/workers/all_queues.yml'
SIDEKIQ_QUEUES_PATH = 'config/sidekiq_queues.yml'
QUEUE_CONFIG_PATHS = [ QUEUE_CONFIG_PATHS = [
FOSS_QUEUE_CONFIG_PATH, FOSS_QUEUE_CONFIG_PATH,
...@@ -13,11 +14,19 @@ module Gitlab ...@@ -13,11 +14,19 @@ module Gitlab
].compact.freeze ].compact.freeze
# For queues that don't have explicit workers - default and mailers # For queues that don't have explicit workers - default and mailers
DummyWorker = Struct.new(:queue) DummyWorker = Struct.new(:queue, :weight) do
def queue_namespace
nil
end
def get_weight
weight
end
end
DEFAULT_WORKERS = [ DEFAULT_WORKERS = [
Gitlab::SidekiqConfig::Worker.new(DummyWorker.new('default'), ee: false), Gitlab::SidekiqConfig::Worker.new(DummyWorker.new('default', 1), ee: false),
Gitlab::SidekiqConfig::Worker.new(DummyWorker.new('mailers'), ee: false) Gitlab::SidekiqConfig::Worker.new(DummyWorker.new('mailers', 2), ee: false)
].freeze ].freeze
class << self class << self
...@@ -30,7 +39,7 @@ module Gitlab ...@@ -30,7 +39,7 @@ module Gitlab
def config_queues def config_queues
@config_queues ||= begin @config_queues ||= begin
config = YAML.load_file(Rails.root.join('config/sidekiq_queues.yml')) config = YAML.load_file(Rails.root.join(SIDEKIQ_QUEUES_PATH))
config[:queues].map(&:first) config[:queues].map(&:first)
end end
end end
...@@ -65,6 +74,28 @@ module Gitlab ...@@ -65,6 +74,28 @@ module Gitlab
Gitlab.ee? && ee_workers != YAML.safe_load(File.read(EE_QUEUE_CONFIG_PATH)) Gitlab.ee? && ee_workers != YAML.safe_load(File.read(EE_QUEUE_CONFIG_PATH))
end end
def queues_for_sidekiq_queues_yml
namespaces_with_equal_weights =
workers
.group_by(&:queue_namespace)
.map(&:last)
.select { |workers| workers.map(&:get_weight).uniq.count == 1 }
.map(&:first)
namespaces = namespaces_with_equal_weights.map(&:queue_namespace).to_set
remaining_queues = workers.reject { |worker| namespaces.include?(worker.queue_namespace) }
(namespaces_with_equal_weights.map(&:namespace_and_weight) +
remaining_queues.map(&:queue_and_weight)).sort
end
def sidekiq_queues_yml_outdated?
# YAML.load is OK here as we control the file contents
config_queues = YAML.load(File.read(SIDEKIQ_QUEUES_PATH))[:queues] # rubocop:disable Security/YAMLLoad
queues_for_sidekiq_queues_yml != config_queues
end
private private
def find_workers(root, ee:) def find_workers(root, ee:)
......
...@@ -7,8 +7,9 @@ module Gitlab ...@@ -7,8 +7,9 @@ module Gitlab
attr_reader :klass attr_reader :klass
delegate :feature_category_not_owned?, :get_feature_category, delegate :feature_category_not_owned?, :get_feature_category,
:get_worker_resource_boundary, :latency_sensitive_worker?, :get_weight, :get_worker_resource_boundary,
:queue, :worker_has_external_dependencies?, :latency_sensitive_worker?, :queue, :queue_namespace,
:worker_has_external_dependencies?,
to: :klass to: :klass
def initialize(klass, ee:) def initialize(klass, ee:)
...@@ -35,7 +36,7 @@ module Gitlab ...@@ -35,7 +36,7 @@ module Gitlab
# Put namespaced queues first # Put namespaced queues first
def to_sort def to_sort
[queue.include?(':') ? 0 : 1, queue] [queue_namespace ? 0 : 1, queue]
end end
# YAML representation # YAML representation
...@@ -46,6 +47,14 @@ module Gitlab ...@@ -46,6 +47,14 @@ module Gitlab
def to_yaml def to_yaml
queue queue
end end
def namespace_and_weight
[queue_namespace, get_weight]
end
def queue_and_weight
[queue, get_weight]
end
end end
end end
end end
...@@ -4,8 +4,13 @@ return if Rails.env.production? ...@@ -4,8 +4,13 @@ return if Rails.env.production?
namespace :gitlab do namespace :gitlab do
namespace :sidekiq do namespace :sidekiq do
def write_yaml(path, banner, object)
File.write(path, banner + YAML.dump(object))
end
namespace :all_queues_yml do namespace :all_queues_yml do
def write_yaml(path, object) desc 'GitLab | Sidekiq | Generate all_queues.yml based on worker definitions'
task generate: :environment do
banner = <<~BANNER banner = <<~BANNER
# This file is generated automatically by # This file is generated automatically by
# bin/rake gitlab:sidekiq:all_queues_yml:generate # bin/rake gitlab:sidekiq:all_queues_yml:generate
...@@ -13,17 +18,12 @@ namespace :gitlab do ...@@ -13,17 +18,12 @@ namespace :gitlab do
# Do not edit it manually! # Do not edit it manually!
BANNER BANNER
File.write(path, banner + YAML.dump(object))
end
desc 'GitLab | Sidekiq | Generate all_queues.yml based on worker definitions'
task generate: :environment do
foss_workers, ee_workers = Gitlab::SidekiqConfig.workers_for_all_queues_yml foss_workers, ee_workers = Gitlab::SidekiqConfig.workers_for_all_queues_yml
write_yaml(Gitlab::SidekiqConfig::FOSS_QUEUE_CONFIG_PATH, foss_workers) write_yaml(Gitlab::SidekiqConfig::FOSS_QUEUE_CONFIG_PATH, banner, foss_workers)
if Gitlab.ee? if Gitlab.ee?
write_yaml(Gitlab::SidekiqConfig::EE_QUEUE_CONFIG_PATH, ee_workers) write_yaml(Gitlab::SidekiqConfig::EE_QUEUE_CONFIG_PATH, banner, ee_workers)
end end
end end
...@@ -44,5 +44,57 @@ namespace :gitlab do ...@@ -44,5 +44,57 @@ namespace :gitlab do
end end
end end
end end
namespace :sidekiq_queues_yml do
desc 'GitLab | Sidekiq | Generate sidekiq_queues.yml based on worker definitions'
task generate: :environment do
banner = <<~BANNER
# This file is generated automatically by
# bin/rake gitlab:sidekiq:sidekiq_queues_yml:generate
#
# Do not edit it manually!
#
# This configuration file should be exclusively used to set queue settings for
# Sidekiq. Any other setting should be specified using the Sidekiq CLI or the
# Sidekiq Ruby API (see config/initializers/sidekiq.rb).
#
# All the queues to process and their weights. Every queue _must_ have a weight
# defined.
#
# The available weights are as follows
#
# 1: low priority
# 2: medium priority
# 3: high priority
# 5: _super_ high priority, this should only be used for _very_ important queues
#
# As per http://stackoverflow.com/a/21241357/290102 the formula for calculating
# the likelihood of a job being popped off a queue (given all queues have work
# to perform) is:
#
# chance = (queue weight / total weight of all queues) * 100
BANNER
queues_and_weights = Gitlab::SidekiqConfig.queues_for_sidekiq_queues_yml
write_yaml(Gitlab::SidekiqConfig::SIDEKIQ_QUEUES_PATH, banner, queues: queues_and_weights)
end
desc 'GitLab | Sidekiq | Validate that sidekiq_queues.yml matches worker definitions'
task check: :environment do
if Gitlab::SidekiqConfig.sidekiq_queues_yml_outdated?
raise <<~MSG
Changes in worker queues found, please update the metadata by running:
bin/rake gitlab:sidekiq:sidekiq_queues_yml:generate
Then commit and push the changes from:
- #{Gitlab::SidekiqConfig::SIDEKIQ_QUEUES_PATH}
MSG
end
end
end
end end
end end
...@@ -38,11 +38,13 @@ unless Rails.env.production? ...@@ -38,11 +38,13 @@ unless Rails.env.production?
] ]
if Gitlab.ee? if Gitlab.ee?
# This task will fail on CE installations (e.g. gitlab-org/gitlab-foss) # These tasks will fail on FOSS installations
# since it will detect strings in the locale files that do not exist in # (e.g. gitlab-org/gitlab-foss) since they test against a single
# the source files. To work around this we will only enable this task on # file that is generated by an EE installation, which can
# EE installations. # contain values that a FOSS installation won't find. To work
# around this we will only enable this task on EE installations.
tasks << 'gettext:updated_check' tasks << 'gettext:updated_check'
tasks << 'gitlab:sidekiq:sidekiq_queues_yml:check'
end end
tasks.each do |task| tasks.each do |task|
......
...@@ -3,8 +3,11 @@ ...@@ -3,8 +3,11 @@
require 'fast_spec_helper' require 'fast_spec_helper'
describe Gitlab::SidekiqConfig::Worker do describe Gitlab::SidekiqConfig::Worker do
def worker_with_queue(queue) def create_worker(queue:, weight: 0)
described_class.new(double(queue: queue), ee: false) namespace = queue.include?(':') && queue.split(':').first
inner_worker = double(queue: queue, queue_namespace: namespace, get_weight: weight)
described_class.new(inner_worker, ee: false)
end end
describe '#ee?' do describe '#ee?' do
...@@ -34,9 +37,9 @@ describe Gitlab::SidekiqConfig::Worker do ...@@ -34,9 +37,9 @@ describe Gitlab::SidekiqConfig::Worker do
describe 'delegations' do describe 'delegations' do
[ [
:feature_category_not_owned?, :get_feature_category, :feature_category_not_owned?, :get_feature_category, :get_weight,
:get_worker_resource_boundary, :latency_sensitive_worker?, :queue, :get_worker_resource_boundary, :latency_sensitive_worker?, :queue,
:worker_has_external_dependencies? :queue_namespace, :worker_has_external_dependencies?
].each do |meth| ].each do |meth|
it "delegates #{meth} to the worker class" do it "delegates #{meth} to the worker class" do
worker = double worker = double
...@@ -50,8 +53,8 @@ describe Gitlab::SidekiqConfig::Worker do ...@@ -50,8 +53,8 @@ describe Gitlab::SidekiqConfig::Worker do
describe 'sorting' do describe 'sorting' do
it 'sorts queues with a namespace before those without a namespace' do it 'sorts queues with a namespace before those without a namespace' do
namespaced_worker = worker_with_queue('namespace:queue') namespaced_worker = create_worker(queue: 'namespace:queue')
plain_worker = worker_with_queue('a_queue') plain_worker = create_worker(queue: 'a_queue')
expect([plain_worker, namespaced_worker].sort) expect([plain_worker, namespaced_worker].sort)
.to eq([namespaced_worker, plain_worker]) .to eq([namespaced_worker, plain_worker])
...@@ -59,12 +62,12 @@ describe Gitlab::SidekiqConfig::Worker do ...@@ -59,12 +62,12 @@ describe Gitlab::SidekiqConfig::Worker do
it 'sorts alphabetically by queue' do it 'sorts alphabetically by queue' do
workers = [ workers = [
worker_with_queue('namespace:a'), create_worker(queue: 'namespace:a'),
worker_with_queue('namespace:b'), create_worker(queue: 'namespace:b'),
worker_with_queue('other_namespace:a'), create_worker(queue: 'other_namespace:a'),
worker_with_queue('other_namespace:b'), create_worker(queue: 'other_namespace:b'),
worker_with_queue('a'), create_worker(queue: 'a'),
worker_with_queue('b') create_worker(queue: 'b')
] ]
expect(workers.shuffle.sort).to eq(workers) expect(workers.shuffle.sort).to eq(workers)
...@@ -73,12 +76,26 @@ describe Gitlab::SidekiqConfig::Worker do ...@@ -73,12 +76,26 @@ describe Gitlab::SidekiqConfig::Worker do
describe 'YAML encoding' do describe 'YAML encoding' do
it 'encodes the worker in YAML as a string of the queue' do it 'encodes the worker in YAML as a string of the queue' do
worker_a = worker_with_queue('a') worker_a = create_worker(queue: 'a')
worker_b = worker_with_queue('b') worker_b = create_worker(queue: 'b')
expect(YAML.dump(worker_a)).to eq(YAML.dump('a')) expect(YAML.dump(worker_a)).to eq(YAML.dump('a'))
expect(YAML.dump([worker_a, worker_b])) expect(YAML.dump([worker_a, worker_b]))
.to eq(YAML.dump(%w[a b])) .to eq(YAML.dump(%w[a b]))
end end
end end
describe '#namespace_and_weight' do
it 'returns a namespace, weight pair for the worker' do
expect(create_worker(queue: 'namespace:a', weight: 2).namespace_and_weight)
.to eq(['namespace', 2])
end
end
describe '#queue_and_weight' do
it 'returns a queue, weight pair for the worker' do
expect(create_worker(queue: 'namespace:a', weight: 2).queue_and_weight)
.to eq(['namespace:a', 2])
end
end
end end
...@@ -80,4 +80,64 @@ describe Gitlab::SidekiqConfig do ...@@ -80,4 +80,64 @@ describe Gitlab::SidekiqConfig do
expect(described_class.all_queues_yml_outdated?).to be(false) expect(described_class.all_queues_yml_outdated?).to be(false)
end end
end end
describe '.queues_for_sidekiq_queues_yml' do
before do
workers = [
Namespaces::RootStatisticsWorker,
Namespaces::ScheduleAggregationWorker,
MergeWorker,
ProcessCommitWorker
].map { |worker| described_class::Worker.new(worker, ee: false) }
allow(described_class).to receive(:workers).and_return(workers)
end
it 'returns queues and weights, aggregating namespaces with the same weight' do
expected_queues = [
['merge', 5],
['process_commit', 3],
['update_namespace_statistics', 1]
]
expect(described_class.queues_for_sidekiq_queues_yml).to eq(expected_queues)
end
end
describe '.sidekiq_queues_yml_outdated?' do
before do
workers = [
Namespaces::RootStatisticsWorker,
Namespaces::ScheduleAggregationWorker,
MergeWorker,
ProcessCommitWorker
].map { |worker| described_class::Worker.new(worker, ee: false) }
allow(described_class).to receive(:workers).and_return(workers)
end
let(:expected_queues) do
[
['merge', 5],
['process_commit', 3],
['update_namespace_statistics', 1]
]
end
it 'returns true if the YAML file does not match the application code' do
allow(File).to receive(:read)
.with(described_class::SIDEKIQ_QUEUES_PATH)
.and_return(YAML.dump(queues: expected_queues.reverse))
expect(described_class.sidekiq_queues_yml_outdated?).to be(true)
end
it 'returns false if the YAML file matches the application code' do
allow(File).to receive(:read)
.with(described_class::SIDEKIQ_QUEUES_PATH)
.and_return(YAML.dump(queues: expected_queues))
expect(described_class.sidekiq_queues_yml_outdated?).to be(false)
end
end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment