Commit ef0acf72 authored by Matthias Käppler's avatar Matthias Käppler Committed by Fabio Pitino

Batch-cancel user pipelines

This adds the ability to cancel user pipelines
in bulk, similarly to what we allow with project
pipelines. This is expected to be much faster
and less prone to N+1s when cancelling pipelines
due to a user block event.
parent 1bc23c39
......@@ -19,6 +19,7 @@ module Ci
has_many :bridges, foreign_key: :stage_id
scope :ordered, -> { order(position: :asc) }
scope :in_pipelines, ->(pipelines) { where(pipeline: pipelines) }
with_options unless: :importing? do
validates :project, presence: true
......
......@@ -350,7 +350,12 @@ class User < ApplicationRecord
# this state transition object in order to do a rollback.
# For this reason the tradeoff is to disable this cop.
after_transition any => :blocked do |user|
Ci::CancelUserPipelinesService.new.execute(user)
if Feature.enabled?(:abort_user_pipelines_on_block, user)
Ci::AbortPipelinesService.new.execute(user.pipelines)
else
Ci::CancelUserPipelinesService.new.execute(user)
end
Ci::DisableUserPipelineSchedulesService.new.execute(user)
end
# rubocop: enable CodeReuse/ServiceClass
......
# frozen_string_literal: true
module Ci
class AbortPipelinesService
# Danger: Cancels in bulk without callbacks
# Only for pipeline abandonment scenarios (examples: project delete, user block)
def execute(pipelines)
bulk_abort!(pipelines.cancelable, status: :canceled)
ServiceResponse.success(message: 'Pipelines canceled')
end
private
def bulk_abort!(pipelines, status:)
pipelines.each_batch(of: 100) do |pipeline_batch|
update_status_for(Ci::Stage, pipeline_batch, status)
update_status_for(CommitStatus, pipeline_batch, status)
pipeline_batch.update_all(status: status, finished_at: Time.current)
end
end
def update_status_for(klass, pipelines, status)
klass.in_pipelines(pipelines)
.cancelable
.in_batches(of: 150) # rubocop:disable Cop/InBatches
.update_all(status: status)
end
end
end
# frozen_string_literal: true
module Ci
class AbortProjectPipelinesService
# Danger: Cancels in bulk without callbacks
# Only for pipeline abandonment scenarios (current example: project delete)
def execute(project)
return unless Feature.enabled?(:abort_deleted_project_pipelines, default_enabled: :yaml)
pipelines = project.all_pipelines.cancelable
bulk_abort!(pipelines, status: :canceled)
ServiceResponse.success(message: 'Pipelines canceled')
end
private
def bulk_abort!(pipelines, status:)
pipelines.each_batch do |pipeline_batch|
CommitStatus.in_pipelines(pipeline_batch).in_batches.update_all(status: status) # rubocop: disable Cop/InBatches
pipeline_batch.update_all(status: status)
end
end
end
end
......@@ -27,7 +27,9 @@ module Projects
# Git data (e.g. a list of branch names).
flush_caches(project)
::Ci::AbortProjectPipelinesService.new.execute(project)
if Feature.enabled?(:abort_deleted_project_pipelines, default_enabled: :yaml)
::Ci::AbortPipelinesService.new.execute(project.all_pipelines)
end
Projects::UnlinkForkService.new(project, current_user).execute
......
---
title: Add index on ci_stages to speed up batch pipeline cancellation
merge_request: 56126
author:
type: performance
---
name: abort_user_pipelines_on_block
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/56126
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/324045
milestone: '13.10'
type: development
group: group::memory
default_enabled: false
# frozen_string_literal: true
class AddIndexCiStagesOnPipelineIdAndId < ActiveRecord::Migration[6.0]
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
INDEX_NAME = 'index_ci_stages_on_pipeline_id_and_id'
disable_ddl_transaction!
def up
add_concurrent_index :ci_stages, %i[pipeline_id id], where: 'status IN (0, 1, 2, 8, 9, 10)', name: INDEX_NAME
end
def down
remove_concurrent_index_by_name :ci_stages, INDEX_NAME
end
end
c2e3f8f6f283d919d99b0acf970f663fef8ca30ef277116401549014fc99ae91
\ No newline at end of file
......@@ -22196,6 +22196,8 @@ CREATE UNIQUE INDEX index_ci_sources_projects_on_source_project_id_and_pipeline_
CREATE INDEX index_ci_stages_on_pipeline_id ON ci_stages USING btree (pipeline_id);
CREATE INDEX index_ci_stages_on_pipeline_id_and_id ON ci_stages USING btree (pipeline_id, id) WHERE (status = ANY (ARRAY[0, 1, 2, 8, 9, 10]));
CREATE UNIQUE INDEX index_ci_stages_on_pipeline_id_and_name ON ci_stages USING btree (pipeline_id, name);
CREATE INDEX index_ci_stages_on_pipeline_id_and_position ON ci_stages USING btree (pipeline_id, "position");
......@@ -1777,16 +1777,27 @@ RSpec.describe User do
context 'when user has running CI pipelines' do
let(:service) { double }
before do
pipeline = create(:ci_pipeline, :running, user: user)
create(:ci_build, :running, pipeline: pipeline)
context 'with abort_user_pipelines_on_block feature enabled' do
let(:pipelines) { build_list(:ci_pipeline, 3, :running) }
it 'aborts all running pipelines and related jobs' do
stub_feature_flags(abort_user_pipelines_on_block: true)
expect(user).to receive(:pipelines).and_return(pipelines)
expect(Ci::AbortPipelinesService).to receive(:new).and_return(service)
expect(service).to receive(:execute).with(pipelines)
user.block
end
end
it 'cancels all running pipelines and related jobs' do
expect(Ci::CancelUserPipelinesService).to receive(:new).and_return(service)
expect(service).to receive(:execute).with(user)
context 'with abort_user_pipelines_on_block feature disabled' do
it 'cancels all running pipelines and related jobs' do
stub_feature_flags(abort_user_pipelines_on_block: false)
expect(Ci::CancelUserPipelinesService).to receive(:new).and_return(service)
expect(service).to receive(:execute).with(user)
user.block
user.block
end
end
end
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Ci::AbortPipelinesService do
let_it_be(:user) { create(:user) }
let_it_be(:project) { create(:project, namespace: user.namespace) }
let_it_be(:cancelable_pipeline, reload: true) { create(:ci_pipeline, :running, project: project, user: user) }
let_it_be(:manual_pipeline, reload: true) { create(:ci_pipeline, status: :manual, project: project, user: user) } # not cancelable
let_it_be(:other_users_pipeline, reload: true) { create(:ci_pipeline, :running, project: project, user: create(:user)) } # not this user's pipeline
let_it_be(:cancelable_build, reload: true) { create(:ci_build, :running, pipeline: cancelable_pipeline) }
let_it_be(:non_cancelable_build, reload: true) { create(:ci_build, :success, pipeline: cancelable_pipeline) }
let_it_be(:cancelable_stage, reload: true) { create(:ci_stage_entity, name: 'stageA', status: :running, pipeline: cancelable_pipeline, project: project) }
let_it_be(:non_cancelable_stage, reload: true) { create(:ci_stage_entity, name: 'stageB', status: :success, pipeline: cancelable_pipeline, project: project) }
describe '#execute' do
def expect_correct_cancellations
expect(cancelable_pipeline.finished_at).not_to be_nil
expect(cancelable_pipeline).to be_canceled
expect(cancelable_pipeline.stages - [non_cancelable_stage]).to all(be_canceled)
expect(cancelable_build).to be_canceled
expect(manual_pipeline).not_to be_canceled
expect(non_cancelable_stage).not_to be_canceled
expect(non_cancelable_build).not_to be_canceled
end
context 'with project pipelines' do
it 'cancels all running pipelines and related jobs' do
expect(described_class.new.execute(project.all_pipelines)).to be_success
expect_correct_cancellations
expect(other_users_pipeline).to be_canceled
expect(other_users_pipeline.stages).to all(be_canceled)
end
it 'avoids N+1 queries' do
project_pipelines = project.all_pipelines
control_count = ActiveRecord::QueryRecorder.new { described_class.new.execute(project_pipelines) }.count
pipelines = create_list(:ci_pipeline, 5, :running, project: project)
create_list(:ci_build, 5, :running, pipeline: pipelines.first)
expect { described_class.new.execute(project_pipelines) }.not_to exceed_query_limit(control_count)
end
end
context 'with user pipelines' do
it 'cancels all running pipelines and related jobs' do
expect(described_class.new.execute(user.pipelines)).to be_success
expect_correct_cancellations
expect(other_users_pipeline).not_to be_canceled
end
it 'avoids N+1 queries' do
user_pipelines = user.pipelines
control_count = ActiveRecord::QueryRecorder.new { described_class.new.execute(user_pipelines) }.count
pipelines = create_list(:ci_pipeline, 5, :running, project: project, user: user)
create_list(:ci_build, 5, :running, pipeline: pipelines.first)
expect { described_class.new.execute(user_pipelines) }.not_to exceed_query_limit(control_count)
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Ci::AbortProjectPipelinesService do
let_it_be(:project) { create(:project) }
let_it_be(:pipeline) { create(:ci_pipeline, :running, project: project) }
let_it_be(:build) { create(:ci_build, :running, pipeline: pipeline) }
describe '#execute' do
it 'cancels all running pipelines and related jobs' do
result = described_class.new.execute(project)
expect(result).to be_success
expect(pipeline.reload).to be_canceled
expect(build.reload).to be_canceled
end
it 'avoids N+1 queries' do
control_count = ActiveRecord::QueryRecorder.new { described_class.new.execute(project) }.count
pipelines = create_list(:ci_pipeline, 5, :running, project: project)
create_list(:ci_build, 5, :running, pipeline: pipelines.first)
expect { described_class.new.execute(project) }.not_to exceed_query_limit(control_count)
end
end
context 'when feature disabled' do
before do
stub_feature_flags(abort_deleted_project_pipelines: false)
end
it 'does not abort the pipeline' do
result = described_class.new.execute(project)
expect(result).to be(nil)
expect(pipeline.reload).to be_running
expect(build.reload).to be_running
end
end
end
......@@ -93,10 +93,26 @@ RSpec.describe Projects::DestroyService, :aggregate_failures do
destroy_project(project, user, {})
end
it 'performs cancel for project ci pipelines' do
expect(::Ci::AbortProjectPipelinesService).to receive_message_chain(:new, :execute).with(project)
context 'with abort_deleted_project_pipelines feature disabled' do
it 'does not cancel project ci pipelines' do
stub_feature_flags(abort_deleted_project_pipelines: false)
destroy_project(project, user, {})
expect(::Ci::AbortPipelinesService).not_to receive(:new)
destroy_project(project, user, {})
end
end
context 'with abort_deleted_project_pipelines feature enabled' do
it 'performs cancel for project ci pipelines' do
stub_feature_flags(abort_deleted_project_pipelines: true)
pipelines = build_list(:ci_pipeline, 3, :running)
allow(project).to receive(:all_pipelines).and_return(pipelines)
expect(::Ci::AbortPipelinesService).to receive_message_chain(:new, :execute).with(pipelines)
destroy_project(project, user, {})
end
end
context 'when project has remote mirrors' do
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment