Commit 3168331b authored by Marius Bobin's avatar Marius Bobin

Introduce Ci::PipelineCreation::StartPipelineService class

Introduce Ci::PipelineCreation::StartPipelineService to drop not
runnable builds.
parent bff3a296
...@@ -1246,6 +1246,10 @@ module Ci ...@@ -1246,6 +1246,10 @@ module Ci
end end
end end
def build_matchers
self.builds.build_matchers(project)
end
private private
def add_message(severity, content) def add_message(severity, content)
......
...@@ -433,13 +433,7 @@ module Ci ...@@ -433,13 +433,7 @@ module Ci
end end
def matches_build?(build) def matches_build?(build)
return false if self.ref_protected? && !build.protected? runner_matcher.matches?(build.build_matcher)
accepting_tags?(build)
end
def accepting_tags?(build)
(run_untagged? || build.has_tags?) && (build.tag_list - tag_list).empty?
end end
end end
end end
......
...@@ -174,8 +174,11 @@ class CommitStatus < ApplicationRecord ...@@ -174,8 +174,11 @@ class CommitStatus < ApplicationRecord
next if commit_status.processed? next if commit_status.processed?
next unless commit_status.project next unless commit_status.project
last_arg = transition.args.last
transition_options = last_arg.is_a?(Hash) && last_arg.extractable_options? ? last_arg : {}
commit_status.run_after_commit do commit_status.run_after_commit do
PipelineProcessWorker.perform_async(pipeline_id) PipelineProcessWorker.perform_async(pipeline_id) unless transition_options[:skip_pipeline_processing]
ExpireJobCacheWorker.perform_async(id) ExpireJobCacheWorker.perform_async(id)
end end
end end
......
...@@ -24,6 +24,7 @@ module Enums ...@@ -24,6 +24,7 @@ module Enums
project_deleted: 15, project_deleted: 15,
ci_quota_exceeded: 16, ci_quota_exceeded: 16,
pipeline_loop_detected: 17, pipeline_loop_detected: 17,
no_matching_runner: 18,
insufficient_bridge_permissions: 1_001, insufficient_bridge_permissions: 1_001,
downstream_bridge_project_not_found: 1_002, downstream_bridge_project_not_found: 1_002,
invalid_bridge_trigger: 1_003, invalid_bridge_trigger: 1_003,
......
...@@ -25,7 +25,8 @@ class CommitStatusPresenter < Gitlab::View::Presenter::Delegated ...@@ -25,7 +25,8 @@ class CommitStatusPresenter < Gitlab::View::Presenter::Delegated
reached_max_descendant_pipelines_depth: 'You reached the maximum depth of child pipelines', reached_max_descendant_pipelines_depth: 'You reached the maximum depth of child pipelines',
project_deleted: 'The job belongs to a deleted project', project_deleted: 'The job belongs to a deleted project',
user_blocked: 'The user who created this job is blocked', user_blocked: 'The user who created this job is blocked',
ci_quota_exceeded: 'No more CI minutes available' ci_quota_exceeded: 'No more CI minutes available',
no_matching_runner: 'No matching runner available'
}.freeze }.freeze
private_constant :CALLOUT_FAILURE_MESSAGES private_constant :CALLOUT_FAILURE_MESSAGES
......
# frozen_string_literal: true
module Ci
module PipelineCreation
class DropNotRunnableBuildsService
include Gitlab::Utils::StrongMemoize
def initialize(pipeline)
@pipeline = pipeline
end
##
# We want to run this service exactly once,
# before the first pipeline processing call
#
def execute
return unless ::Feature.enabled?(:ci_drop_new_builds_when_ci_quota_exceeded, project, default_enabled: :yaml)
return unless pipeline.created?
load_runners
validate_build_matchers
end
private
attr_reader :pipeline
attr_reader :instance_runners, :private_runners
delegate :project, to: :pipeline
def load_runners
@instance_runners, @private_runners = project
.all_runners
.runner_matchers
.partition(&:instance_type?)
end
def validate_build_matchers
pipeline.build_matchers.each do |build_matcher|
failure_reason = validate_build_matcher(build_matcher)
next unless failure_reason
drop_all_builds(build_matcher.build_ids, failure_reason)
end
end
def validate_build_matcher(build_matcher)
return if matching_private_runners?(build_matcher)
return if matching_instance_runners_available?(build_matcher)
matching_failure_reason(build_matcher)
end
##
# We skip pipeline processing until we drop all required builds. Otherwise
# as we drop the first build, the remaining builds to be dropped could
# transition to other states by `PipelineProcessWorker` running async.
#
def drop_all_builds(build_ids, failure_reason)
pipeline.builds.id_in(build_ids).each do |build|
build.drop(failure_reason, skip_pipeline_processing: true)
end
end
def matching_private_runners?(build_matcher)
private_runners
.find { |matcher| matcher.matches?(build_matcher) }
.present?
end
# Overridden in EE to include more conditions
def matching_instance_runners_available?(build_matcher)
matching_instance_runners?(build_matcher)
end
def matching_instance_runners?(build_matcher)
instance_runners
.find { |matcher| matcher.matches?(build_matcher) }
.present?
end
# Overridden in EE
def matching_failure_reason(build_matcher)
:no_matching_runner
end
end
end
end
Ci::PipelineCreation::DropNotRunnableBuildsService.prepend_mod_with('Ci::PipelineCreation::DropNotRunnableBuildsService')
# frozen_string_literal: true
module Ci
module PipelineCreation
class StartPipelineService
attr_reader :pipeline
def initialize(pipeline)
@pipeline = pipeline
end
def execute
DropNotRunnableBuildsService.new(pipeline).execute
Ci::ProcessPipelineService.new(pipeline).execute
end
end
end
end
...@@ -15,7 +15,7 @@ module Ci ...@@ -15,7 +15,7 @@ module Ci
def perform(pipeline_id) def perform(pipeline_id)
Ci::Pipeline.find_by_id(pipeline_id).try do |pipeline| Ci::Pipeline.find_by_id(pipeline_id).try do |pipeline|
Ci::ProcessPipelineService Ci::PipelineCreation::StartPipelineService
.new(pipeline) .new(pipeline)
.execute .execute
end end
......
---
name: ci_drop_new_builds_when_ci_quota_exceeded
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/61166
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/326709
milestone: '14.0'
type: development
group: group::continuous integration
default_enabled: false
# frozen_string_literal: true
module EE
module Ci
module PipelineCreation
module DropNotRunnableBuildsService
extend ::Gitlab::Utils::Override
private
override :matching_instance_runners_available?
def matching_instance_runners_available?(build_matcher)
instance_runners
.find { |matcher| matcher.matches?(build_matcher) && matcher.matches_quota?(build_matcher) }
.present?
end
override :matching_failure_reason
def matching_failure_reason(build_matcher)
if matching_instance_runners?(build_matcher)
:ci_quota_exceeded
else
:no_matching_runner
end
end
end
end
end
end
...@@ -8,6 +8,7 @@ RSpec.describe 'Two merge requests on a merge train' do ...@@ -8,6 +8,7 @@ RSpec.describe 'Two merge requests on a merge train' do
let(:project) { create(:project, :repository) } let(:project) { create(:project, :repository) }
let_it_be(:maintainer_1) { create(:user) } let_it_be(:maintainer_1) { create(:user) }
let_it_be(:maintainer_2) { create(:user) } let_it_be(:maintainer_2) { create(:user) }
let_it_be(:runner) { create(:ci_runner) }
let(:merge_request_1) do let(:merge_request_1) do
create(:merge_request, create(:merge_request,
......
...@@ -64,6 +64,8 @@ RSpec.describe 'User adds a merge request to a merge train', :js do ...@@ -64,6 +64,8 @@ RSpec.describe 'User adds a merge request to a merge train', :js do
end end
context 'when pipeline for merge train succeeds', :sidekiq_might_not_need_inline do context 'when pipeline for merge train succeeds', :sidekiq_might_not_need_inline do
let_it_be(:runner) { create(:ci_runner) }
before do before do
visit project_merge_request_path(project, merge_request) visit project_merge_request_path(project, merge_request)
merge_request.merge_train.pipeline.builds.map(&:success!) merge_request.merge_train.pipeline.builds.map(&:success!)
......
...@@ -75,6 +75,7 @@ RSpec.describe Ci::CreatePipelineService do ...@@ -75,6 +75,7 @@ RSpec.describe Ci::CreatePipelineService do
end end
it 'creates a pipeline with regular_job and bridge_dag_job pending' do it 'creates a pipeline with regular_job and bridge_dag_job pending' do
create(:ci_runner)
pipeline = create_pipeline! pipeline = create_pipeline!
processables = pipeline.processables processables = pipeline.processables
Ci::InitialPipelineProcessWorker.new.perform(pipeline.id) Ci::InitialPipelineProcessWorker.new.perform(pipeline.id)
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Ci::PipelineCreation::DropNotRunnableBuildsService do
let_it_be_with_reload(:pipeline) do
create(:ci_pipeline, status: :created)
end
let_it_be_with_reload(:job) do
create(:ci_build, project: pipeline.project, pipeline: pipeline)
end
let_it_be(:instance_runner) do
create(:ci_runner,
runner_type: :instance_type,
public_projects_minutes_cost_factor: 0,
private_projects_minutes_cost_factor: 1)
end
describe '#execute' do
subject(:execute) { described_class.new(pipeline).execute }
context 'with public projects' do
before do
pipeline.project.update!(visibility_level: ::Gitlab::VisibilityLevel::PUBLIC)
end
context 'with available CI quota' do
it 'does not drop the jobs' do
expect { execute }.not_to change { job.reload.status }
end
end
context 'when the Ci quota is exceeded' do
before do
allow(pipeline.project).to receive(:ci_minutes_quota)
.and_return(double('quota', minutes_used_up?: true))
end
it 'does not drop the jobs' do
expect { execute }.not_to change { job.reload.status }
end
end
end
context 'with internal projects' do
before do
pipeline.project.update!(visibility_level: ::Gitlab::VisibilityLevel::INTERNAL)
end
context 'with available CI quota' do
it 'does not drop the jobs' do
expect { execute }.not_to change { job.reload.status }
end
end
context 'when the Ci quota is exceeded' do
before do
expect(pipeline.project).to receive(:ci_minutes_quota)
.and_return(double('quota', minutes_used_up?: true))
end
it 'drops the job' do
execute
job.reload
expect(job).to be_failed
expect(job.failure_reason).to eq('ci_quota_exceeded')
end
end
end
context 'with private projects' do
before do
pipeline.project.update!(visibility_level: ::Gitlab::VisibilityLevel::PRIVATE)
end
context 'with available CI quota' do
it 'does not drop the jobs' do
expect { execute }.not_to change { job.reload.status }
end
end
context 'when the Ci quota is exceeded' do
before do
expect(pipeline.project).to receive(:ci_minutes_quota)
.and_return(double('quota', minutes_used_up?: true))
end
it 'drops the job' do
execute
job.reload
expect(job).to be_failed
expect(job.failure_reason).to eq('ci_quota_exceeded')
end
end
end
end
end
...@@ -29,6 +29,11 @@ RSpec.describe Ci::ProcessPipelineService, '#execute' do ...@@ -29,6 +29,11 @@ RSpec.describe Ci::ProcessPipelineService, '#execute' do
stub_ci_pipeline_to_return_yaml_file stub_ci_pipeline_to_return_yaml_file
end end
context 'when there is a runner available' do
let_it_be(:runner) do
create(:ci_runner, tag_list: %w[ruby postgres mysql])
end
it 'creates a downstream cross-project pipeline' do it 'creates a downstream cross-project pipeline' do
service.execute service.execute
Sidekiq::Worker.drain_all Sidekiq::Worker.drain_all
...@@ -48,6 +53,27 @@ RSpec.describe Ci::ProcessPipelineService, '#execute' do ...@@ -48,6 +53,27 @@ RSpec.describe Ci::ProcessPipelineService, '#execute' do
end end
end end
context 'with no runners' do
it 'creates a downstream cross-project pipeline' do
service.execute
Sidekiq::Worker.drain_all
expect_statuses(%w[test pending], %w[cross created], %w[deploy created])
update_build_status(:test, :success)
Sidekiq::Worker.drain_all
expect_statuses(%w[test success], %w[cross success], %w[deploy pending])
expect(downstream.ci_pipelines).to be_one
expect(downstream.ci_pipelines.first).to be_failed
expect(downstream.builds).not_to be_empty
expect(downstream.builds).to all be_failed
expect(downstream.builds.map(&:failure_reason)).to all eq('no_matching_runner')
end
end
end
def expect_statuses(*expected) def expect_statuses(*expected)
statuses = pipeline.statuses statuses = pipeline.statuses
.where(name: expected.map(&:first)) .where(name: expected.map(&:first))
......
...@@ -30,7 +30,8 @@ module Gitlab ...@@ -30,7 +30,8 @@ module Gitlab
reached_max_descendant_pipelines_depth: 'reached maximum depth of child pipelines', reached_max_descendant_pipelines_depth: 'reached maximum depth of child pipelines',
project_deleted: 'pipeline project was deleted', project_deleted: 'pipeline project was deleted',
user_blocked: 'pipeline user was blocked', user_blocked: 'pipeline user was blocked',
ci_quota_exceeded: 'no more CI minutes available' ci_quota_exceeded: 'no more CI minutes available',
no_matching_runner: 'no matching runner available'
}.freeze }.freeze
private_constant :REASONS private_constant :REASONS
......
...@@ -25,6 +25,8 @@ RSpec.describe 'Merge request > User sees pipelines triggered by merge request', ...@@ -25,6 +25,8 @@ RSpec.describe 'Merge request > User sees pipelines triggered by merge request',
} }
end end
let_it_be(:runner) { create(:ci_runner) }
before do before do
stub_application_setting(auto_devops_enabled: false) stub_application_setting(auto_devops_enabled: false)
stub_ci_pipeline_yaml_file(YAML.dump(config)) stub_ci_pipeline_yaml_file(YAML.dump(config))
......
...@@ -8,7 +8,7 @@ RSpec.describe Gitlab::Ci::Build::AutoRetry do ...@@ -8,7 +8,7 @@ RSpec.describe Gitlab::Ci::Build::AutoRetry do
describe '#allowed?' do describe '#allowed?' do
using RSpec::Parameterized::TableSyntax using RSpec::Parameterized::TableSyntax
let(:build) { create(:ci_build) } let(:build) { build_stubbed(:ci_build) }
subject { auto_retry.allowed? } subject { auto_retry.allowed? }
......
...@@ -4512,4 +4512,17 @@ RSpec.describe Ci::Pipeline, :mailer, factory_default: :keep do ...@@ -4512,4 +4512,17 @@ RSpec.describe Ci::Pipeline, :mailer, factory_default: :keep do
.not_to exceed_query_limit(control_count) .not_to exceed_query_limit(control_count)
end end
end end
describe '#build_matchers' do
let_it_be(:pipeline) { create(:ci_pipeline) }
let_it_be(:builds) { create_list(:ci_build, 2, pipeline: pipeline, project: pipeline.project) }
subject(:matchers) { pipeline.build_matchers }
it 'returns build matchers' do
expect(matchers.size).to eq(1)
expect(matchers).to all be_a(Gitlab::Ci::Matching::BuildMatcher)
expect(matchers.first.build_ids).to match_array(builds.map(&:id))
end
end
end end
...@@ -53,6 +53,8 @@ RSpec.describe Ci::CreatePipelineService, '#execute' do ...@@ -53,6 +53,8 @@ RSpec.describe Ci::CreatePipelineService, '#execute' do
end end
context 'when sidekiq processes the job', :sidekiq_inline do context 'when sidekiq processes the job', :sidekiq_inline do
let_it_be(:runner) { create(:ci_runner) }
it 'transitions to pending status and triggers a downstream pipeline' do it 'transitions to pending status and triggers a downstream pipeline' do
pipeline = create_pipeline! pipeline = create_pipeline!
......
...@@ -202,6 +202,11 @@ RSpec.describe Ci::CreatePipelineService do ...@@ -202,6 +202,11 @@ RSpec.describe Ci::CreatePipelineService do
YAML YAML
end end
context 'when there are runners matching the builds' do
before do
create(:ci_runner)
end
it 'creates a pipeline with build_a and test_b pending; deploy_b manual', :sidekiq_inline do it 'creates a pipeline with build_a and test_b pending; deploy_b manual', :sidekiq_inline do
processables = pipeline.processables processables = pipeline.processables
...@@ -211,7 +216,7 @@ RSpec.describe Ci::CreatePipelineService do ...@@ -211,7 +216,7 @@ RSpec.describe Ci::CreatePipelineService do
deploy_a = processables.find { |processable| processable.name == 'deploy_a' } deploy_a = processables.find { |processable| processable.name == 'deploy_a' }
deploy_b = processables.find { |processable| processable.name == 'deploy_b' } deploy_b = processables.find { |processable| processable.name == 'deploy_b' }
expect(pipeline).to be_persisted expect(pipeline).to be_created_successfully
expect(build_a.status).to eq('pending') expect(build_a.status).to eq('pending')
expect(test_a.status).to eq('created') expect(test_a.status).to eq('created')
expect(test_b.status).to eq('pending') expect(test_b.status).to eq('pending')
...@@ -220,6 +225,17 @@ RSpec.describe Ci::CreatePipelineService do ...@@ -220,6 +225,17 @@ RSpec.describe Ci::CreatePipelineService do
end end
end end
context 'when there are no runners matching the builds' do
it 'creates a pipeline with build_a and test_b pending; deploy_b manual', :sidekiq_inline do
processables = pipeline.processables
expect(pipeline).to be_created_successfully
expect(processables).to all be_failed
expect(processables.map(&:failure_reason)).to all eq('no_matching_runner')
end
end
end
context 'when needs is empty hash' do context 'when needs is empty hash' do
let(:config) do let(:config) do
<<~YAML <<~YAML
......
...@@ -7,6 +7,7 @@ RSpec.describe Ci::CreatePipelineService do ...@@ -7,6 +7,7 @@ RSpec.describe Ci::CreatePipelineService do
let_it_be(:project, reload: true) { create(:project, :repository) } let_it_be(:project, reload: true) { create(:project, :repository) }
let_it_be(:user, reload: true) { project.owner } let_it_be(:user, reload: true) { project.owner }
let_it_be(:runner) { create(:ci_runner, tag_list: %w[postgres mysql ruby]) }
let(:ref_name) { 'refs/heads/master' } let(:ref_name) { 'refs/heads/master' }
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Ci::PipelineCreation::DropNotRunnableBuildsService do
let_it_be(:group) { create(:group) }
let_it_be(:project) { create(:project, group: group) }
let_it_be_with_reload(:pipeline) do
create(:ci_pipeline, project: project, status: :created)
end
let_it_be_with_reload(:job) do
create(:ci_build, project: project, pipeline: pipeline)
end
describe '#execute' do
subject(:execute) { described_class.new(pipeline).execute }
context 'when the feature flag is disabled' do
before do
stub_feature_flags(ci_drop_new_builds_when_ci_quota_exceeded: false)
end
it 'does not drop the jobs' do
expect { execute }.not_to change { job.reload.status }
end
end
context 'when the pipeline status is not created' do
before do
pipeline.update!(status: :running)
end
it 'does not drop the jobs' do
expect { execute }.not_to change { job.reload.status }
end
end
context 'when there are no runners available' do
it 'drops the job' do
execute
job.reload
expect(job).to be_failed
expect(job.failure_reason).to eq('no_matching_runner')
end
end
context 'with project runners' do
let_it_be(:project_runner) do
create(:ci_runner, runner_type: :project_type, projects: [project])
end
it 'does not drop the jobs' do
expect { execute }.not_to change { job.reload.status }
end
end
context 'with group runners' do
let_it_be(:group_runner) do
create(:ci_runner, runner_type: :group_type, groups: [group])
end
it 'does not drop the jobs' do
expect { execute }.not_to change { job.reload.status }
end
end
context 'with instance runners' do
let_it_be(:instance_runner) do
create(:ci_runner, runner_type: :instance_type)
end
it 'does not drop the jobs' do
expect { execute }.not_to change { job.reload.status }
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Ci::PipelineCreation::StartPipelineService do
let(:pipeline) { build(:ci_pipeline) }
subject(:service) { described_class.new(pipeline) }
describe '#execute' do
it 'calls the pipeline runners matching validation service' do
expect(Ci::PipelineCreation::DropNotRunnableBuildsService)
.to receive(:new)
.with(pipeline)
.and_return(double('service', execute: true))
service.execute
end
it 'calls the pipeline process service' do
expect(Ci::ProcessPipelineService)
.to receive(:new)
.with(pipeline)
.and_return(double('service', execute: true))
service.execute
end
end
end
...@@ -859,6 +859,8 @@ RSpec.shared_examples 'Pipeline Processing Service' do ...@@ -859,6 +859,8 @@ RSpec.shared_examples 'Pipeline Processing Service' do
end end
context 'when a bridge job has parallel:matrix config', :sidekiq_inline do context 'when a bridge job has parallel:matrix config', :sidekiq_inline do
let_it_be(:runner) { create(:ci_runner) }
let(:parent_config) do let(:parent_config) do
<<-EOY <<-EOY
test: test:
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
RSpec.shared_context 'Pipeline Processing Service Tests With Yaml' do RSpec.shared_context 'Pipeline Processing Service Tests With Yaml' do
let_it_be(:project) { create(:project, :repository) } let_it_be(:project) { create(:project, :repository) }
let_it_be(:user) { project.owner } let_it_be(:user) { project.owner }
let_it_be(:runner) { create(:ci_runner) }
where(:test_file_path) do where(:test_file_path) do
Dir.glob(Rails.root.join('spec/services/ci/pipeline_processing/test_cases/*.yml')) Dir.glob(Rails.root.join('spec/services/ci/pipeline_processing/test_cases/*.yml'))
......
...@@ -4,11 +4,18 @@ require 'spec_helper' ...@@ -4,11 +4,18 @@ require 'spec_helper'
RSpec.describe Ci::InitialPipelineProcessWorker do RSpec.describe Ci::InitialPipelineProcessWorker do
describe '#perform' do describe '#perform' do
let_it_be(:pipeline) { create(:ci_pipeline, :with_job, status: :created) } let_it_be_with_reload(:pipeline) do
create(:ci_pipeline, :with_job, status: :created)
end
include_examples 'an idempotent worker' do include_examples 'an idempotent worker' do
let(:job_args) { pipeline.id } let(:job_args) { pipeline.id }
context 'when there are runners available' do
before do
create(:ci_runner)
end
it 'marks the pipeline as pending' do it 'marks the pipeline as pending' do
expect(pipeline).to be_created expect(pipeline).to be_created
...@@ -17,5 +24,14 @@ RSpec.describe Ci::InitialPipelineProcessWorker do ...@@ -17,5 +24,14 @@ RSpec.describe Ci::InitialPipelineProcessWorker do
expect(pipeline.reload).to be_pending expect(pipeline.reload).to be_pending
end end
end end
it 'marks the pipeline as failed' do
expect(pipeline).to be_created
subject
expect(pipeline.reload).to be_failed
end
end
end end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment