Commit e9368df2 authored by Grzegorz Bizon

Implement pending builds queue builder in a separate class

Changelog: performance
parent adfc1cdb
......@@ -8,6 +8,7 @@ module Ci
belongs_to :build, class_name: 'Ci::Build'
scope :ref_protected, -> { where(protected: true) }
scope :queued_before, ->(time) { where(arel_table[:created_at].lt(time)) }
def self.upsert_from_build!(build)
entry = self.new(build: build, project: build.project, protected: build.protected?)
......
......@@ -85,21 +85,25 @@ class CommitStatus < ApplicationRecord
merge(or_conditions)
end
scope :matches_tag_ids, ->(tag_ids) do
##
# The temporary `on:` argument has been introduced to make it possible to
# reuse these scopes as a subquery for a different table (ci_pending_builds).
#
scope :matches_tag_ids, ->(tag_ids, on: 'ci_builds.id') do
matcher = ::ActsAsTaggableOn::Tagging
.where(taggable_type: CommitStatus.name)
.where(context: 'tags')
.where('taggable_id = ci_builds.id')
.where("taggable_id = #{on}")
.where.not(tag_id: tag_ids).select('1')
where("NOT EXISTS (?)", matcher)
end
scope :with_any_tags, -> do
scope :with_any_tags, ->(on: 'ci_builds.id') do
matcher = ::ActsAsTaggableOn::Tagging
.where(taggable_type: CommitStatus.name)
.where(context: 'tags')
.where('taggable_id = ci_builds.id').select('1')
.where("taggable_id = #{on}").select('1')
where("EXISTS (?)", matcher)
end
......
......@@ -103,35 +103,34 @@ module Ci
# rubocop: disable CodeReuse/ActiveRecord
def each_build(params, &blk)
builds =
queue = ::Gitlab::Ci::Queue::Builder.new(runner)
if runner.instance_type?
builds_for_shared_runner
queue.builds_for_shared_runner
elsif runner.group_type?
builds_for_group_runner
queue.builds_for_group_runner
else
builds_for_project_runner
queue.builds_for_project_runner
end
# pick builds that do not have tags other than the runner's
builds = builds.merge(::CommitStatus.matches_tag_ids(runner.tags.ids))
queue.builds_matching_tag_ids(runner.tags.ids)
# pick builds that have at least one tag
unless runner.run_untagged?
builds = builds.merge(::CommitStatus.with_any_tags)
queue.builds_with_any_tags
end
# pick builds that are older than the specified age
if params.key?(:job_age)
builds = builds.queued_before(params[:job_age].seconds.ago)
queue.builds_queued_before(params[:job_age].seconds.ago)
end
build_ids = retrieve_queue(-> { builds.map(&:id) })
build_ids = retrieve_queue(-> { queue.build_ids })
@metrics.observe_queue_size(-> { build_ids.size }, @runner.runner_type)
build_ids.each do |build_id|
yield Ci::Build.find(build_id)
end
build_ids.each { |build_id| yield Ci::Build.find(build_id) }
end
# rubocop: enable CodeReuse/ActiveRecord
......@@ -259,82 +258,6 @@ module Ci
)
end
# rubocop: disable CodeReuse/ActiveRecord
def builds_for_shared_runner
relation = new_builds
# don't run projects which have not enabled shared runners and builds
.joins('INNER JOIN projects ON ci_builds.project_id = projects.id')
.where(projects: { shared_runners_enabled: true, pending_delete: false })
.joins('LEFT JOIN project_features ON ci_builds.project_id = project_features.project_id')
.where('project_features.builds_access_level IS NULL or project_features.builds_access_level > 0')
if Feature.enabled?(:ci_queueing_disaster_recovery, runner, type: :ops, default_enabled: :yaml)
# if disaster recovery is enabled, we fallback to FIFO scheduling
relation.order('ci_builds.id ASC')
else
# Implement fair scheduling
# this returns builds that are ordered by number of running builds
# we prefer projects that don't use shared runners at all
relation
.joins("LEFT JOIN (#{running_builds_for_shared_runners.to_sql}) AS project_builds ON ci_builds.project_id=project_builds.project_id")
.order(Arel.sql('COALESCE(project_builds.running_builds, 0) ASC'), 'ci_builds.id ASC')
end
end
def new_builds_in_projects(scope)
new_builds.where("project_id IN (#{scope.select(:id).to_sql})").order('id ASC')
end
def builds_for_project_runner
new_builds_in_projects(runner.projects.without_deleted.with_builds_enabled)
end
def builds_for_group_runner
# Workaround for weird Rails bug, that makes `runner.groups.to_sql` to return `runner_id = NULL`
groups = ::Group.joins(:runner_namespaces).merge(runner.runner_namespaces)
hierarchy_groups = Gitlab::ObjectHierarchy.new(groups, options: { use_distinct: Feature.enabled?(:use_distinct_in_register_job_object_hierarchy) }).base_and_descendants
projects = Project.where(namespace_id: hierarchy_groups)
.with_group_runners_enabled
.with_builds_enabled
.without_deleted
new_builds_in_projects(projects)
end
def all_running_builds
if Feature.enabled?(:ci_pending_builds_queue_source, runner, default_enabled: :yaml)
Ci::RunningBuild.all
else
Ci::Build.running
end
end
def all_builds
if Feature.enabled?(:ci_pending_builds_queue_source, runner, default_enabled: :yaml)
Ci::PendingBuild.select('build_id AS id').from('ci_pending_builds AS ci_builds')
else
Ci::Build.select('id').pending.unstarted
end
end
# rubocop: enable CodeReuse/ActiveRecord
def running_builds_for_shared_runners
all_running_builds
.where(runner: Ci::Runner.instance_type)
.group(:project_id)
.select(:project_id, 'count(*) AS running_builds')
end
def new_builds
if runner.ref_protected?
all_builds.where('protected = true')
# all_builds.ref_protected
else
all_builds
end
end
def pre_assign_runner_checks
{
missing_dependency_failure: -> (build, _) { !build.has_valid_build_dependencies? },
......
# frozen_string_literal: true
module Gitlab
module Ci
module Queue
class Builder < SimpleDelegator
# Picks the queue strategy for `runner` and hands it to SimpleDelegator, so
# calls made on the builder are forwarded to the chosen strategy.
#
# The `ci_pending_builds_queue_source` feature flag (checked for `runner`)
# selects PendingBuildsTableStrategy; otherwise BuildsTableStrategy is used.
def initialize(runner)
  use_pending_builds_table = ::Feature.enabled?(:ci_pending_builds_queue_source, runner, default_enabled: :yaml)
  strategy_class = use_pending_builds_table ? PendingBuildsTableStrategy : BuildsTableStrategy

  super(strategy_class.new(runner))
end
# rubocop:disable CodeReuse/ActiveRecord
# Queue strategy that reads candidate builds from `::Ci::Build`
# (`pending.unstarted`), limited to protected refs when the runner is
# ref-protected.
#
# The strategy is stateful: `@relation` starts as `new_builds`, one of the
# `builds_for_*` methods replaces it with the scope for the runner type, the
# other `builds_*` methods narrow it further, and `build_ids` plucks the ids.
# Every `builds_*` method returns the updated relation.
class BuildsTableStrategy
  attr_reader :runner

  # @param runner the runner the queue is built for; this class calls
  #   `ref_protected?`, `projects` and `runner_namespaces` on it
  def initialize(runner)
    @runner = runner
    @relation = new_builds
  end

  # Scopes the queue to builds of projects that have shared runners and
  # builds enabled, then applies the ordering.
  #
  # @return the relation, ordered by build id (FIFO) when the
  #   `ci_queueing_disaster_recovery` flag is enabled, and by the number of
  #   running builds per project (fair scheduling) otherwise
  def builds_for_shared_runner
    # don't run projects which have not enabled shared runners and builds
    @relation = new_builds
      .joins('INNER JOIN projects ON ci_builds.project_id = projects.id')
      .where(projects: { shared_runners_enabled: true, pending_delete: false })
      .joins('LEFT JOIN project_features ON ci_builds.project_id = project_features.project_id')
      .where('project_features.builds_access_level IS NULL or project_features.builds_access_level > 0')

    @relation =
      if ::Feature.enabled?(:ci_queueing_disaster_recovery, runner, type: :ops, default_enabled: :yaml)
        # if disaster recovery is enabled, we fall back to FIFO scheduling
        @relation.order('ci_builds.id ASC')
      else
        # Implement fair scheduling:
        # this returns builds that are ordered by number of running builds,
        # we prefer projects that don't use shared runners at all.
        #
        # This must read @relation: the class defines no `relation` local or
        # reader, so a bare `relation` here raises NameError.
        @relation
          .joins("LEFT JOIN (#{running_builds_for_shared_runners.to_sql}) AS project_builds ON ci_builds.project_id=project_builds.project_id")
          .order(Arel.sql('COALESCE(project_builds.running_builds, 0) ASC'), 'ci_builds.id ASC')
      end
  end

  # Scopes the queue to builds of the runner's non-deleted projects that have
  # builds enabled, oldest build id first.
  def builds_for_project_runner
    @relation = new_builds.where(project: runner.projects.without_deleted.with_builds_enabled).order('id ASC')
  end

  # Scopes the queue to builds of projects that live in the runner's groups
  # or their descendants, oldest build id first.
  def builds_for_group_runner
    # Workaround for weird Rails bug, that makes `runner.groups.to_sql` to return `runner_id = NULL`
    groups = ::Group.joins(:runner_namespaces).merge(runner.runner_namespaces)

    hierarchy_groups = Gitlab::ObjectHierarchy
      .new(groups, options: { use_distinct: ::Feature.enabled?(:use_distinct_in_register_job_object_hierarchy) })
      .base_and_descendants

    projects = Project.where(namespace_id: hierarchy_groups)
      .with_group_runners_enabled
      .with_builds_enabled
      .without_deleted

    @relation = new_builds.where(project: projects).order('id ASC')
  end

  # Pick builds that do not have tags other than the given ones.
  #
  # @param ids tag ids passed through to the `matches_tag_ids` scope
  def builds_matching_tag_ids(ids)
    @relation = @relation.matches_tag_ids(ids)
  end

  # Pick builds that have at least one tag.
  def builds_with_any_tags
    @relation = @relation.with_any_tags
  end

  # Narrows the queue with the `queued_before` scope.
  #
  # @param time the cut-off passed through to `queued_before`
  def builds_queued_before(time)
    @relation = @relation.queued_before(time)
  end

  # @return the `id` column plucked from the accumulated relation
  def build_ids
    @relation.pluck(:id)
  end

  private

  # All pending, not yet started builds.
  def all_builds
    ::Ci::Build.pending.unstarted
  end

  # Base scope for this runner: only protected-ref builds when the runner
  # itself is ref-protected, every candidate build otherwise.
  def new_builds
    if runner.ref_protected?
      all_builds.ref_protected
    else
      all_builds
    end
  end

  # Per-project count of running builds that are assigned to instance-type
  # runners; joined as a subquery for fair scheduling.
  def running_builds_for_shared_runners
    ::Ci::Build.running
      .where(runner: ::Ci::Runner.instance_type)
      .group(:project_id)
      .select(:project_id, 'count(*) AS running_builds')
  end
end
# Queue strategy that reads candidate builds from `::Ci::PendingBuild`
# (the `ci_pending_builds` table referenced in the SQL below) and counts
# running builds through `::Ci::RunningBuild`.
#
# State is kept in `@relation`: it starts as `new_builds`, a `builds_for_*`
# method replaces it with the scope for the runner type, the remaining
# `builds_*` methods narrow it, and `build_ids` plucks `build_id`.
# Every `builds_*` method returns the updated relation.
class PendingBuildsTableStrategy
  attr_reader :runner

  # @param runner the runner the queue is built for; this class calls
  #   `ref_protected?`, `projects` and `runner_namespaces` on it
  def initialize(runner)
    @runner = runner
    @relation = new_builds
  end

  # Scopes the queue to pending builds of projects with shared runners and
  # builds enabled, then orders it: FIFO by build id when the
  # `ci_queueing_disaster_recovery` flag is on, fair scheduling otherwise.
  def builds_for_shared_runner
    @relation = shared_runner_candidates

    fifo_only = Feature.enabled?(:ci_queueing_disaster_recovery, runner, type: :ops, default_enabled: :yaml)

    # if disaster recovery is enabled, we fallback to FIFO scheduling
    @relation = fifo_only ? @relation.order('ci_pending_builds.build_id ASC') : fairly_ordered(@relation)
  end

  # Scopes the queue to pending builds of the runner's non-deleted projects
  # that have builds enabled, lowest build id first.
  def builds_for_project_runner
    candidates = new_builds
    runner_projects = runner.projects.without_deleted.with_builds_enabled

    @relation = candidates.where(project: runner_projects).order('build_id ASC')
  end

  # Scopes the queue to pending builds of projects located in the runner's
  # groups or their descendants, lowest build id first.
  def builds_for_group_runner
    # Workaround for weird Rails bug, that makes `runner.groups.to_sql` to return `runner_id = NULL`
    runner_groups = ::Group.joins(:runner_namespaces).merge(runner.runner_namespaces)

    hierarchy_options = { use_distinct: ::Feature.enabled?(:use_distinct_in_register_job_object_hierarchy) }
    namespaces = Gitlab::ObjectHierarchy.new(runner_groups, options: hierarchy_options).base_and_descendants

    group_projects = Project
      .where(namespace_id: namespaces)
      .with_group_runners_enabled
      .with_builds_enabled
      .without_deleted

    @relation = new_builds.where(project: group_projects).order('build_id ASC')
  end

  # Merges the CommitStatus tag-matching scope, keyed on the pending build's
  # `build_id` column.
  def builds_matching_tag_ids(ids)
    tag_scope = CommitStatus.matches_tag_ids(ids, on: 'ci_pending_builds.build_id')
    @relation = @relation.merge(tag_scope)
  end

  # Merges the CommitStatus "has any tag" scope, keyed on the pending build's
  # `build_id` column.
  def builds_with_any_tags
    tag_scope = CommitStatus.with_any_tags(on: 'ci_pending_builds.build_id')
    @relation = @relation.merge(tag_scope)
  end

  # Narrows the queue with the `queued_before` scope for the given time.
  def builds_queued_before(time)
    @relation = @relation.queued_before(time)
  end

  # @return the `build_id` column plucked from the accumulated relation
  def build_ids
    @relation.pluck(:build_id)
  end

  private

  # Every pending-build row.
  def all_builds
    ::Ci::PendingBuild.all
  end

  # Base scope for this runner: protected-ref rows only when the runner is
  # ref-protected.
  def new_builds
    return all_builds unless runner.ref_protected?

    all_builds.ref_protected
  end

  # don't run projects which have not enabled shared runners and builds
  def shared_runner_candidates
    new_builds
      .joins('INNER JOIN projects ON ci_pending_builds.project_id = projects.id')
      .where(projects: { shared_runners_enabled: true, pending_delete: false })
      .joins('LEFT JOIN project_features ON ci_pending_builds.project_id = project_features.project_id')
      .where('project_features.builds_access_level IS NULL or project_features.builds_access_level > 0')
  end

  # Implement fair scheduling
  # this returns builds that are ordered by number of running builds
  # we prefer projects that don't use shared runners at all
  def fairly_ordered(scope)
    running_counts_sql = running_builds_for_shared_runners.to_sql

    scope
      .joins("LEFT JOIN (#{running_counts_sql}) AS project_builds ON ci_pending_builds.project_id=project_builds.project_id")
      .order(Arel.sql('COALESCE(project_builds.running_builds, 0) ASC'), 'ci_pending_builds.build_id ASC')
  end

  # Per-project count of running builds assigned to instance-type runners.
  def running_builds_for_shared_runners
    ::Ci::RunningBuild
      .where(runner: ::Ci::Runner.instance_type)
      .group(:project_id)
      .select(:project_id, 'count(*) AS running_builds')
  end
  # rubocop:enable CodeReuse/ActiveRecord
end
end
end
end
end
......@@ -297,7 +297,13 @@ RSpec.describe API::Ci::Runner, :clean_gitlab_redis_shared_state do
end
context 'when job filtered by job_age' do
let!(:job) { create(:ci_build, :pending, :queued, :tag, pipeline: pipeline, name: 'spinach', stage: 'test', stage_idx: 0, queued_at: 60.seconds.ago) }
let!(:job) do
create(:ci_build, :pending, :queued, :tag, pipeline: pipeline, name: 'spinach', stage: 'test', stage_idx: 0, queued_at: 60.seconds.ago)
end
before do
job.queuing_entry&.update(created_at: 60.seconds.ago)
end
context 'job is queued less than job_age parameter' do
let(:job_age) { 120 }
......
......@@ -269,25 +269,27 @@ module Ci
let!(:unrelated_group_runner) { create(:ci_runner, :group, groups: [unrelated_group]) }
it 'does not consider builds from other group runners' do
expect(described_class.new(group_runner).send(:builds_for_group_runner).size).to eq 6
queue = ::Gitlab::Ci::Queue::Builder
expect(queue.new(group_runner).builds_for_group_runner.size).to eq 6
execute(group_runner)
expect(described_class.new(group_runner).send(:builds_for_group_runner).size).to eq 5
expect(queue.new(group_runner).builds_for_group_runner.size).to eq 5
execute(group_runner)
expect(described_class.new(group_runner).send(:builds_for_group_runner).size).to eq 4
expect(queue.new(group_runner).builds_for_group_runner.size).to eq 4
execute(group_runner)
expect(described_class.new(group_runner).send(:builds_for_group_runner).size).to eq 3
expect(queue.new(group_runner).builds_for_group_runner.size).to eq 3
execute(group_runner)
expect(described_class.new(group_runner).send(:builds_for_group_runner).size).to eq 2
expect(queue.new(group_runner).builds_for_group_runner.size).to eq 2
execute(group_runner)
expect(described_class.new(group_runner).send(:builds_for_group_runner).size).to eq 1
expect(queue.new(group_runner).builds_for_group_runner.size).to eq 1
execute(group_runner)
expect(described_class.new(group_runner).send(:builds_for_group_runner).size).to eq 0
expect(queue.new(group_runner).builds_for_group_runner.size).to eq 0
expect(execute(group_runner)).to be_nil
end
end
......@@ -299,7 +301,9 @@ module Ci
end
it 'calls DISTINCT' do
expect(described_class.new(group_runner).send(:builds_for_group_runner).to_sql).to include("DISTINCT")
queue = ::Gitlab::Ci::Queue::Builder.new(group_runner)
expect(queue.builds_for_group_runner.to_sql).to include("DISTINCT")
end
end
......@@ -310,7 +314,9 @@ module Ci
end
it 'does not call DISTINCT' do
expect(described_class.new(group_runner).send(:builds_for_group_runner).to_sql).not_to include("DISTINCT")
queue = ::Gitlab::Ci::Queue::Builder.new(group_runner)
expect(queue.builds_for_group_runner.to_sql).not_to include("DISTINCT")
end
end
......@@ -349,8 +355,9 @@ module Ci
let!(:other_build) { create(:ci_build, :pending, :queued, pipeline: pipeline) }
before do
allow_any_instance_of(Ci::RegisterJobService).to receive(:builds_for_project_runner)
.and_return(Ci::Build.where(id: [pending_job, other_build]))
allow_any_instance_of(::Gitlab::Ci::Queue::Builder)
.to receive(:build_ids)
.and_return(Ci::Build.where(id: [pending_job, other_build]).pluck(:id))
end
it "receives second build from the queue" do
......@@ -361,8 +368,9 @@ module Ci
context 'when single build is in queue' do
before do
allow_any_instance_of(Ci::RegisterJobService).to receive(:builds_for_project_runner)
.and_return(Ci::Build.where(id: pending_job))
allow_any_instance_of(::Gitlab::Ci::Queue::Builder)
.to receive(:build_ids)
.and_return(Ci::Build.where(id: pending_job).pluck(:id))
end
it "does not receive any valid result" do
......@@ -372,8 +380,9 @@ module Ci
context 'when there is no build in queue' do
before do
allow_any_instance_of(Ci::RegisterJobService).to receive(:builds_for_project_runner)
.and_return(Ci::Build.none)
allow_any_instance_of(::Gitlab::Ci::Queue::Builder)
.to receive(:build_ids)
.and_return([])
end
it "does not receive builds but result is valid" do
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment