Commit 24a412b2 authored by drew cimino

Introduce Ci::UpdateLockedUnknownArtifactsWorker

Feature flag: ci_job_artifacts_backlog_work
Control execution of our new artifact-backlog-work service. When the
flag is disabled, Ci::UpdateLockedUnknownArtifactsWorker returns early
and never invokes Ci::JobArtifacts::UpdateUnknownLockedStatusService,
making each run a no-op.
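
The guard lives in the worker's perform (full definition in the diff
below):

    def perform
      return unless ::Feature.enabled?(:ci_job_artifacts_backlog_work)

      # ... run the service and log the removed/locked counts ...
    end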

Feature flag: ci_job_artifacts_backlog_large_loop_limit
Control the number of rows updated in or removed from
ci_job_artifacts in a single execution by raising the loop_limit from
100 to 500 iterations. With BATCH_SIZE = 100, that caps a single
worker execution at 10,000 or 50,000 rows respectively.
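
As a rough illustration of that ceiling (a sketch only; large_limit
stands in for the feature-flag check):

    batch_size = 100                            # BATCH_SIZE
    loop_limit = large_limit ? 500 : 100        # LARGE_LOOP_LIMIT : LOOP_LIMIT
    max_rows_per_run = batch_size * loop_limit  # 10_000 or 50_000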

First, we remove the DISTINCT clause from this query. DISTINCT forced
Postgres to de-duplicate the entire matching set before applying the
LIMIT, which is expensive. It also implicitly returned the lowest
job_id values first, which piled an enormous amount of dead tuples at
the end of the index we were explicitly starting our query from. That
is catastrophic for index performance on replicas, but not on the
primary, which can use the LP_DEAD hint to skip dead index tuples. On
replicas, performance remains degraded until the next VACUUM.
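
In ActiveRecord terms the change is roughly the following, where
relation stands in for the expired, unknown-status artifact scope (a
sketch; the old query's exact shape is not part of this diff):

    # Before (approximate): SQL-level DISTINCT de-duplicates the whole
    # matching set before the LIMIT is applied.
    relation.distinct.limit(100).pluck(:job_id)

    # After: pluck a small batch and de-duplicate the array in Ruby.
    relation.limit(100).pluck(:job_id).uniq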

Second, we apply an explicit order by expiration to avoid accidentally
querying rows in an order that correlates with job_id. By ensuring
that the rows we query are generally well-distributed across the
table, we avoid scanning large blocks of dead index tuples before
finding 100 live tuples to return.
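
Concretely, each batched read now takes roughly this shape (the SQL in
the comment is approximate):

    # SELECT "ci_job_artifacts".* FROM "ci_job_artifacts"
    # WHERE expire_at < $1 AND locked = <unknown enum value>
    # ORDER BY expire_at ASC LIMIT 100
    Ci::JobArtifact.expired_before(@start_at).artifact_unknown
      .limit(BATCH_SIZE).order_expired_asc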

Third, we destroy unlocked job artifacts without updating them first.
Updating rows in ci_job_artifacts only to delete them on the next loop
cycle creates twice as many dead tuples as necessary. Artifacts locked
via ci_pipelines.locked are still updated and left in place, but the
rest are passed directly to DestroyBatchService for removal, as
sketched below. The dead tuple an intermediate update would create is
avoided entirely.
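
Per batch, the flow in the new service is roughly as follows (a
sketch; batch and relation are illustrative names for the scoped reads
shown in the diff below):

    unknown_build_ids  = batch.pluck_job_id.uniq
    locked_build_ids   = ::Ci::Build.with_pipeline_locked_artifacts
                           .id_in(unknown_build_ids).pluck_primary_key
    unlocked_build_ids = unknown_build_ids - locked_build_ids

    # Locked artifacts get one UPDATE and stay in place; unlocked ones
    # skip the UPDATE and go straight to the destroy service.
    relation.for_job_ids(locked_build_ids)
      .update_all(locked: Ci::JobArtifact.lockeds[:artifacts_locked])
    Ci::JobArtifacts::DestroyBatchService
      .new(relation.for_job_ids(unlocked_build_ids)).execute
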
parent e9cee7d3
@@ -186,6 +186,7 @@ module Ci
    scope :downloadable, -> { where(file_type: DOWNLOADABLE_TYPES) }
    scope :unlocked, -> { joins(job: :pipeline).merge(::Ci::Pipeline.unlocked) }
    scope :order_expired_asc, -> { order(expire_at: :asc) }
    scope :order_expired_desc, -> { order(expire_at: :desc) }
    scope :with_destroy_preloads, -> { includes(project: [:route, :statistics]) }
@@ -273,6 +274,10 @@ module Ci
      self.where(project: project).sum(:size)
    end

    def self.pluck_job_id
      pluck(:job_id)
    end

    ##
    # FastDestroyAll concerns
    # rubocop: disable CodeReuse/ServiceClass
...
# frozen_string_literal: true

module Ci
  module JobArtifacts
    class UpdateUnknownLockedStatusService
      include ::Gitlab::ExclusiveLeaseHelpers
      include ::Gitlab::LoopHelpers

      BATCH_SIZE = 100
      LOOP_TIMEOUT = 5.minutes
      LOOP_LIMIT = 100
      LARGE_LOOP_LIMIT = 500
      EXCLUSIVE_LOCK_KEY = 'unknown_status_job_artifacts:update:lock'
      LOCK_TIMEOUT = 6.minutes

      def initialize
        @removed_count = 0
        @locked_count = 0
        @start_at = Time.current
        @loop_limit = Feature.enabled?(:ci_job_artifacts_backlog_large_loop_limit) ? LARGE_LOOP_LIMIT : LOOP_LIMIT
      end

      def execute
        in_lock(EXCLUSIVE_LOCK_KEY, ttl: LOCK_TIMEOUT, retries: 1) do
          update_locked_status_on_unknown_artifacts
        end

        { removed: @removed_count, locked: @locked_count }
      end

      private

      def update_locked_status_on_unknown_artifacts
        loop_until(timeout: LOOP_TIMEOUT, limit: @loop_limit) do
          unknown_status_build_ids = safely_ordered_ci_job_artifacts_locked_unknown_relation.pluck_job_id.uniq

          locked_pipe_build_ids = ::Ci::Build
                                    .with_pipeline_locked_artifacts
                                    .id_in(unknown_status_build_ids)
                                    .pluck_primary_key

          @locked_count += update_unknown_artifacts(locked_pipe_build_ids, Ci::JobArtifact.lockeds[:artifacts_locked])

          unlocked_pipe_build_ids = unknown_status_build_ids - locked_pipe_build_ids
          service_response = batch_destroy_artifacts(unlocked_pipe_build_ids)
          @removed_count += service_response[:destroyed_artifacts_count]
        end
      end

      def update_unknown_artifacts(build_ids, locked_value)
        return 0 unless build_ids.any?

        expired_locked_unknown_artifacts.for_job_ids(build_ids).update_all(locked: locked_value)
      end

      def batch_destroy_artifacts(build_ids)
        deleteable_artifacts_relation =
          if build_ids.any?
            expired_locked_unknown_artifacts.for_job_ids(build_ids)
          else
            Ci::JobArtifact.none
          end

        Ci::JobArtifacts::DestroyBatchService.new(deleteable_artifacts_relation).execute
      end

      def expired_locked_unknown_artifacts
        # UPDATE queries perform better without the specific order and limit
        # https://gitlab.com/gitlab-org/gitlab/-/merge_requests/76509#note_891260455
        Ci::JobArtifact.expired_before(@start_at).artifact_unknown
      end

      def safely_ordered_ci_job_artifacts_locked_unknown_relation
        # Adding the ORDER and LIMIT improves performance when we don't have build_id
        expired_locked_unknown_artifacts.limit(BATCH_SIZE).order_expired_asc
      end
    end
  end
end
@@ -264,6 +264,15 @@
  :weight: 1
  :idempotent: true
  :tags: []
- :name: cronjob:ci_update_locked_unknown_artifacts
  :worker_name: Ci::UpdateLockedUnknownArtifactsWorker
  :feature_category: :build_artifacts
  :has_external_dependencies:
  :urgency: :throttled
  :resource_boundary: :unknown
  :weight: 1
  :idempotent:
  :tags: []
- :name: cronjob:clusters_integrations_check_prometheus_health
  :worker_name: Clusters::Integrations::CheckPrometheusHealthWorker
  :feature_category: :incident_management
...
# frozen_string_literal: true

module Ci
  class UpdateLockedUnknownArtifactsWorker # rubocop:disable Scalability/IdempotentWorker
    include ApplicationWorker

    data_consistency :sticky

    urgency :throttled

    # rubocop:disable Scalability/CronWorkerContext
    # This worker does not perform work scoped to a context
    include CronjobQueue
    # rubocop:enable Scalability/CronWorkerContext

    feature_category :build_artifacts

    def perform
      return unless ::Feature.enabled?(:ci_job_artifacts_backlog_work)

      artifact_counts = Ci::JobArtifacts::UpdateUnknownLockedStatusService.new.execute

      log_extra_metadata_on_done(:removed_count, artifact_counts[:removed])
      log_extra_metadata_on_done(:locked_count, artifact_counts[:locked])
    end
  end
end
---
name: ci_job_artifacts_backlog_large_loop_limit
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/76509
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/347151
milestone: '14.10'
type: development
group: group::pipeline execution
default_enabled: false
---
name: ci_job_artifacts_backlog_work
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/76509
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/347144
milestone: '14.10'
type: development
group: group::pipeline execution
default_enabled: false
@@ -446,6 +446,9 @@ Settings.cron_jobs['pipeline_schedule_worker']['job_class'] = 'PipelineScheduleW
Settings.cron_jobs['expire_build_artifacts_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['expire_build_artifacts_worker']['cron'] ||= '*/7 * * * *'
Settings.cron_jobs['expire_build_artifacts_worker']['job_class'] = 'ExpireBuildArtifactsWorker'
Settings.cron_jobs['update_locked_unknown_artifacts_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['update_locked_unknown_artifacts_worker']['cron'] ||= '*/7 * * * *'
Settings.cron_jobs['update_locked_unknown_artifacts_worker']['job_class'] = 'Ci::UpdateLockedUnknownArtifactsWorker'
Settings.cron_jobs['ci_pipelines_expire_artifacts_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['ci_pipelines_expire_artifacts_worker']['cron'] ||= '*/23 * * * *'
Settings.cron_jobs['ci_pipelines_expire_artifacts_worker']['job_class'] = 'Ci::PipelineArtifacts::ExpireArtifactsWorker'
...
@@ -279,6 +279,15 @@ RSpec.describe Ci::JobArtifact do
    end
  end

  describe '.order_expired_asc' do
    let_it_be(:first_artifact) { create(:ci_job_artifact, expire_at: 2.days.ago) }
    let_it_be(:second_artifact) { create(:ci_job_artifact, expire_at: 1.day.ago) }

    it 'returns ordered artifacts' do
      expect(described_class.order_expired_asc).to eq([first_artifact, second_artifact])
    end
  end

  describe '.for_project' do
    it 'returns artifacts only for given project(s)', :aggregate_failures do
      artifact1 = create(:ci_job_artifact)
...
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe Ci::JobArtifacts::UpdateUnknownLockedStatusService, :clean_gitlab_redis_shared_state do
  include ExclusiveLeaseHelpers

  let(:service) { described_class.new }

  describe '.execute' do
    subject { service.execute }

    let_it_be(:locked_pipeline) { create(:ci_pipeline, :artifacts_locked) }
    let_it_be(:pipeline) { create(:ci_pipeline, :unlocked) }
    let_it_be(:locked_job) { create(:ci_build, :success, pipeline: locked_pipeline) }
    let_it_be(:job) { create(:ci_build, :success, pipeline: pipeline) }

    let!(:unknown_unlocked_artifact) do
      create(:ci_job_artifact, :junit, expire_at: 1.hour.ago, job: job, locked: Ci::JobArtifact.lockeds[:unknown])
    end

    let!(:unknown_locked_artifact) do
      create(:ci_job_artifact, :lsif,
        expire_at: 1.day.ago,
        job: locked_job,
        locked: Ci::JobArtifact.lockeds[:unknown]
      )
    end

    let!(:unlocked_artifact) do
      create(:ci_job_artifact, :archive, expire_at: 1.hour.ago, job: job, locked: Ci::JobArtifact.lockeds[:unlocked])
    end

    let!(:locked_artifact) do
      create(:ci_job_artifact, :sast, :raw,
        expire_at: 1.day.ago,
        job: locked_job,
        locked: Ci::JobArtifact.lockeds[:artifacts_locked]
      )
    end

    context 'when artifacts are expired' do
      it 'sets artifact_locked when the pipeline is locked' do
        expect { service.execute }
          .to change { unknown_locked_artifact.reload.locked }.from('unknown').to('artifacts_locked')
          .and not_change { Ci::JobArtifact.exists?(locked_artifact.id) }
      end

      it 'destroys the artifact when the pipeline is unlocked' do
        expect { subject }.to change { Ci::JobArtifact.exists?(unknown_unlocked_artifact.id) }.from(true).to(false)
      end

      it 'does not update ci_job_artifact rows with known locked values' do
        expect { service.execute }
          .to not_change(locked_artifact, :attributes)
          .and not_change { Ci::JobArtifact.exists?(locked_artifact.id) }
          .and not_change(unlocked_artifact, :attributes)
          .and not_change { Ci::JobArtifact.exists?(unlocked_artifact.id) }
      end

      it 'logs the counts of affected artifacts' do
        expect(subject).to eq({ removed: 1, locked: 1 })
      end
    end

    context 'in a single iteration' do
      before do
        stub_const("#{described_class}::BATCH_SIZE", 1)
      end

      context 'due to the LOOP_TIMEOUT' do
        before do
          stub_const("#{described_class}::LOOP_TIMEOUT", 0.seconds)
        end

        it 'affects the earliest expired artifact first' do
          subject

          expect(unknown_locked_artifact.reload.locked).to eq('artifacts_locked')
          expect(unknown_unlocked_artifact.reload.locked).to eq('unknown')
        end

        it 'reports the number of destroyed artifacts' do
          is_expected.to eq({ removed: 0, locked: 1 })
        end
      end

      context 'due to @loop_limit' do
        before do
          stub_const("#{described_class}::LARGE_LOOP_LIMIT", 1)
        end

        it 'affects the most recently expired artifact first' do
          subject

          expect(unknown_locked_artifact.reload.locked).to eq('artifacts_locked')
          expect(unknown_unlocked_artifact.reload.locked).to eq('unknown')
        end

        it 'reports the number of destroyed artifacts' do
          is_expected.to eq({ removed: 0, locked: 1 })
        end
      end
    end

    context 'when artifact is not expired' do
      let!(:unknown_unlocked_artifact) do
        create(:ci_job_artifact, :junit,
          expire_at: 1.year.from_now,
          job: job,
          locked: Ci::JobArtifact.lockeds[:unknown]
        )
      end

      it 'does not change the locked status' do
        expect { service.execute }.not_to change { unknown_unlocked_artifact.locked }
        expect(Ci::JobArtifact.exists?(unknown_unlocked_artifact.id)).to eq(true)
      end
    end

    context 'when exclusive lease has already been taken by the other instance' do
      before do
        stub_exclusive_lease_taken(described_class::EXCLUSIVE_LOCK_KEY, timeout: described_class::LOCK_TIMEOUT)
      end

      it 'raises an error' do
        expect { subject }.to raise_error(Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError)
      end
    end

    context 'when there are no unknown status artifacts' do
      before do
        Ci::JobArtifact.update_all(locked: :unlocked)
      end

      it 'does not raise error' do
        expect { subject }.not_to raise_error
      end

      it 'reports the number of destroyed artifacts' do
        is_expected.to eq({ removed: 0, locked: 0 })
      end
    end
  end
end
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe Ci::UpdateLockedUnknownArtifactsWorker do
  let(:worker) { described_class.new }

  describe '#perform' do
    it 'executes an instance of Ci::JobArtifacts::UpdateUnknownLockedStatusService' do
      expect_next_instance_of(Ci::JobArtifacts::UpdateUnknownLockedStatusService) do |instance|
        expect(instance).to receive(:execute).and_call_original
      end

      expect(worker).to receive(:log_extra_metadata_on_done).with(:removed_count, 0)
      expect(worker).to receive(:log_extra_metadata_on_done).with(:locked_count, 0)

      worker.perform
    end

    context 'with the ci_job_artifacts_backlog_work flag shut off' do
      before do
        stub_feature_flags(ci_job_artifacts_backlog_work: false)
      end

      it 'does not instantiate a new Ci::JobArtifacts::UpdateUnknownLockedStatusService' do
        expect(Ci::JobArtifacts::UpdateUnknownLockedStatusService).not_to receive(:new)

        worker.perform
      end

      it 'does not log any artifact counts' do
        expect(worker).not_to receive(:log_extra_metadata_on_done)

        worker.perform
      end

      it 'does not query the database' do
        query_count = ActiveRecord::QueryRecorder.new { worker.perform }.count

        expect(query_count).to eq(0)
      end
    end
  end
end