Commit 9bf76fe0 authored by Marius Bobin's avatar Marius Bobin Committed by Bob Van Landuyt

Add Ci::DeletedObject model and workers

- Add database migration to create ci_deleted_objects table
- Add worker to remove objects
- Add scheduled worker to start regular workers
parent af8a060f
# frozen_string_literal: true

module Ci
  # Stores references to files (e.g. expired job artifacts) that have been
  # detached from their original records and queued for asynchronous
  # removal from object storage.
  class DeletedObject < ApplicationRecord
    extend Gitlab::Ci::Model

    mount_uploader :file, DeletedObjectUploader

    # Rows whose pick_up_at lies in the past are eligible for destruction.
    scope :ready_for_destruction, ->(limit) { where('pick_up_at < ?', Time.current).limit(limit) }

    # Locks eligible rows with SKIP LOCKED so concurrent workers never
    # claim the same batch.
    scope :lock_for_destruction, ->(limit) do
      ready_for_destruction(limit)
        .order(:pick_up_at)
        .select(:id)
        .lock('FOR UPDATE SKIP LOCKED')
    end

    # Copies deletion attributes from the given artifacts in a single
    # INSERT, skipping artifacts that have no stored file.
    def self.bulk_import(artifacts)
      rows = artifacts.map(&:to_deleted_object_attrs)
      rows = rows.select { |attrs| attrs[:store_dir] && attrs[:file] }

      insert_all(rows) if rows.any?
    end

    # Removes the file from object storage. Returns false (after tracking
    # the exception) on failure so the row is kept and retried later.
    def delete_file_from_storage
      file.remove!
      true
    rescue => e
      Gitlab::ErrorTracking.track_exception(e)
      false
    end
  end
end
......@@ -290,6 +290,15 @@ module Ci
max_size&.megabytes.to_i
end
# Attributes for a Ci::DeletedObject row queueing this artifact's file
# for background removal from object storage (see
# Ci::DeletedObject.bulk_import).
#
# pick_up_at falls back to the current time when the artifact has no
# expiry, making it eligible for deletion immediately.
def to_deleted_object_attrs
  {
    file_store: file_store,
    store_dir: file.store_dir.to_s,
    file: file_identifier,
    pick_up_at: expire_at || Time.current
  }
end
private
def set_size
......
# frozen_string_literal: true

module Ci
  # Destroys one batch of Ci::DeletedObject records together with their
  # files in object storage. Safe to run from many workers concurrently
  # thanks to FOR UPDATE SKIP LOCKED batch claiming.
  class DeleteObjectsService
    TransactionInProgressError = Class.new(StandardError)
    TRANSACTION_MESSAGE = "can't perform network calls inside a database transaction"
    BATCH_SIZE = 100
    RETRY_IN = 10.minutes

    # Claims the next batch and destroys it.
    def execute
      destroy_everything(load_next_batch)
    end

    # Number of batches still pending, capped by max_batch_count.
    def remaining_batches_count(max_batch_count:)
      pending = Ci::DeletedObject
        .ready_for_destruction(max_batch_count * BATCH_SIZE)
        .size

      pending.fdiv(BATCH_SIZE).ceil
    end

    private

    # rubocop: disable CodeReuse/ActiveRecord
    def load_next_batch
      # `find_by_sql` performs a write in this case and we need to wrap it in
      # a transaction to stick to the primary database.
      Ci::DeletedObject.transaction do
        Ci::DeletedObject.find_by_sql([
          next_batch_sql, new_pick_up_at: RETRY_IN.from_now
        ])
      end
    end
    # rubocop: enable CodeReuse/ActiveRecord

    # Claims a batch by pushing pick_up_at into the future: claimed rows
    # are returned for immediate processing and, should file deletion
    # fail, become eligible again after RETRY_IN.
    def next_batch_sql
      <<~SQL.squish
        UPDATE "ci_deleted_objects"
        SET "pick_up_at" = :new_pick_up_at
        WHERE "ci_deleted_objects"."id" IN (#{locked_object_ids_sql})
        RETURNING *
      SQL
    end

    def locked_object_ids_sql
      Ci::DeletedObject.lock_for_destruction(BATCH_SIZE).to_sql
    end

    # Deletes files first, then removes only the rows whose files were
    # successfully deleted; the rest will be retried.
    def destroy_everything(objects)
      raise TransactionInProgressError, TRANSACTION_MESSAGE if transaction_open?
      return if objects.empty?

      removed = objects.select(&:delete_file_from_storage)
      Ci::DeletedObject.id_in(removed.map(&:id)).delete_all
    end

    def transaction_open?
      Ci::DeletedObject.connection.transaction_open?
    end
  end
end
# frozen_string_literal: true

# Uploader for Ci::DeletedObject records. Reuses the artifacts storage
# configuration so files are addressed in the same storage the original
# artifacts lived in.
class DeletedObjectUploader < GitlabUploader
  include ObjectStorage::Concern

  storage_options Gitlab.config.artifacts

  # The store directory is persisted on the record itself (copied from
  # the source artifact), not derived from the uploader/model class.
  def store_dir
    model.store_dir
  end
end
......@@ -147,6 +147,14 @@
:weight: 1
:idempotent:
:tags: []
- :name: cronjob:ci_schedule_delete_objects_cron
:feature_category: :continuous_integration
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: cronjob:container_expiration_policy
:feature_category: :container_registry
:has_external_dependencies:
......@@ -1305,6 +1313,14 @@
:idempotent:
:tags:
- :requires_disk_io
- :name: ci_delete_objects
:feature_category: :continuous_integration
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: create_commit_signature
:feature_category: :source_code_management
:has_external_dependencies:
......
# frozen_string_literal: true

module Ci
  # Limited-capacity worker that removes batches of deleted objects from
  # object storage via Ci::DeleteObjectsService.
  class DeleteObjectsWorker
    include ApplicationWorker
    include LimitedCapacity::Worker

    feature_category :continuous_integration
    idempotent!

    # Processes a single batch; arguments are ignored.
    def perform_work(*_args)
      service.execute
    end

    # Batches left to process, used by LimitedCapacity::Worker to decide
    # whether to re-enqueue. Memoized per job execution.
    def remaining_work_count(*_args)
      @remaining_work_count ||=
        service.remaining_batches_count(max_batch_count: remaining_capacity)
    end

    # Concurrency tier selected via feature flags; when several flags are
    # enabled the lowest tier wins. With no flags enabled no jobs run.
    def max_running_jobs
      return 2 if ::Feature.enabled?(:ci_delete_objects_low_concurrency)
      return 20 if ::Feature.enabled?(:ci_delete_objects_medium_concurrency)
      return 50 if ::Feature.enabled?(:ci_delete_objects_high_concurrency)

      0
    end

    private

    def service
      @service ||= DeleteObjectsService.new
    end
  end
end
# frozen_string_literal: true

module Ci
  # Cron-scheduled entry point that starts Ci::DeleteObjectsWorker jobs
  # up to that worker's allowed capacity.
  class ScheduleDeleteObjectsCronWorker
    include ApplicationWorker
    # rubocop:disable Scalability/CronWorkerContext
    # This worker does not perform work scoped to a context
    include CronjobQueue
    # rubocop:enable Scalability/CronWorkerContext

    feature_category :continuous_integration
    idempotent!

    # Delegates to LimitedCapacity's perform_with_capacity, which enqueues
    # as many DeleteObjectsWorker jobs as remaining capacity allows.
    def perform(*args)
      Ci::DeleteObjectsWorker.perform_with_capacity(*args)
    end
  end
end
---
title: Parallelize removal of expired artifacts
merge_request: 39464
author:
type: changed
---
name: ci_delete_objects_high_concurrency
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/39464
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/247103
group: group::continuous integration
type: development
default_enabled: false
\ No newline at end of file
---
name: ci_delete_objects_low_concurrency
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/39464
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/247103
group: group::continuous integration
type: development
default_enabled: false
\ No newline at end of file
---
name: ci_delete_objects_medium_concurrency
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/39464
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/247103
group: group::continuous integration
type: development
default_enabled: false
\ No newline at end of file
......@@ -435,6 +435,9 @@ production: &base
# Remove expired build artifacts
expire_build_artifacts_worker:
cron: "50 * * * *"
# Remove files from object storage
ci_schedule_delete_objects_worker:
cron: "*/16 * * * *"
# Stop expired environments
environments_auto_stop_cron_worker:
cron: "24 * * * *"
......
......@@ -416,6 +416,9 @@ Settings.cron_jobs['pipeline_schedule_worker']['job_class'] = 'PipelineScheduleW
Settings.cron_jobs['expire_build_artifacts_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['expire_build_artifacts_worker']['cron'] ||= '50 * * * *'
Settings.cron_jobs['expire_build_artifacts_worker']['job_class'] = 'ExpireBuildArtifactsWorker'
# Runs Ci::ScheduleDeleteObjectsCronWorker every 16 minutes to fan out
# object-storage deletion jobs.
Settings.cron_jobs['ci_schedule_delete_objects_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['ci_schedule_delete_objects_worker']['cron'] ||= '*/16 * * * *'
Settings.cron_jobs['ci_schedule_delete_objects_worker']['job_class'] = 'Ci::ScheduleDeleteObjectsCronWorker'
Settings.cron_jobs['environments_auto_stop_cron_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['environments_auto_stop_cron_worker']['cron'] ||= '24 * * * *'
Settings.cron_jobs['environments_auto_stop_cron_worker']['job_class'] = 'Environments::AutoStopCronWorker'
......
......@@ -50,6 +50,8 @@
- 2
- - ci_batch_reset_minutes
- 1
- - ci_delete_objects
- 1
- - container_repository
- 1
- - create_commit_signature
......
# frozen_string_literal: true

# Creates the ci_deleted_objects table, which queues object-storage files
# (e.g. expired job artifacts) for asynchronous deletion.
class CreateCiDeletedObjects < ActiveRecord::Migration[6.0]
  include Gitlab::Database::MigrationHelpers

  DOWNTIME = false

  # NOTE(review): presumably required by add_text_limit, which validates
  # its constraint outside a transaction — confirm against the helper.
  disable_ddl_transaction!

  def up
    create_table :ci_deleted_objects, if_not_exists: true do |t|
      t.integer :file_store, limit: 2, default: 1, null: false
      # Defaults to now() so freshly inserted rows are immediately
      # eligible to be picked up for deletion.
      t.datetime_with_timezone :pick_up_at, null: false, default: -> { 'now()' }, index: true
      t.text :store_dir, null: false
      # rubocop:disable Migration/AddLimitToTextColumns
      # This column depends on the `file` column from `ci_job_artifacts` table
      # which doesn't have a constraint limit on it.
      t.text :file, null: false
      # rubocop:enable Migration/AddLimitToTextColumns
    end

    add_text_limit(:ci_deleted_objects, :store_dir, 1024)
  end

  def down
    drop_table :ci_deleted_objects
  end
end
5f7a5fa697d769f5ccc9f0a6f19a91c8935f2559e019d50895574819494baf7e
\ No newline at end of file
......@@ -10074,6 +10074,24 @@ CREATE SEQUENCE ci_daily_build_group_report_results_id_seq
ALTER SEQUENCE ci_daily_build_group_report_results_id_seq OWNED BY ci_daily_build_group_report_results.id;
CREATE TABLE ci_deleted_objects (
id bigint NOT NULL,
file_store smallint DEFAULT 1 NOT NULL,
pick_up_at timestamp with time zone DEFAULT now() NOT NULL,
store_dir text NOT NULL,
file text NOT NULL,
CONSTRAINT check_5e151d6912 CHECK ((char_length(store_dir) <= 1024))
);
CREATE SEQUENCE ci_deleted_objects_id_seq
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
ALTER SEQUENCE ci_deleted_objects_id_seq OWNED BY ci_deleted_objects.id;
CREATE TABLE ci_freeze_periods (
id bigint NOT NULL,
project_id bigint NOT NULL,
......@@ -17293,6 +17311,8 @@ ALTER TABLE ONLY ci_builds_runner_session ALTER COLUMN id SET DEFAULT nextval('c
ALTER TABLE ONLY ci_daily_build_group_report_results ALTER COLUMN id SET DEFAULT nextval('ci_daily_build_group_report_results_id_seq'::regclass);
ALTER TABLE ONLY ci_deleted_objects ALTER COLUMN id SET DEFAULT nextval('ci_deleted_objects_id_seq'::regclass);
ALTER TABLE ONLY ci_freeze_periods ALTER COLUMN id SET DEFAULT nextval('ci_freeze_periods_id_seq'::regclass);
ALTER TABLE ONLY ci_group_variables ALTER COLUMN id SET DEFAULT nextval('ci_group_variables_id_seq'::regclass);
......@@ -18282,6 +18302,9 @@ ALTER TABLE ONLY ci_builds_runner_session
ALTER TABLE ONLY ci_daily_build_group_report_results
ADD CONSTRAINT ci_daily_build_group_report_results_pkey PRIMARY KEY (id);
ALTER TABLE ONLY ci_deleted_objects
ADD CONSTRAINT ci_deleted_objects_pkey PRIMARY KEY (id);
ALTER TABLE ONLY ci_freeze_periods
ADD CONSTRAINT ci_freeze_periods_pkey PRIMARY KEY (id);
......@@ -19820,6 +19843,8 @@ CREATE UNIQUE INDEX index_ci_builds_runner_session_on_build_id ON ci_builds_runn
CREATE INDEX index_ci_daily_build_group_report_results_on_last_pipeline_id ON ci_daily_build_group_report_results USING btree (last_pipeline_id);
CREATE INDEX index_ci_deleted_objects_on_pick_up_at ON ci_deleted_objects USING btree (pick_up_at);
CREATE INDEX index_ci_freeze_periods_on_project_id ON ci_freeze_periods USING btree (project_id);
CREATE UNIQUE INDEX index_ci_group_variables_on_group_id_and_key ON ci_group_variables USING btree (group_id, key);
......
# frozen_string_literal: true

FactoryBot.define do
  # A Ci::DeletedObject that is immediately eligible for pick up, backed
  # by the fixture artifacts archive.
  factory :ci_deleted_object, class: 'Ci::DeletedObject' do
    pick_up_at { Time.current }
    store_dir { SecureRandom.uuid }
    file { fixture_file_upload(Rails.root.join('spec/fixtures/ci_build_artifacts.zip'), 'application/zip') }
  end
end
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe Ci::DeletedObject, :aggregate_failures do
  describe 'attributes' do
    it { is_expected.to respond_to(:file) }
    it { is_expected.to respond_to(:store_dir) }
    it { is_expected.to respond_to(:file_store) }
    it { is_expected.to respond_to(:pick_up_at) }
  end

  describe '.bulk_import' do
    context 'with data' do
      let!(:artifact) { create(:ci_job_artifact, :archive, :expired) }

      it 'imports data' do
        expect { described_class.bulk_import(Ci::JobArtifact.all) }.to change { described_class.count }.by(1)

        deleted_artifact = described_class.first

        expect(deleted_artifact.file_store).to eq(artifact.file_store)
        expect(deleted_artifact.store_dir).to eq(artifact.file.store_dir.to_s)
        expect(deleted_artifact.file_identifier).to eq(artifact.file_identifier)
        expect(deleted_artifact.pick_up_at).to eq(artifact.expire_at)
      end
    end

    context 'with invalid data' do
      # An artifact without an attached file yields attrs with a nil
      # :file value, which bulk_import filters out.
      let!(:artifact) { create(:ci_job_artifact) }

      it 'does not import anything' do
        expect(artifact.file_identifier).to be_nil

        expect { described_class.bulk_import([artifact]) }
          .not_to change { described_class.count }
      end
    end

    context 'with empty data' do
      it 'returns successfully' do
        expect { described_class.bulk_import([]) }
          .not_to change { described_class.count }
      end
    end
  end

  context 'ActiveRecord scopes' do
    let_it_be(:not_ready) { create(:ci_deleted_object, pick_up_at: 1.day.from_now) }
    let_it_be(:ready) { create(:ci_deleted_object, pick_up_at: 1.day.ago) }

    describe '.ready_for_destruction' do
      it 'returns objects that are ready' do
        result = described_class.ready_for_destruction(2)

        expect(result).to contain_exactly(ready)
      end
    end

    describe '.lock_for_destruction' do
      subject(:result) { described_class.lock_for_destruction(10) }

      it 'returns objects that are ready' do
        expect(result).to contain_exactly(ready)
      end

      it 'selects only the id' do
        expect(result.select_values).to contain_exactly(:id)
      end

      it 'orders by pick_up_at' do
        expect(result.order_values.map(&:to_sql))
          .to contain_exactly("\"ci_deleted_objects\".\"pick_up_at\" ASC")
      end

      it 'applies limit' do
        expect(result.limit_value).to eq(10)
      end

      it 'uses select for update' do
        expect(result.locked?).to eq('FOR UPDATE SKIP LOCKED')
      end
    end
  end

  describe '#delete_file_from_storage' do
    let(:object) { build(:ci_deleted_object) }

    # Storage failures are tracked, not raised, so the record is kept
    # and retried on a later run.
    it 'does not raise errors' do
      expect(object.file).to receive(:remove!).and_raise(StandardError)

      expect(object.delete_file_from_storage).to be_falsy
    end
  end
end
# frozen_string_literal: true

require 'spec_helper'

# Metadata tag must be :aggregate_failures (plural); the singular
# :aggregate_failure is silently ignored by RSpec.
RSpec.describe Ci::DeleteObjectsService, :aggregate_failures do
  let(:service) { described_class.new }
  let(:artifact) { create(:ci_job_artifact, :archive) }
  let(:data) { [artifact] }

  describe '#execute' do
    before do
      Ci::DeletedObject.bulk_import(data)
      # We disable the check because the specs are wrapped in a transaction
      allow(service).to receive(:transaction_open?).and_return(false)
    end

    subject(:execute) { service.execute }

    it 'deletes records' do
      expect { execute }.to change { Ci::DeletedObject.count }.by(-1)
    end

    it 'deletes files' do
      expect { execute }.to change { artifact.file.exists? }
    end

    context 'when trying to execute without records' do
      let(:data) { [] }

      it 'does not change the number of objects' do
        expect { execute }.not_to change { Ci::DeletedObject.count }
      end
    end

    context 'when trying to remove the same file multiple times' do
      let(:objects) { Ci::DeletedObject.all.to_a }

      before do
        expect(service).to receive(:load_next_batch).twice.and_return(objects)
      end

      # Deleting an already-deleted file must not raise.
      it 'executes successfully' do
        2.times { expect(service.execute).to be_truthy }
      end
    end

    context 'with artifacts both ready and not ready for deletion' do
      let(:data) { [] }

      let_it_be(:past_ready) { create(:ci_deleted_object, pick_up_at: 2.days.ago) }
      let_it_be(:ready) { create(:ci_deleted_object, pick_up_at: 1.day.ago) }

      it 'skips records with pick_up_at in the future' do
        not_ready = create(:ci_deleted_object, pick_up_at: 1.day.from_now)

        expect { execute }.to change { Ci::DeletedObject.count }.from(3).to(1)
        expect(not_ready.reload.present?).to be_truthy
      end

      it 'limits the number of records removed' do
        stub_const("#{described_class}::BATCH_SIZE", 1)

        expect { execute }.to change { Ci::DeletedObject.count }.by(-1)
      end

      it 'removes records in order' do
        stub_const("#{described_class}::BATCH_SIZE", 1)

        execute

        expect { past_ready.reload }.to raise_error(ActiveRecord::RecordNotFound)
        expect(ready.reload.present?).to be_truthy
      end

      it 'updates pick_up_at timestamp' do
        allow(service).to receive(:destroy_everything)

        execute

        expect(past_ready.reload.pick_up_at).to be_like_time(10.minutes.from_now)
      end

      it 'does not delete objects for which file deletion has failed' do
        expect(past_ready)
          .to receive(:delete_file_from_storage)
          .and_return(false)
        expect(service)
          .to receive(:load_next_batch)
          .and_return([past_ready, ready])

        expect { execute }.to change { Ci::DeletedObject.count }.from(2).to(1)
        expect(past_ready.reload.present?).to be_truthy
      end
    end

    context 'with an open database transaction' do
      it 'raises an exception and does not remove records' do
        expect(service).to receive(:transaction_open?).and_return(true)

        expect { execute }
          .to raise_error(Ci::DeleteObjectsService::TransactionInProgressError)
          .and change { Ci::DeletedObject.count }.by(0)
      end
    end
  end

  describe '#remaining_batches_count' do
    subject { service.remaining_batches_count(max_batch_count: 3) }

    context 'when there is less than one batch size' do
      before do
        Ci::DeletedObject.bulk_import(data)
      end

      it { is_expected.to eq(1) }
    end

    context 'when there is more than one batch size' do
      before do
        objects_scope = double

        expect(Ci::DeletedObject)
          .to receive(:ready_for_destruction)
          .and_return(objects_scope)
        expect(objects_scope).to receive(:size).and_return(110)
      end

      it { is_expected.to eq(2) }
    end
  end
end
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe Ci::DeleteObjectsWorker do
  let(:worker) { described_class.new }

  it { expect(described_class.idempotent?).to be_truthy }

  describe '#perform' do
    it 'executes a service' do
      expect_next_instance_of(Ci::DeleteObjectsService) do |instance|
        expect(instance).to receive(:execute)
        expect(instance).to receive(:remaining_batches_count).once.and_call_original
      end

      worker.perform
    end
  end

  describe '#max_running_jobs' do
    using RSpec::Parameterized::TableSyntax

    before do
      stub_feature_flags(
        ci_delete_objects_low_concurrency: low,
        ci_delete_objects_medium_concurrency: medium,
        ci_delete_objects_high_concurrency: high
      )
    end

    subject(:max_running_jobs) { worker.max_running_jobs }

    # The lowest-concurrency flag wins when several flags are enabled;
    # no flags enabled means no capacity at all.
    where(:low, :medium, :high, :expected) do
      false | false | false | 0
      true | true | true | 2
      true | false | false | 2
      false | true | false | 20
      false | true | true | 20
      false | false | true | 50
    end

    with_them do
      it 'sets up concurrency depending on the feature flag' do
        expect(max_running_jobs).to eq(expected)
      end
    end
  end
end
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe Ci::ScheduleDeleteObjectsCronWorker do
  let(:worker) { described_class.new }

  describe '#perform' do
    it 'enqueues DeleteObjectsWorker jobs' do
      # Capacity-aware fan-out is delegated to perform_with_capacity.
      expect(Ci::DeleteObjectsWorker).to receive(:perform_with_capacity)

      worker.perform
    end
  end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment