Commit 38e549b6 authored by Kamil Trzciński's avatar Kamil Trzciński Committed by Grzegorz Bizon

Implement AtomicProcessing service

This implements atomic processing service
that is able in bulk to update multiple builds,
stages and pipelines.

This uses in-memory structure to calculate statuses
and uses an efficient fetch of DAG dependencies
for efficient processing

This still performs some redundant operations,
but overall it should be significantly faster
processing pipelines of 1000 builds, with
multiple DAG dependencies, as it removes
a number of N+1 problems present in previous
implementation.

This also is order-independent, so it is able
to easily reschedule itself once needed.

This code is also concurrent safe, as if needed
the further processing will be re-scheduled
that will resolve any conflicts.
parent 8d416b7b
......@@ -447,10 +447,6 @@ module Ci
options_retry_when.include?('always')
end
def latest?
!retried?
end
def any_unmet_prerequisites?
prerequisites.present?
end
......
......@@ -515,7 +515,9 @@ module Ci
# rubocop: enable CodeReuse/ServiceClass
def mark_as_processable_after_stage(stage_idx)
builds.skipped.after_stage(stage_idx).find_each(&:process)
builds.skipped.after_stage(stage_idx).find_each do |build|
Gitlab::OptimisticLocking.retry_lock(build, &:process)
end
end
def latest?
......@@ -554,6 +556,13 @@ module Ci
end
end
def needs_processing?
statuses
.where(processed: [false, nil])
.latest
.exists?
end
# TODO: this logic is duplicate with Pipeline::Chain::Config::Content
# we should persist this is `ci_pipelines.config_path`
def config_path
......@@ -583,9 +592,8 @@ module Ci
project.notes.for_commit_id(sha)
end
def update_status
def set_status(new_status)
retry_optimistic_lock(self) do
new_status = latest_builds_status.to_s
case new_status
when 'created' then nil
when 'waiting_for_resource' then request_resource
......@@ -605,6 +613,10 @@ module Ci
end
end
def update_legacy_status
set_status(latest_builds_status.to_s)
end
def protected_ref?
strong_memoize(:protected_ref) { project.protected_for?(git_ref) }
end
......
......@@ -8,8 +8,26 @@ module Ci
scope :preload_needs, -> { preload(:needs) }
def self.select_with_aggregated_needs(project)
return all unless Feature.enabled?(:ci_dag_support, project, default_enabled: true)
aggregated_needs_names = Ci::BuildNeed
.scoped_build
.select("ARRAY_AGG(name)")
.to_sql
all.select(
'*',
"(#{aggregated_needs_names}) as aggregated_needs_names"
)
end
validates :type, presence: true
def aggregated_needs_names
read_attribute(:aggregated_needs_names)
end
def schedulable?
raise NotImplementedError
end
......
......@@ -13,9 +13,12 @@ module Ci
belongs_to :pipeline
has_many :statuses, class_name: 'CommitStatus', foreign_key: :stage_id
has_many :processables, class_name: 'Ci::Processable', foreign_key: :stage_id
has_many :builds, foreign_key: :stage_id
has_many :bridges, foreign_key: :stage_id
scope :ordered, -> { order(position: :asc) }
with_options unless: :importing? do
validates :project, presence: true
validates :pipeline, presence: true
......@@ -80,9 +83,8 @@ module Ci
end
end
def update_status
def set_status(new_status)
retry_optimistic_lock(self) do
new_status = latest_stage_status.to_s
case new_status
when 'created' then nil
when 'waiting_for_resource' then request_resource
......@@ -102,6 +104,10 @@ module Ci
end
end
def update_legacy_status
set_status(latest_stage_status.to_s)
end
def groups
@groups ||= Ci::Group.fabricate(self)
end
......
......@@ -40,6 +40,7 @@ class CommitStatus < ApplicationRecord
scope :latest, -> { where(retried: [false, nil]) }
scope :retried, -> { where(retried: true) }
scope :ordered, -> { order(:name) }
scope :ordered_by_stage, -> { order(stage_idx: :asc) }
scope :latest_ordered, -> { latest.ordered.includes(project: :namespace) }
scope :retried_ordered, -> { retried.ordered.includes(project: :namespace) }
scope :before_stage, -> (index) { where('stage_idx < ?', index) }
......@@ -57,6 +58,10 @@ class CommitStatus < ApplicationRecord
preload(:project, :user)
end
scope :with_project_preload, -> do
preload(project: :namespace)
end
scope :with_needs, -> (names = nil) do
needs = Ci::BuildNeed.scoped_build.select(1)
needs = needs.where(name: names) if names
......@@ -69,6 +74,15 @@ class CommitStatus < ApplicationRecord
where('NOT EXISTS (?)', needs)
end
scope :match_id_and_lock_version, -> (slice) do
# it expects that items are an array of attributes to match
# each hash needs to have `id` and `lock_version`
slice.inject(self) do |relation, item|
match = CommitStatus.where(item.slice(:id, :lock_version))
relation.or(match)
end
end
# We use `CommitStatusEnums.failure_reasons` here so that EE can more easily
# extend this `Hash` with new values.
enum_with_nil failure_reason: ::CommitStatusEnums.failure_reasons
......@@ -86,6 +100,16 @@ class CommitStatus < ApplicationRecord
# rubocop: enable CodeReuse/ServiceClass
end
before_save if: :status_changed?, unless: :importing? do
if Feature.disabled?(:ci_atomic_processing, project)
self.processed = nil
elsif latest?
self.processed = false # force refresh of all dependent ones
elsif retried?
self.processed = true # retried are considered to be already processed
end
end
state_machine :status do
event :process do
transition [:skipped, :manual] => :created
......@@ -136,19 +160,13 @@ class CommitStatus < ApplicationRecord
end
after_transition do |commit_status, transition|
next unless commit_status.project
next if transition.loopback?
next if commit_status.processed?
next unless commit_status.project
commit_status.run_after_commit do
if pipeline_id
if complete? || manual?
PipelineProcessWorker.perform_async(pipeline_id, [id])
else
PipelineUpdateWorker.perform_async(pipeline_id)
end
end
StageUpdateWorker.perform_async(stage_id)
schedule_stage_and_pipeline_update
ExpireJobCacheWorker.perform_async(id)
end
end
......@@ -177,6 +195,11 @@ class CommitStatus < ApplicationRecord
where(name: names).latest.slow_composite_status || 'success'
end
def self.update_as_processed!
# Marks items as processed, and increases `lock_version` (Optimisitc Locking)
update_all('processed=TRUE, lock_version=COALESCE(lock_version,0)+1')
end
def locking_enabled?
will_save_change_to_status?
end
......@@ -193,6 +216,10 @@ class CommitStatus < ApplicationRecord
calculate_duration
end
def latest?
!retried?
end
def playable?
false
end
......@@ -244,4 +271,21 @@ class CommitStatus < ApplicationRecord
v =~ /\d+/ ? v.to_i : v
end
end
private
def schedule_stage_and_pipeline_update
if Feature.enabled?(:ci_atomic_processing, project)
# Atomic Processing requires only single Worker
PipelineProcessWorker.perform_async(pipeline_id, [id])
else
if complete? || manual?
PipelineProcessWorker.perform_async(pipeline_id, [id])
else
PipelineUpdateWorker.perform_async(pipeline_id)
end
StageUpdateWorker.perform_async(stage_id)
end
end
end
# frozen_string_literal: true
module Ci
module PipelineProcessing
class AtomicProcessingService
include Gitlab::Utils::StrongMemoize
include ExclusiveLeaseGuard
attr_reader :pipeline
DEFAULT_LEASE_TIMEOUT = 1.minute
BATCH_SIZE = 20
def initialize(pipeline)
@pipeline = pipeline
@collection = AtomicProcessingService::StatusCollection.new(pipeline)
end
def execute
return unless pipeline.needs_processing?
success = try_obtain_lease { process! }
# re-schedule if we need further processing
if success && pipeline.needs_processing?
PipelineProcessWorker.perform_async(pipeline.id)
end
success
end
private
def process!
update_stages!
update_pipeline!
update_statuses_processed!
true
end
def update_stages!
pipeline.stages.ordered.each(&method(:update_stage!))
end
def update_stage!(stage)
# Update processables for a given stage in bulk/slices
ids = @collection.created_processable_ids_for_stage_position(stage.position)
ids.in_groups_of(BATCH_SIZE, false, &method(:update_processables!))
status = @collection.status_for_stage_position(stage.position)
stage.set_status(status)
end
def update_processables!(ids)
created_processables = pipeline.processables.for_ids(ids)
.with_project_preload
.created
.latest
.ordered_by_stage
.select_with_aggregated_needs(project)
created_processables.each(&method(:update_processable!))
end
def update_pipeline!
pipeline.set_status(@collection.status_of_all)
end
def update_statuses_processed!
processing = @collection.processing_processables
processing.each_slice(BATCH_SIZE) do |slice|
pipeline.statuses.match_id_and_lock_version(slice)
.update_as_processed!
end
end
def update_processable!(processable)
status = processable_status(processable)
return unless HasStatus::COMPLETED_STATUSES.include?(status)
# transition status if possible
Gitlab::OptimisticLocking.retry_lock(processable) do |subject|
Ci::ProcessBuildService.new(project, subject.user)
.execute(subject, status)
# update internal representation of status
# to make the status change of processable
# to be taken into account during further processing
@collection.set_processable_status(
processable.id, processable.status, processable.lock_version)
end
end
def processable_status(processable)
if needs_names = processable.aggregated_needs_names
# Processable uses DAG, get status of all dependent needs
@collection.status_for_names(needs_names)
else
# Processable uses Stages, get status of prior stage
@collection.status_for_prior_stage_position(processable.stage_idx.to_i)
end
end
def project
pipeline.project
end
def lease_key
"#{super}::pipeline_id:#{pipeline.id}"
end
def lease_timeout
DEFAULT_LEASE_TIMEOUT
end
end
end
end
# frozen_string_literal: true
module Ci
module PipelineProcessing
class AtomicProcessingService
class StatusCollection
include Gitlab::Utils::StrongMemoize
attr_reader :pipeline
# We use these columns to perform an efficient
# calculation of a status
STATUSES_COLUMNS = [
:id, :name, :status, :allow_failure,
:stage_idx, :processed, :lock_version
].freeze
def initialize(pipeline)
@pipeline = pipeline
@stage_statuses = {}
@prior_stage_statuses = {}
end
# This method updates internal status for given ID
def set_processable_status(id, status, lock_version)
processable = all_statuses_by_id[id]
return unless processable
processable[:status] = status
processable[:lock_version] = lock_version
end
# This methods gets composite status of all processables
def status_of_all
status_for_array(all_statuses)
end
# This methods gets composite status for processables with given names
def status_for_names(names)
name_statuses = all_statuses_by_name.slice(*names)
status_for_array(name_statuses.values)
end
# This methods gets composite status for processables before given stage
def status_for_prior_stage_position(position)
strong_memoize("status_for_prior_stage_position_#{position}") do
stage_statuses = all_statuses_grouped_by_stage_position
.select { |stage_position, _| stage_position < position }
status_for_array(stage_statuses.values.flatten)
end
end
# This methods gets a list of processables for a given stage
def created_processable_ids_for_stage_position(current_position)
all_statuses_grouped_by_stage_position[current_position]
.to_a
.select { |processable| processable[:status] == 'created' }
.map { |processable| processable[:id] }
end
# This methods gets composite status for processables at a given stage
def status_for_stage_position(current_position)
strong_memoize("status_for_stage_position_#{current_position}") do
stage_statuses = all_statuses_grouped_by_stage_position[current_position].to_a
status_for_array(stage_statuses.flatten)
end
end
# This method returns a list of all processable, that are to be processed
def processing_processables
all_statuses.lazy.reject { |status| status[:processed] }
end
private
def status_for_array(statuses)
result = Gitlab::Ci::Status::Composite
.new(statuses)
.status
result || 'success'
end
def all_statuses_grouped_by_stage_position
strong_memoize(:all_statuses_by_order) do
all_statuses.group_by { |status| status[:stage_idx].to_i }
end
end
def all_statuses_by_id
strong_memoize(:all_statuses_by_id) do
all_statuses.map do |row|
[row[:id], row]
end.to_h
end
end
def all_statuses_by_name
strong_memoize(:statuses_by_name) do
all_statuses.map do |row|
[row[:name], row]
end.to_h
end
end
# rubocop: disable CodeReuse/ActiveRecord
def all_statuses
# We fetch all relevant data in one go.
#
# This is more efficient than relying
# on PostgreSQL to calculate composite status
# for us
#
# Since we need to reprocess everything
# we can fetch all of them and do processing
# ourselves.
strong_memoize(:all_statuses) do
raw_statuses = pipeline
.statuses
.latest
.ordered_by_stage
.pluck(*STATUSES_COLUMNS)
raw_statuses.map do |row|
STATUSES_COLUMNS.zip(row).to_h
end
end
end
# rubocop: enable CodeReuse/ActiveRecord
end
end
end
end
......@@ -18,7 +18,7 @@ module Ci
# only when the another job has finished
success = process_builds_with_needs(trigger_build_ids) || success
@pipeline.update_status
@pipeline.update_legacy_status
success
end
......
......@@ -11,9 +11,15 @@ module Ci
def execute(trigger_build_ids = nil)
update_retried
Ci::PipelineProcessing::LegacyProcessingService
.new(pipeline)
.execute(trigger_build_ids)
if Feature.enabled?(:ci_atomic_processing, pipeline.project)
Ci::PipelineProcessing::AtomicProcessingService
.new(pipeline)
.execute
else
Ci::PipelineProcessing::LegacyProcessingService
.new(pipeline)
.execute(trigger_build_ids)
end
end
private
......
......@@ -11,7 +11,7 @@ module Ci
reprocess!(build).tap do |new_build|
build.pipeline.mark_as_processable_after_stage(build.stage_idx)
new_build.enqueue!
Gitlab::OptimisticLocking.retry_lock(new_build, &:enqueue)
MergeRequests::AddTodoWhenBuildFailsService
.new(project, current_user)
......@@ -31,15 +31,17 @@ module Ci
attributes.push([:user, current_user])
build.retried = true
Ci::Build.transaction do
# mark all other builds of that name as retried
build.pipeline.builds.latest
.where(name: build.name)
.update_all(retried: true)
.update_all(retried: true, processed: true)
create_build!(attributes)
create_build!(attributes).tap do
# mark existing object as retried/processed without a reload
build.retried = true
build.processed = true
end
end
end
# rubocop: enable CodeReuse/ActiveRecord
......@@ -49,6 +51,7 @@ module Ci
def create_build!(attributes)
build = project.builds.new(Hash[attributes])
build.deployment = ::Gitlab::Ci::Pipeline::Seed::Deployment.new(build).to_resource
build.retried = false
build.save!
build
end
......
......@@ -7,10 +7,7 @@ class PipelineUpdateWorker
queue_namespace :pipeline_processing
latency_sensitive_worker!
# rubocop: disable CodeReuse/ActiveRecord
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id)
.try(:update_status)
Ci::Pipeline.find_by_id(pipeline_id)&.update_legacy_status
end
# rubocop: enable CodeReuse/ActiveRecord
end
......@@ -7,11 +7,7 @@ class StageUpdateWorker
queue_namespace :pipeline_processing
latency_sensitive_worker!
# rubocop: disable CodeReuse/ActiveRecord
def perform(stage_id)
Ci::Stage.find_by(id: stage_id).try do |stage|
stage.update_status
end
Ci::Stage.find_by_id(stage_id)&.update_legacy_status
end
# rubocop: enable CodeReuse/ActiveRecord
end
---
title: Implement Atomic Processing that updates status of builds, stages and pipelines in one go
merge_request: 20229
author:
type: performance
......@@ -57,7 +57,7 @@ class Gitlab::Seeder::Pipelines
BUILDS.each { |opts| build_create!(pipeline, opts) }
EXTERNAL_JOBS.each { |opts| commit_status_create!(pipeline, opts) }
pipeline.update_duration
pipeline.update_status
pipeline.update_legacy_status
end
end
......
......@@ -187,7 +187,7 @@ class Gitlab::Seeder::CycleAnalytics
pipeline.builds.each(&:enqueue) # make sure all pipelines in pending state
pipeline.builds.each(&:run!)
pipeline.update_status
pipeline.update_legacy_status
end
end
......@@ -208,7 +208,7 @@ class Gitlab::Seeder::CycleAnalytics
job = merge_request.head_pipeline.builds.where.not(environment: nil).last
job.success!
job.pipeline.update_status
job.pipeline.update_legacy_status
end
end
end
......
# frozen_string_literal: true
class AddProcessedToCiBuilds < ActiveRecord::Migration[5.2]
DOWNTIME = false
def change
add_column :ci_builds, :processed, :boolean
end
end
......@@ -697,6 +697,7 @@ ActiveRecord::Schema.define(version: 2020_01_14_204949) do
t.integer "upstream_pipeline_id"
t.bigint "resource_group_id"
t.datetime_with_timezone "waiting_for_resource_at"
t.boolean "processed"
t.index ["artifacts_expire_at"], name: "index_ci_builds_on_artifacts_expire_at", where: "(artifacts_file <> ''::text)"
t.index ["auto_canceled_by_id"], name: "index_ci_builds_on_auto_canceled_by_id"
t.index ["commit_id", "artifacts_expire_at", "id"], name: "index_ci_builds_on_commit_id_and_artifacts_expireatandidpartial", where: "(((type)::text = 'Ci::Build'::text) AND ((retried = false) OR (retried IS NULL)) AND ((name)::text = ANY (ARRAY[('sast'::character varying)::text, ('dependency_scanning'::character varying)::text, ('sast:container'::character varying)::text, ('container_scanning'::character varying)::text, ('dast'::character varying)::text])))"
......
......@@ -19,41 +19,53 @@ describe Ci::ProcessPipelineService, '#execute' do
end
describe 'cross-project pipelines' do
before do
create_processable(:build, name: 'test', stage: 'test')
create_processable(:bridge, :variables, name: 'cross',
stage: 'build',
downstream: downstream)
create_processable(:build, name: 'deploy', stage: 'deploy')
stub_ci_pipeline_to_return_yaml_file
using RSpec::Parameterized::TableSyntax
where(:ci_atomic_processing) do
[true, false]
end
it 'creates a downstream cross-project pipeline', :sidekiq_might_not_need_inline do
service.execute
with_them do
before do
stub_feature_flags(ci_atomic_processing: ci_atomic_processing)
expect_statuses(%w[test pending], %w[cross created], %w[deploy created])
create_processable(:build, name: 'test', stage: 'test')
create_processable(:bridge, :variables, name: 'cross',
stage: 'build',
downstream: downstream)
create_processable(:build, name: 'deploy', stage: 'deploy')
update_build_status(:test, :success)
stub_ci_pipeline_to_return_yaml_file
end
expect_statuses(%w[test success], %w[cross success], %w[deploy pending])
it 'creates a downstream cross-project pipeline', :sidekiq do
service.execute
Sidekiq::Worker.drain_all
expect(downstream.ci_pipelines).to be_one
expect(downstream.ci_pipelines.first).to be_pending
expect(downstream.builds).not_to be_empty
expect(downstream.builds.first.variables)
.to include(key: 'BRIDGE', value: 'cross', public: false, masked: false)
end
end
expect_statuses(%w[test pending], %w[cross created], %w[deploy created])
update_build_status(:test, :success)
Sidekiq::Worker.drain_all
def expect_statuses(*statuses)
statuses.each do |name, status|
pipeline.statuses.find_by(name: name).yield_self do |build|
expect(build.status).to eq status
expect_statuses(%w[test success], %w[cross success], %w[deploy pending])
expect(downstream.ci_pipelines).to be_one
expect(downstream.ci_pipelines.first).to be_pending
expect(downstream.builds).not_to be_empty
expect(downstream.builds.first.variables)
.to include(key: 'BRIDGE', value: 'cross', public: false, masked: false)
end
end
end
def expect_statuses(*expected)
statuses = pipeline.statuses
.where(name: expected.map(&:first))
.pluck(:name, :status)
expect(statuses).to contain_exactly(*expected)
end
def update_build_status(name, status)
pipeline.builds.find_by(name: name).public_send(status)
end
......
......@@ -240,6 +240,7 @@ excluded_attributes:
- :upstream_pipeline_id
- :resource_group_id
- :waiting_for_resource_at
- :processed
sentry_issue:
- :issue_id
push_event_payload:
......
......@@ -63,7 +63,7 @@ describe 'test coverage badge' do
create(:ci_pipeline, opts).tap do |pipeline|
yield pipeline
pipeline.update_status
pipeline.update_legacy_status
end
end
......
......@@ -102,7 +102,7 @@ describe Gitlab::Badge::Coverage::Report do
create(:ci_pipeline, opts).tap do |pipeline|
yield pipeline
pipeline.update_status
pipeline.update_legacy_status
end
end
end
......@@ -216,6 +216,7 @@ stages:
- project
- pipeline
- statuses
- processables
- builds
- bridges
statuses:
......
......@@ -333,6 +333,7 @@ CommitStatus:
- scheduled_at
- upstream_pipeline_id
- interruptible
- processed
Ci::Variable:
- id
- project_id
......
......@@ -604,7 +604,7 @@ describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do
context 'when traces are archived' do
let(:subject) do
project.builds.each do |build|
build.success!
build.reset.success!
end
end
......
......@@ -1008,22 +1008,22 @@ describe Ci::Pipeline, :mailer do
end
end
describe '#duration', :sidekiq_might_not_need_inline do
describe '#duration', :sidekiq_inline do
context 'when multiple builds are finished' do
before do
travel_to(current + 30) do
build.run!
build.success!
build.reload.success!
build_b.run!
build_c.run!
end
travel_to(current + 40) do
build_b.drop!
build_b.reload.drop!
end
travel_to(current + 70) do
build_c.success!
build_c.reload.success!
end
end
......@@ -1044,7 +1044,7 @@ describe Ci::Pipeline, :mailer do
end
travel_to(current + 5.minutes) do
build.success!
build.reload.success!
end
end
......@@ -1585,6 +1585,30 @@ describe Ci::Pipeline, :mailer do
end
end
describe '#needs_processing?' do
using RSpec::Parameterized::TableSyntax
subject { pipeline.needs_processing? }
where(:processed, :result) do
nil | true
false | true
true | false
end
with_them do
let(:build) do
create(:ci_build, :success, pipeline: pipeline, name: 'rubocop')
end
before do
build.update_column(:processed, processed)
end
it { is_expected.to eq(result) }
end
end
shared_context 'with some outdated pipelines' do
before do
create_pipeline(:canceled, 'ref', 'A', project)
......@@ -1785,7 +1809,7 @@ describe Ci::Pipeline, :mailer do
it { is_expected.not_to include('created', 'waiting_for_resource', 'preparing', 'pending') }
end
describe '#status', :sidekiq_might_not_need_inline do
describe '#status', :sidekiq_inline do
let(:build) do
create(:ci_build, :created, pipeline: pipeline, name: 'test')
end
......@@ -1826,7 +1850,7 @@ describe Ci::Pipeline, :mailer do
context 'on run' do
before do
build.enqueue
build.run
build.reload.run
end
it { is_expected.to eq('running') }
......@@ -1885,7 +1909,7 @@ describe Ci::Pipeline, :mailer do
it 'updates does not change pipeline status' do
expect(pipeline.statuses.latest.slow_composite_status).to be_nil
expect { pipeline.update_status }
expect { pipeline.update_legacy_status }
.to change { pipeline.reload.status }
.from('created')
.to('skipped')
......@@ -1898,7 +1922,7 @@ describe Ci::Pipeline, :mailer do
end
it 'updates pipeline status to running' do
expect { pipeline.update_status }
expect { pipeline.update_legacy_status }
.to change { pipeline.reload.status }
.from('created')
.to('running')
......@@ -1911,7 +1935,7 @@ describe Ci::Pipeline, :mailer do
end
it 'updates pipeline status to scheduled' do
expect { pipeline.update_status }
expect { pipeline.update_legacy_status }
.to change { pipeline.reload.status }
.from('created')
.to('scheduled')
......@@ -1926,7 +1950,7 @@ describe Ci::Pipeline, :mailer do
end
it 'raises an exception' do
expect { pipeline.update_status }
expect { pipeline.update_legacy_status }
.to raise_error(HasStatus::UnknownStatusError)
end
end
......@@ -2214,11 +2238,11 @@ describe Ci::Pipeline, :mailer do
stub_full_request(hook.url, method: :post)
end
context 'with multiple builds', :sidekiq_might_not_need_inline do
context 'with multiple builds', :sidekiq_inline do
context 'when build is queued' do
before do
build_a.enqueue
build_b.enqueue
build_a.reload.enqueue
build_b.reload.enqueue
end
it 'receives a pending event once' do
......@@ -2228,10 +2252,10 @@ describe Ci::Pipeline, :mailer do
context 'when build is run' do
before do
build_a.enqueue
build_a.run
build_b.enqueue
build_b.run
build_a.reload.enqueue
build_a.reload.run!
build_b.reload.enqueue
build_b.reload.run!
end
it 'receives a running event once' do
......@@ -2292,6 +2316,7 @@ describe Ci::Pipeline, :mailer do
:created,
pipeline: pipeline,
name: name,
stage: "stage:#{stage_idx}",
stage_idx: stage_idx)
end
end
......
# frozen_string_literal: true
require 'spec_helper'
describe Ci::Processable do
set(:project) { create(:project) }
set(:pipeline) { create(:ci_pipeline, project: project) }
describe '#aggregated_needs_names' do
let(:with_aggregated_needs) { pipeline.processables.select_with_aggregated_needs(project) }
context 'with created status' do
let!(:processable) { create(:ci_build, :created, project: project, pipeline: pipeline) }
context 'with needs' do
before do
create(:ci_build_need, build: processable, name: 'test1')
create(:ci_build_need, build: processable, name: 'test2')
end
it 'returns all processables' do
expect(with_aggregated_needs).to contain_exactly(processable)
end
it 'returns all needs' do
expect(with_aggregated_needs.first.aggregated_needs_names).to contain_exactly('test1', 'test2')
end
context 'with ci_dag_support disabled' do
before do
stub_feature_flags(ci_dag_support: false)
end
it 'returns all processables' do
expect(with_aggregated_needs).to contain_exactly(processable)
end
it 'returns empty needs' do
expect(with_aggregated_needs.first.aggregated_needs_names).to be_nil
end
end
end
context 'without needs' do
it 'returns all processables' do
expect(with_aggregated_needs).to contain_exactly(processable)
end
it 'returns empty needs' do
expect(with_aggregated_needs.first.aggregated_needs_names).to be_nil
end
end
end
end
end
......@@ -63,7 +63,7 @@ describe Ci::Stage, :models do
end
it 'updates stage status correctly' do
expect { stage.update_status }
expect { stage.update_legacy_status }
.to change { stage.reload.status }
.to eq 'running'
end
......@@ -87,7 +87,7 @@ describe Ci::Stage, :models do
end
it 'updates status to skipped' do
expect { stage.update_status }
expect { stage.update_legacy_status }
.to change { stage.reload.status }
.to eq 'skipped'
end
......@@ -99,7 +99,7 @@ describe Ci::Stage, :models do
end
it 'updates status to scheduled' do
expect { stage.update_status }
expect { stage.update_legacy_status }
.to change { stage.reload.status }
.to 'scheduled'
end
......@@ -111,7 +111,7 @@ describe Ci::Stage, :models do
end
it 'updates status to waiting for resource' do
expect { stage.update_status }
expect { stage.update_legacy_status }
.to change { stage.reload.status }
.to 'waiting_for_resource'
end
......@@ -119,7 +119,7 @@ describe Ci::Stage, :models do
context 'when stage is skipped because is empty' do
it 'updates status to skipped' do
expect { stage.update_status }
expect { stage.update_legacy_status }
.to change { stage.reload.status }
.to eq('skipped')
end
......@@ -133,7 +133,7 @@ describe Ci::Stage, :models do
it 'retries a lock to update a stage status' do
stage.lock_version = 100
stage.update_status
stage.update_legacy_status
expect(stage.reload).to be_failed
end
......@@ -147,7 +147,7 @@ describe Ci::Stage, :models do
end
it 'raises an exception' do
expect { stage.update_status }
expect { stage.update_legacy_status }
.to raise_error(HasStatus::UnknownStatusError)
end
end
......@@ -179,7 +179,7 @@ describe Ci::Stage, :models do
stage_id: stage.id,
status: status)
stage.update_status
stage.update_legacy_status
end
end
......@@ -196,7 +196,7 @@ describe Ci::Stage, :models do
status: :failed,
allow_failure: true)
stage.update_status
stage.update_legacy_status
end
it 'is passed with warnings' do
......@@ -243,7 +243,7 @@ describe Ci::Stage, :models do
it 'recalculates index before updating status' do
expect(stage.reload.position).to be_nil
stage.update_status
stage.update_legacy_status
expect(stage.reload.position).to eq 10
end
......@@ -253,7 +253,7 @@ describe Ci::Stage, :models do
it 'fallbacks to zero' do
expect(stage.reload.position).to be_nil
stage.update_status
stage.update_legacy_status
expect(stage.reload.position).to eq 0
end
......
......@@ -63,6 +63,42 @@ describe CommitStatus do
end
end
describe '#processed' do
subject { commit_status.processed }
context 'when ci_atomic_processing is disabled' do
before do
stub_feature_flags(ci_atomic_processing: false)
commit_status.save!
end
it { is_expected.to be_nil }
end
context 'when ci_atomic_processing is enabled' do
before do
stub_feature_flags(ci_atomic_processing: true)
end
context 'status is latest' do
before do
commit_status.update!(retried: false, status: :pending)
end
it { is_expected.to be_falsey }
end
context 'status is retried' do
before do
commit_status.update!(retried: true, status: :pending)
end
it { is_expected.to be_truthy }
end
end
end
describe '#started?' do
subject { commit_status.started? }
......
......@@ -362,11 +362,11 @@ describe Ci::CreatePipelineService do
context 'when build that is not marked as interruptible is running' do
it 'cancels running outdated pipelines', :sidekiq_might_not_need_inline do
pipeline_on_previous_commit
.builds
.find_by_name('build_2_1')
.tap(&:enqueue!)
.run!
build_2_1 = pipeline_on_previous_commit
.builds.find_by_name('build_2_1')
build_2_1.enqueue!
build_2_1.reset.run!
pipeline
......@@ -377,12 +377,12 @@ describe Ci::CreatePipelineService do
end
context 'when an uninterruptible build is running' do
it 'does not cancel running outdated pipelines', :sidekiq_might_not_need_inline do
pipeline_on_previous_commit
.builds
.find_by_name('build_3_1')
.tap(&:enqueue!)
.run!
it 'does not cancel running outdated pipelines', :sidekiq_inline do
build_3_1 = pipeline_on_previous_commit
.builds.find_by_name('build_3_1')
build_3_1.enqueue!
build_3_1.reset.run!
pipeline
......
# frozen_string_literal: true
require 'spec_helper'
describe Ci::PipelineProcessing::AtomicProcessingService::StatusCollection do
using RSpec::Parameterized::TableSyntax
set(:pipeline) { create(:ci_pipeline) }
set(:build_a) { create(:ci_build, :success, name: 'build-a', stage: 'build', stage_idx: 0, pipeline: pipeline) }
set(:build_b) { create(:ci_build, :failed, name: 'build-b', stage: 'build', stage_idx: 0, pipeline: pipeline) }
set(:test_a) { create(:ci_build, :running, name: 'test-a', stage: 'test', stage_idx: 1, pipeline: pipeline) }
set(:test_b) { create(:ci_build, :pending, name: 'test-b', stage: 'test', stage_idx: 1, pipeline: pipeline) }
set(:deploy) { create(:ci_build, :created, name: 'deploy', stage: 'deploy', stage_idx: 2, pipeline: pipeline) }
let(:collection) { described_class.new(pipeline) }
describe '#set_processable_status' do
it 'does update existing status of processable' do
collection.set_processable_status(test_a.id, 'success', 100)
expect(collection.status_for_names(['test-a'])).to eq('success')
end
it 'ignores a missing processable' do
collection.set_processable_status(-1, 'failed', 100)
end
end
describe '#status_of_all' do
it 'returns composite status of the collection' do
expect(collection.status_of_all).to eq('running')
end
end
describe '#status_for_names' do
where(:names, :status) do
%w[build-a] | 'success'
%w[build-a build-b] | 'failed'
%w[build-a test-a] | 'running'
end
with_them do
it 'returns composite status of given names' do
expect(collection.status_for_names(names)).to eq(status)
end
end
end
describe '#status_for_prior_stage_position' do
where(:stage, :status) do
0 | 'success'
1 | 'failed'
2 | 'running'
end
with_them do
it 'returns composite status for processables in prior stages' do
expect(collection.status_for_prior_stage_position(stage)).to eq(status)
end
end
end
describe '#status_for_stage_position' do
where(:stage, :status) do
0 | 'failed'
1 | 'running'
2 | 'created'
end
with_them do
it 'returns composite status for processables at a given stages' do
expect(collection.status_for_stage_position(stage)).to eq(status)
end
end
end
describe '#created_processable_ids_for_stage_position' do
it 'returns IDs of processables at a given stage position' do
expect(collection.created_processable_ids_for_stage_position(0)).to be_empty
expect(collection.created_processable_ids_for_stage_position(1)).to be_empty
expect(collection.created_processable_ids_for_stage_position(2)).to contain_exactly(deploy.id)
end
end
describe '#processing_processables' do
it 'returns processables marked as processing' do
expect(collection.processing_processables.map { |processable| processable[:id]} )
.to contain_exactly(build_a.id, build_b.id, test_a.id, test_b.id, deploy.id)
end
end
end
# frozen_string_literal: true
require 'spec_helper'
require_relative 'shared_processing_service.rb'
describe Ci::PipelineProcessing::AtomicProcessingService do
before do
stub_feature_flags(ci_atomic_processing: true)
end
it_behaves_like 'Pipeline Processing Service'
end
......@@ -4,5 +4,9 @@ require 'spec_helper'
require_relative 'shared_processing_service.rb'
describe Ci::PipelineProcessing::LegacyProcessingService do
before do
stub_feature_flags(ci_atomic_processing: false)
end
it_behaves_like 'Pipeline Processing Service'
end
......@@ -879,19 +879,27 @@ shared_examples 'Pipeline Processing Service' do
end
def succeed_pending
builds.pending.map(&:success)
builds.pending.each do |build|
build.reset.success
end
end
def succeed_running_or_pending
pipeline.builds.running_or_pending.each(&:success)
pipeline.builds.running_or_pending.each do |build|
build.reset.success
end
end
def fail_running_or_pending
pipeline.builds.running_or_pending.each(&:drop)
pipeline.builds.running_or_pending.each do |build|
build.reset.drop
end
end
def cancel_running_or_pending
pipeline.builds.running_or_pending.each(&:cancel)
pipeline.builds.running_or_pending.each do |build|
build.reset.cancel
end
end
def play_manual_action(name)
......@@ -911,11 +919,15 @@ shared_examples 'Pipeline Processing Service' do
end
def create_build(name, **opts)
create(:ci_build, :created, pipeline: pipeline, name: name, **opts)
create(:ci_build, :created, pipeline: pipeline, name: name, **with_stage_opts(opts))
end
def successful_build(name, **opts)
create(:ci_build, :success, pipeline: pipeline, name: name, **opts)
create(:ci_build, :success, pipeline: pipeline, name: name, **with_stage_opts(opts))
end
def with_stage_opts(opts)
{ stage: "stage-#{opts[:stage_idx].to_i}" }.merge(opts)
end
def delayed_options
......
......@@ -45,7 +45,8 @@ describe Ci::RetryBuildService do
user_id auto_canceled_by_id retried failure_reason
sourced_pipelines artifacts_file_store artifacts_metadata_store
metadata runner_session trace_chunks upstream_pipeline_id
artifacts_file artifacts_metadata artifacts_size commands resource resource_group_id].freeze
artifacts_file artifacts_metadata artifacts_size commands
resource resource_group_id processed].freeze
shared_examples 'build duplication' do
let(:another_pipeline) { create(:ci_empty_pipeline, project: project) }
......@@ -202,12 +203,13 @@ describe Ci::RetryBuildService do
it 'does not enqueue the new build' do
expect(new_build).to be_created
expect(new_build).not_to be_processed
end
it 'does mark old build as retried in the database and on the instance' do
it 'does mark old build as retried' do
expect(new_build).to be_latest
expect(build).to be_retried
expect(build.reload).to be_retried
expect(build).to be_processed
end
context 'when build with deployment is retried' do
......
......@@ -330,7 +330,7 @@ describe Ci::RetryPipelineService, '#execute' do
stage: "stage_#{stage_num}",
stage_idx: stage_num,
pipeline: pipeline, **opts) do |build|
pipeline.update_status
pipeline.update_legacy_status
end
end
end
......@@ -8,7 +8,7 @@ describe PipelineUpdateWorker do
let(:pipeline) { create(:ci_pipeline) }
it 'updates pipeline status' do
expect_any_instance_of(Ci::Pipeline).to receive(:update_status)
expect_any_instance_of(Ci::Pipeline).to receive(:set_status).with('skipped')
described_class.new.perform(pipeline.id)
end
......
......@@ -8,7 +8,7 @@ describe StageUpdateWorker do
let(:stage) { create(:ci_stage_entity) }
it 'updates stage status' do
expect_any_instance_of(Ci::Stage).to receive(:update_status)
expect_any_instance_of(Ci::Stage).to receive(:set_status).with('skipped')
described_class.new.perform(stage.id)
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment