Commit 701b39fc authored by Robert Speicher's avatar Robert Speicher

Merge branch 'set-artifacts-locked-with-spare-time' into 'master'

Use spare loop cycles in DestroyAllExpiredService

See merge request gitlab-org/gitlab!74132
parents aa98f798 626fa470
...@@ -164,6 +164,7 @@ module Ci ...@@ -164,6 +164,7 @@ module Ci
scope :with_artifacts_not_expired, -> { with_downloadable_artifacts.where('artifacts_expire_at IS NULL OR artifacts_expire_at > ?', Time.current) } scope :with_artifacts_not_expired, -> { with_downloadable_artifacts.where('artifacts_expire_at IS NULL OR artifacts_expire_at > ?', Time.current) }
scope :with_expired_artifacts, -> { with_downloadable_artifacts.where('artifacts_expire_at < ?', Time.current) } scope :with_expired_artifacts, -> { with_downloadable_artifacts.where('artifacts_expire_at < ?', Time.current) }
scope :with_pipeline_locked_artifacts, -> { joins(:pipeline).where('pipeline.locked': Ci::Pipeline.lockeds[:artifacts_locked]) }
scope :last_month, -> { where('created_at > ?', Date.today - 1.month) } scope :last_month, -> { where('created_at > ?', Date.today - 1.month) }
scope :manual_actions, -> { where(when: :manual, status: COMPLETED_STATUSES + %i[manual]) } scope :manual_actions, -> { where(when: :manual, status: COMPLETED_STATUSES + %i[manual]) }
scope :scheduled_actions, -> { where(when: :delayed, status: COMPLETED_STATUSES + %i[scheduled]) } scope :scheduled_actions, -> { where(when: :delayed, status: COMPLETED_STATUSES + %i[scheduled]) }
......
...@@ -133,6 +133,7 @@ module Ci ...@@ -133,6 +133,7 @@ module Ci
scope :not_expired, -> { where('expire_at IS NULL OR expire_at > ?', Time.current) } scope :not_expired, -> { where('expire_at IS NULL OR expire_at > ?', Time.current) }
scope :for_sha, ->(sha, project_id) { joins(job: :pipeline).where(ci_pipelines: { sha: sha, project_id: project_id }) } scope :for_sha, ->(sha, project_id) { joins(job: :pipeline).where(ci_pipelines: { sha: sha, project_id: project_id }) }
scope :for_job_ids, ->(job_ids) { where(job_id: job_ids) }
scope :for_job_name, ->(name) { joins(:job).where(ci_builds: { name: name }) } scope :for_job_name, ->(name) { joins(:job).where(ci_builds: { name: name }) }
scope :with_job, -> { joins(:job).includes(:job) } scope :with_job, -> { joins(:job).includes(:job) }
...@@ -266,6 +267,10 @@ module Ci ...@@ -266,6 +267,10 @@ module Ci
self.where(project: project).sum(:size) self.where(project: project).sum(:size)
end end
def self.distinct_job_ids
distinct.pluck(:job_id)
end
## ##
# FastDestroyAll concerns # FastDestroyAll concerns
# rubocop: disable CodeReuse/ServiceClass # rubocop: disable CodeReuse/ServiceClass
......
...@@ -14,6 +14,7 @@ module Ci ...@@ -14,6 +14,7 @@ module Ci
def initialize def initialize
@removed_artifacts_count = 0 @removed_artifacts_count = 0
@start_at = Time.current
end end
## ##
...@@ -25,9 +26,9 @@ module Ci ...@@ -25,9 +26,9 @@ module Ci
def execute def execute
in_lock(EXCLUSIVE_LOCK_KEY, ttl: LOCK_TIMEOUT, retries: 1) do in_lock(EXCLUSIVE_LOCK_KEY, ttl: LOCK_TIMEOUT, retries: 1) do
if ::Feature.enabled?(:ci_destroy_unlocked_job_artifacts) if ::Feature.enabled?(:ci_destroy_unlocked_job_artifacts)
destroy_unlocked_job_artifacts(Time.current) destroy_unlocked_job_artifacts
else else
destroy_job_artifacts_with_slow_iteration(Time.current) destroy_job_artifacts_with_slow_iteration
end end
end end
...@@ -36,16 +37,37 @@ module Ci ...@@ -36,16 +37,37 @@ module Ci
private private
def destroy_unlocked_job_artifacts(start_at) def destroy_unlocked_job_artifacts
loop_until(timeout: LOOP_TIMEOUT, limit: LOOP_LIMIT) do loop_until(timeout: LOOP_TIMEOUT, limit: LOOP_LIMIT) do
artifacts = Ci::JobArtifact.expired_before(start_at).artifact_unlocked.limit(BATCH_SIZE) artifacts = Ci::JobArtifact.expired_before(@start_at).artifact_unlocked.limit(BATCH_SIZE)
service_response = destroy_batch(artifacts) service_response = destroy_batch(artifacts)
@removed_artifacts_count += service_response[:destroyed_artifacts_count] @removed_artifacts_count += service_response[:destroyed_artifacts_count]
update_locked_status_on_unknown_artifacts if service_response[:destroyed_artifacts_count] == 0
# Return a truthy value here to prevent exiting #loop_until
@removed_artifacts_count
end end
end end
def destroy_job_artifacts_with_slow_iteration(start_at) def update_locked_status_on_unknown_artifacts
Ci::JobArtifact.expired_before(start_at).each_batch(of: BATCH_SIZE, column: :expire_at, order: :desc) do |relation, index| build_ids = Ci::JobArtifact.expired_before(@start_at).artifact_unknown.limit(BATCH_SIZE).distinct_job_ids
return unless build_ids.present?
locked_pipeline_build_ids = ::Ci::Build.with_pipeline_locked_artifacts.id_in(build_ids).pluck_primary_key
unlocked_pipeline_build_ids = build_ids - locked_pipeline_build_ids
update_unknown_artifacts(locked_pipeline_build_ids, Ci::JobArtifact.lockeds[:artifacts_locked])
update_unknown_artifacts(unlocked_pipeline_build_ids, Ci::JobArtifact.lockeds[:unlocked])
end
def update_unknown_artifacts(build_ids, locked_value)
Ci::JobArtifact.for_job_ids(build_ids).update_all(locked: locked_value) if build_ids.any?
end
def destroy_job_artifacts_with_slow_iteration
Ci::JobArtifact.expired_before(@start_at).each_batch(of: BATCH_SIZE, column: :expire_at, order: :desc) do |relation, index|
# For performance reasons, join with ci_pipelines after the batch is queried. # For performance reasons, join with ci_pipelines after the batch is queried.
# See: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/47496 # See: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/47496
artifacts = relation.unlocked artifacts = relation.unlocked
...@@ -53,7 +75,7 @@ module Ci ...@@ -53,7 +75,7 @@ module Ci
service_response = destroy_batch(artifacts) service_response = destroy_batch(artifacts)
@removed_artifacts_count += service_response[:destroyed_artifacts_count] @removed_artifacts_count += service_response[:destroyed_artifacts_count]
break if loop_timeout?(start_at) break if loop_timeout?
break if index >= LOOP_LIMIT break if index >= LOOP_LIMIT
end end
end end
...@@ -62,8 +84,8 @@ module Ci ...@@ -62,8 +84,8 @@ module Ci
Ci::JobArtifacts::DestroyBatchService.new(artifacts).execute Ci::JobArtifacts::DestroyBatchService.new(artifacts).execute
end end
def loop_timeout?(start_at) def loop_timeout?
Time.current > start_at + LOOP_TIMEOUT Time.current > @start_at + LOOP_TIMEOUT
end end
end end
end end
......
...@@ -53,6 +53,46 @@ RSpec.describe Ci::JobArtifacts::DestroyAllExpiredService, :clean_gitlab_redis_s ...@@ -53,6 +53,46 @@ RSpec.describe Ci::JobArtifacts::DestroyAllExpiredService, :clean_gitlab_redis_s
log = ActiveRecord::QueryRecorder.new { subject } log = ActiveRecord::QueryRecorder.new { subject }
expect(log.count).to be_within(1).of(8) expect(log.count).to be_within(1).of(8)
end end
context 'with several locked-unknown artifact records' do
before do
stub_const("#{described_class}::LOOP_LIMIT", 10)
stub_const("#{described_class}::BATCH_SIZE", 2)
end
let!(:lockable_artifact_records) do
[
create(:ci_job_artifact, :metadata, :expired, locked: ::Ci::JobArtifact.lockeds[:unknown], job: locked_job),
create(:ci_job_artifact, :junit, :expired, locked: ::Ci::JobArtifact.lockeds[:unknown], job: locked_job),
create(:ci_job_artifact, :sast, :expired, locked: ::Ci::JobArtifact.lockeds[:unknown], job: locked_job),
create(:ci_job_artifact, :cobertura, :expired, locked: ::Ci::JobArtifact.lockeds[:unknown], job: locked_job),
create(:ci_job_artifact, :trace, :expired, locked: ::Ci::JobArtifact.lockeds[:unknown], job: locked_job)
]
end
let!(:unlockable_artifact_records) do
[
create(:ci_job_artifact, :metadata, :expired, locked: ::Ci::JobArtifact.lockeds[:unknown], job: job),
create(:ci_job_artifact, :junit, :expired, locked: ::Ci::JobArtifact.lockeds[:unknown], job: job),
create(:ci_job_artifact, :sast, :expired, locked: ::Ci::JobArtifact.lockeds[:unknown], job: job),
create(:ci_job_artifact, :cobertura, :expired, locked: ::Ci::JobArtifact.lockeds[:unknown], job: job),
create(:ci_job_artifact, :trace, :expired, locked: ::Ci::JobArtifact.lockeds[:unknown], job: job),
artifact
]
end
it 'updates the locked status of job artifacts from artifacts-locked pipelines' do
subject
expect(lockable_artifact_records).to be_all(&:persisted?)
expect(lockable_artifact_records).to be_all { |artifact| artifact.reload.artifact_artifacts_locked? }
end
it 'unlocks and then destroys job artifacts from artifacts-unlocked pipelines' do
expect { subject }.to change { Ci::JobArtifact.count }.by(-6)
expect(Ci::JobArtifact.where(id: unlockable_artifact_records.map(&:id))).to be_empty
end
end
end end
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment