Commit ffb0ded8 authored by Marius Bobin's avatar Marius Bobin Committed by Bob Van Landuyt

Change expired artifacts to use deleted objects

- Copy expired artifacts to ci_deleted_objects
- Update projects and namespace statistics
parent c5d9677e
......@@ -169,6 +169,7 @@ module Ci
scope :downloadable, -> { where(file_type: DOWNLOADABLE_TYPES) }
scope :unlocked, -> { joins(job: :pipeline).merge(::Ci::Pipeline.unlocked).order(expire_at: :desc) }
scope :with_destroy_preloads, -> { includes(project: [:route, :statistics]) }
scope :scoped_project, -> { where('ci_job_artifacts.project_id = projects.id') }
......
......@@ -6,45 +6,106 @@ module Ci
include ::Gitlab::LoopHelpers
BATCH_SIZE = 100
LOOP_TIMEOUT = 45.minutes
LOOP_TIMEOUT = 5.minutes
LEGACY_LOOP_TIMEOUT = 45.minutes
LOOP_LIMIT = 1000
EXCLUSIVE_LOCK_KEY = 'expired_job_artifacts:destroy:lock'
LOCK_TIMEOUT = 50.minutes
LOCK_TIMEOUT = 10.minutes
LEGACY_LOCK_TIMEOUT = 50.minutes
##
# Destroy expired job artifacts on GitLab instance
#
# This destroy process cannot run for more than 45 minutes. This is for
# This destroy process cannot run for more than 10 minutes. This is for
# preventing multiple `ExpireBuildArtifactsWorker` CRON jobs run concurrently,
# which is scheduled at every hour.
def execute
in_lock(EXCLUSIVE_LOCK_KEY, ttl: LOCK_TIMEOUT, retries: 1) do
loop_until(timeout: LOOP_TIMEOUT, limit: LOOP_LIMIT) do
destroy_batch(Ci::JobArtifact) || destroy_batch(Ci::PipelineArtifact)
in_lock(EXCLUSIVE_LOCK_KEY, ttl: lock_timeout, retries: 1) do
loop_until(timeout: loop_timeout, limit: LOOP_LIMIT) do
destroy_artifacts_batch
end
end
end
private
def destroy_batch(klass)
artifact_batch = if klass == Ci::JobArtifact
klass.expired(BATCH_SIZE).unlocked
def destroy_artifacts_batch
destroy_job_artifacts_batch || destroy_pipeline_artifacts_batch
end
def destroy_job_artifacts_batch
artifacts = Ci::JobArtifact
.expired(BATCH_SIZE)
.unlocked
.with_destroy_preloads
.to_a
return false if artifacts.empty?
if parallel_destroy?
parallel_destroy_batch(artifacts)
else
klass.expired(BATCH_SIZE)
legacy_destroy_batch(artifacts)
destroy_related_records_for(artifacts)
end
artifacts = artifact_batch.to_a
true
end
def destroy_pipeline_artifacts_batch
artifacts = Ci::PipelineArtifact.expired(BATCH_SIZE).to_a
return false if artifacts.empty?
legacy_destroy_batch(artifacts)
true
end
def parallel_destroy?
::Feature.enabled?(:ci_delete_objects)
end
def legacy_destroy_batch(artifacts)
artifacts.each(&:destroy!)
run_after_destroy(artifacts)
end
def parallel_destroy_batch(job_artifacts)
Ci::DeletedObject.transaction do
Ci::DeletedObject.bulk_import(job_artifacts)
Ci::JobArtifact.id_in(job_artifacts.map(&:id)).delete_all
destroy_related_records_for(job_artifacts)
end
# This is executed outside of the transaction because it depends on Redis
update_statistics_for(job_artifacts)
end
# This method is implemented in EE and it must do only database work
def destroy_related_records_for(job_artifacts); end
def update_statistics_for(job_artifacts)
artifacts_by_project = job_artifacts.group_by(&:project)
artifacts_by_project.each do |project, artifacts|
delta = -artifacts.sum { |artifact| artifact.size.to_i }
ProjectStatistics.increment_statistic(
project, Ci::JobArtifact.project_statistics_name, delta)
end
end
true # This is required because of the design of `loop_until` method.
def loop_timeout
if parallel_destroy?
LOOP_TIMEOUT
else
LEGACY_LOOP_TIMEOUT
end
end
def run_after_destroy(artifacts); end
def lock_timeout
if parallel_destroy?
LOCK_TIMEOUT
else
LEGACY_LOCK_TIMEOUT
end
end
end
end
......
---
name: ci_delete_objects
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/42242
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/247103
group: group::continuous integration
type: development
default_enabled: false
\ No newline at end of file
......@@ -3,8 +3,13 @@
module EE
module Ci
module DestroyExpiredJobArtifactsService
def run_after_destroy(artifacts)
destroy_security_findings_for(artifacts) if artifacts.first.is_a?(::Ci::JobArtifact)
extend ::Gitlab::Utils::Override
private
override :destroy_related_records_for
def destroy_related_records_for(artifacts)
destroy_security_findings_for(artifacts)
end
def destroy_security_findings_for(artifacts)
......
......@@ -61,7 +61,9 @@ RSpec.describe Ci::DestroyExpiredJobArtifactsService, :clean_gitlab_redis_shared
end
context 'when failed to destroy artifact' do
context 'with ci_delete_objects disabled' do
before do
stub_feature_flags(ci_delete_objects: false)
stub_const('Ci::DestroyExpiredJobArtifactsService::LOOP_LIMIT', 10)
allow_next_found_instance_of(Ci::JobArtifact) do |artifact|
......@@ -75,6 +77,22 @@ RSpec.describe Ci::DestroyExpiredJobArtifactsService, :clean_gitlab_redis_shared
end
end
context 'with ci_delete_objects enabled' do
before do
stub_const('Ci::DestroyExpiredJobArtifactsService::LOOP_LIMIT', 10)
expect(Ci::DeletedObject)
.to receive(:bulk_import)
.once
.and_raise(ActiveRecord::RecordNotDestroyed)
end
it 'raises an exception and stop destroying' do
expect { subject }.to raise_error(ActiveRecord::RecordNotDestroyed)
.and not_change { Security::Finding.count }.from(1)
end
end
end
context 'when there are artifacts more than batch sizes' do
before do
stub_const('Ci::DestroyExpiredJobArtifactsService::BATCH_SIZE', 1)
......
......@@ -5,26 +5,77 @@ require 'spec_helper'
RSpec.describe Ci::DestroyExpiredJobArtifactsService, :clean_gitlab_redis_shared_state do
include ExclusiveLeaseHelpers
let(:service) { described_class.new }
describe '.execute' do
subject { service.execute }
let(:service) { described_class.new }
let_it_be(:artifact) { create(:ci_job_artifact, expire_at: 1.day.ago) }
let_it_be(:artifact, reload: true) do
create(:ci_job_artifact, expire_at: 1.day.ago)
end
before(:all) do
artifact.job.pipeline.unlocked!
end
context 'when artifact is expired' do
context 'when artifact is not locked' do
context 'with preloaded relationships' do
before do
artifact.job.pipeline.unlocked!
job = create(:ci_build, pipeline: artifact.job.pipeline)
create(:ci_job_artifact, :archive, :expired, job: job)
stub_const('Ci::DestroyExpiredJobArtifactsService::LOOP_LIMIT', 1)
end
it 'destroys job artifact' do
it 'performs the smallest number of queries for job_artifacts' do
log = ActiveRecord::QueryRecorder.new { subject }
# SELECT expired ci_job_artifacts
# PRELOAD projects, routes, project_statistics
# BEGIN
# INSERT into ci_deleted_objects
# DELETE loaded ci_job_artifacts
# DELETE security_findings -- for EE
# COMMIT
expect(log.count).to be_within(1).of(8)
end
end
context 'when artifact is not locked' do
it 'deletes job artifact record' do
expect { subject }.to change { Ci::JobArtifact.count }.by(-1)
end
context 'when the artifact does not a file attached to it' do
it 'does not create deleted objects' do
expect(artifact.exists?).to be_falsy # sanity check
expect { subject }.not_to change { Ci::DeletedObject.count }
end
end
context 'when the artifact has a file attached to it' do
before do
artifact.file = fixture_file_upload(Rails.root.join('spec/fixtures/ci_build_artifacts.zip'), 'application/zip')
artifact.save!
end
it 'creates a deleted object' do
expect { subject }.to change { Ci::DeletedObject.count }.by(1)
end
it 'resets project statistics' do
expect(ProjectStatistics).to receive(:increment_statistic).once
.with(artifact.project, :build_artifacts_size, -artifact.file.size)
.and_call_original
subject
end
it 'does not remove the files' do
expect { subject }.not_to change { artifact.file.exists? }
end
end
end
context 'when artifact is locked' do
......@@ -61,6 +112,11 @@ RSpec.describe Ci::DestroyExpiredJobArtifactsService, :clean_gitlab_redis_shared
context 'when failed to destroy artifact' do
before do
stub_const('Ci::DestroyExpiredJobArtifactsService::LOOP_LIMIT', 10)
end
context 'with ci_delete_objects disabled' do
before do
stub_feature_flags(ci_delete_objects: false)
allow_any_instance_of(Ci::JobArtifact)
.to receive(:destroy!)
......@@ -72,6 +128,39 @@ RSpec.describe Ci::DestroyExpiredJobArtifactsService, :clean_gitlab_redis_shared
end
end
context 'with ci_delete_objects enabled' do
context 'when the import fails' do
before do
stub_const('Ci::DestroyExpiredJobArtifactsService::LOOP_LIMIT', 10)
expect(Ci::DeletedObject)
.to receive(:bulk_import)
.once
.and_raise(ActiveRecord::RecordNotDestroyed)
end
it 'raises an exception and stop destroying' do
expect { subject }.to raise_error(ActiveRecord::RecordNotDestroyed)
.and not_change { Ci::JobArtifact.count }.from(1)
end
end
context 'when the delete fails' do
before do
stub_const('Ci::DestroyExpiredJobArtifactsService::LOOP_LIMIT', 10)
expect(Ci::JobArtifact)
.to receive(:id_in)
.once
.and_raise(ActiveRecord::RecordNotDestroyed)
end
it 'raises an exception rolls back the insert' do
expect { subject }.to raise_error(ActiveRecord::RecordNotDestroyed)
.and not_change { Ci::DeletedObject.count }.from(0)
end
end
end
end
context 'when exclusive lease has already been taken by the other instance' do
before do
stub_exclusive_lease_taken(described_class::EXCLUSIVE_LOCK_KEY, timeout: described_class::LOCK_TIMEOUT)
......@@ -85,7 +174,7 @@ RSpec.describe Ci::DestroyExpiredJobArtifactsService, :clean_gitlab_redis_shared
context 'when timeout happens' do
before do
stub_const('Ci::DestroyExpiredJobArtifactsService::LOOP_TIMEOUT', 1.second)
allow_any_instance_of(described_class).to receive(:destroy_batch) { true }
allow_any_instance_of(described_class).to receive(:destroy_artifacts_batch) { true }
end
it 'returns false and does not continue destroying' do
......@@ -176,4 +265,16 @@ RSpec.describe Ci::DestroyExpiredJobArtifactsService, :clean_gitlab_redis_shared
end
end
end
describe '.destroy_job_artifacts_batch' do
it 'returns a falsy value without artifacts' do
expect(service.send(:destroy_job_artifacts_batch)).to be_falsy
end
end
describe '.destroy_pipeline_artifacts_batch' do
it 'returns a falsy value without artifacts' do
expect(service.send(:destroy_pipeline_artifacts_batch)).to be_falsy
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment