Commit e5e0c6a2 authored by David Fernandez's avatar David Fernandez

Add the registry migration guard job

This job is a recurring one. Its responsibility is to monitor ongoing
migrations and detect stale ones. Stale migrations are aborted.

Changelog: added
parent efb701f1
......@@ -10,6 +10,7 @@ class ContainerRepository < ApplicationRecord
REQUIRING_CLEANUP_STATUSES = %i[cleanup_unscheduled cleanup_scheduled].freeze
IDLE_MIGRATION_STATES = %w[default pre_import_done import_done import_aborted import_skipped].freeze
ACTIVE_MIGRATION_STATES = %w[pre_importing importing].freeze
ABORTABLE_MIGRATION_STATES = (ACTIVE_MIGRATION_STATES + ['pre_import_done']).freeze
MIGRATION_STATES = (IDLE_MIGRATION_STATES + ACTIVE_MIGRATION_STATES).freeze
belongs_to :project
......@@ -17,7 +18,7 @@ class ContainerRepository < ApplicationRecord
validates :name, length: { minimum: 0, allow_nil: false }
validates :name, uniqueness: { scope: :project_id }
validates :migration_state, presence: true, inclusion: { in: MIGRATION_STATES }
validates :migration_aborted_in_state, inclusion: { in: ACTIVE_MIGRATION_STATES }, allow_nil: true
validates :migration_aborted_in_state, inclusion: { in: ABORTABLE_MIGRATION_STATES }, allow_nil: true
validates :migration_retries_count, presence: true,
numericality: { greater_than_or_equal_to: 0 },
......@@ -43,6 +44,9 @@ class ContainerRepository < ApplicationRecord
scope :search_by_name, ->(query) { fuzzy_search(query, [:name], use_minimum_char_limit: false) }
scope :waiting_for_cleanup, -> { where(expiration_policy_cleanup_status: WAITING_CLEANUP_STATUSES) }
scope :expiration_policy_started_at_nil_or_before, ->(timestamp) { where('expiration_policy_started_at < ? OR expiration_policy_started_at IS NULL', timestamp) }
# The three scopes below keep rows whose given migration timestamp column is
# strictly before +timestamp+. COALESCE replaces a NULL column value with
# '01-01-1970', so rows without a recorded timestamp are also kept for any
# +timestamp+ later than that date.
scope :with_migration_import_started_at_nil_or_before, ->(timestamp) { where("COALESCE(migration_import_started_at, '01-01-1970') < ?", timestamp) }
scope :with_migration_pre_import_started_at_nil_or_before, ->(timestamp) { where("COALESCE(migration_pre_import_started_at, '01-01-1970') < ?", timestamp) }
scope :with_migration_pre_import_done_at_nil_or_before, ->(timestamp) { where("COALESCE(migration_pre_import_done_at, '01-01-1970') < ?", timestamp) }
scope :with_stale_ongoing_cleanup, ->(threshold) { cleanup_ongoing.where('expiration_policy_started_at < ?', threshold) }
state_machine :migration_state, initial: :default do
......@@ -96,7 +100,7 @@ class ContainerRepository < ApplicationRecord
end
event :abort_import do
transition ACTIVE_MIGRATION_STATES.map(&:to_sym) => :import_aborted
transition ABORTABLE_MIGRATION_STATES.map(&:to_sym) => :import_aborted
end
event :skip_import do
......@@ -181,6 +185,22 @@ class ContainerRepository < ApplicationRecord
with_enabled_policy.cleanup_unfinished
end
# Returns the repositories whose migration is considered stale relative to
# +before_timestamp+.
#
# One relation is built per abortable state, each filtered on the timestamp
# column that belongs to that state:
# * pre_importing   -> migration_pre_import_started_at
# * pre_import_done -> migration_pre_import_done_at
# * importing       -> migration_import_started_at
#
# The three relations are combined with ::Gitlab::SQL::Union and the result
# is used as the FROM source, aliased to the model's table name.
def self.with_stale_migration(before_timestamp)
  stale_relations = [
    with_migration_states(:pre_importing)
      .with_migration_pre_import_started_at_nil_or_before(before_timestamp),
    with_migration_states(:pre_import_done)
      .with_migration_pre_import_done_at_nil_or_before(before_timestamp),
    with_migration_states(:importing)
      .with_migration_import_started_at_nil_or_before(before_timestamp)
  ]

  union_sql = ::Gitlab::SQL::Union.new(stale_relations).to_sql

  from("(#{union_sql}) #{ContainerRepository.table_name}")
end
def skip_import(reason:)
self.migration_skipped_reason = reason
......
......@@ -273,6 +273,15 @@
:weight: 1
:idempotent:
:tags: []
- :name: cronjob:container_registry_migration_guard
:worker_name: ContainerRegistry::Migration::GuardWorker
:feature_category: :container_registry
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: cronjob:database_batched_background_migration
:worker_name: Database::BatchedBackgroundMigrationWorker
:feature_category: :database
......
# frozen_string_literal: true
module ContainerRegistry
  module Migration
    # Cron worker that aborts container repository migrations which have
    # stayed in an ongoing step for longer than the maximum step duration.
    # It does nothing outside of GitLab.com.
    class GuardWorker
      include ApplicationWorker

      # No context can be provided for this worker: it is a general job that
      # is not tied to a project, a user or a group.
      include CronjobQueue # rubocop:disable Scalability/CronWorkerContext

      data_consistency :always
      feature_category :container_registry
      urgency :low
      worker_resource_boundary :unknown
      deduplicate :until_executed
      idempotent!

      # Finds the stale migrations (at most +max_capacity+ of them), logs how
      # many were found and aborts each one.
      def perform
        return unless Gitlab.com?

        stale_repositories = ::ContainerRepository
          .with_stale_migration(step_before_timestamp)
          .limit(max_capacity)

        # #to_a is safe here because the relation is limited. It is called on
        # the relation itself so that the #each below is not meant to trigger
        # a second SQL query.
        log_extra_metadata_on_done(:stale_migrations_count, stale_repositories.to_a.size)

        stale_repositories.each(&:abort_import)
      end

      private

      # Migrations whose current step started before this point in time are
      # treated as stale.
      def step_before_timestamp
        max_duration = ::ContainerRegistry::Migration.max_step_duration

        max_duration.seconds.ago
      end

      # Twice the configured capacity, to stay safe in case the capacity is
      # not properly applied.
      def max_capacity
        configured_capacity = ::ContainerRegistry::Migration.capacity

        configured_capacity * 2
      end
    end
  end
end
......@@ -539,6 +539,9 @@ Settings.cron_jobs['namespaces_prune_aggregation_schedules_worker']['job_class']
Settings.cron_jobs['container_expiration_policy_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['container_expiration_policy_worker']['cron'] ||= '50 * * * *'
Settings.cron_jobs['container_expiration_policy_worker']['job_class'] = 'ContainerExpirationPolicyWorker'
# Cron entry for ContainerRegistry::Migration::GuardWorker.
# The default schedule ('*/10 * * * *') runs it every 10 minutes unless a cron
# value has already been configured.
Settings.cron_jobs['container_registry_migration_guard_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['container_registry_migration_guard_worker']['cron'] ||= '*/10 * * * *'
Settings.cron_jobs['container_registry_migration_guard_worker']['job_class'] = 'ContainerRegistry::Migration::GuardWorker'
Settings.cron_jobs['image_ttl_group_policy_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['image_ttl_group_policy_worker']['cron'] ||= '40 0 * * *'
Settings.cron_jobs['image_ttl_group_policy_worker']['job_class'] = 'DependencyProxy::ImageTtlGroupPolicyWorker'
......@@ -548,7 +551,6 @@ Settings.cron_jobs['cleanup_dependency_proxy_worker']['job_class'] = 'Dependency
Settings.cron_jobs['cleanup_package_registry_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['cleanup_package_registry_worker']['cron'] ||= '20 0,12 * * *'
Settings.cron_jobs['cleanup_package_registry_worker']['job_class'] = 'Packages::CleanupPackageRegistryWorker'
Settings.cron_jobs['x509_issuer_crl_check_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['x509_issuer_crl_check_worker']['cron'] ||= '30 1 * * *'
Settings.cron_jobs['x509_issuer_crl_check_worker']['job_class'] = 'X509IssuerCrlCheckWorker'
......
# frozen_string_literal: true
# Adds three partial indexes on container_repositories, one per ongoing
# migration state. Each index covers the timestamp column associated with the
# state and is restricted to rows currently in that state.
class AddMigrationIndexesToContainerRepositories < Gitlab::Database::Migration[1.0]
  PRE_IMPORTING_INDEX = 'idx_container_repos_on_pre_import_started_at_when_pre_importing'
  PRE_IMPORT_DONE_INDEX = 'idx_container_repos_on_pre_import_done_at_when_pre_import_done'
  IMPORTING_INDEX = 'idx_container_repos_on_import_started_at_when_importing'

  disable_ddl_transaction!

  # Creates the indexes in declaration order.
  def up
    partial_indexes.each do |column, index_name, state|
      add_concurrent_index :container_repositories, column, name: index_name, where: "migration_state = '#{state}'"
    end
  end

  # Drops the indexes in the reverse order of their creation.
  def down
    partial_indexes.reverse_each do |_column, index_name, _state|
      remove_concurrent_index_by_name :container_repositories, index_name
    end
  end

  private

  # [indexed column, index name, migration_state used in the WHERE clause]
  def partial_indexes
    [
      [:migration_pre_import_started_at, PRE_IMPORTING_INDEX, 'pre_importing'],
      [:migration_pre_import_done_at, PRE_IMPORT_DONE_INDEX, 'pre_import_done'],
      [:migration_import_started_at, IMPORTING_INDEX, 'importing']
    ]
  end
end
3bcc97592e8e329e39917deffae6619e69215930a688bebad2949f69155b53f9
\ No newline at end of file
......@@ -25243,6 +25243,12 @@ CREATE INDEX idx_container_exp_policies_on_project_id_next_run_at_enabled ON con
CREATE INDEX idx_container_repos_on_exp_cleanup_status_project_id_start_date ON container_repositories USING btree (expiration_policy_cleanup_status, project_id, expiration_policy_started_at);
CREATE INDEX idx_container_repos_on_import_started_at_when_importing ON container_repositories USING btree (migration_import_started_at) WHERE (migration_state = 'importing'::text);
CREATE INDEX idx_container_repos_on_pre_import_done_at_when_pre_import_done ON container_repositories USING btree (migration_pre_import_done_at) WHERE (migration_state = 'pre_import_done'::text);
CREATE INDEX idx_container_repos_on_pre_import_started_at_when_pre_importing ON container_repositories USING btree (migration_pre_import_started_at) WHERE (migration_state = 'pre_importing'::text);
CREATE INDEX idx_deployment_clusters_on_cluster_id_and_kubernetes_namespace ON deployment_clusters USING btree (cluster_id, kubernetes_namespace);
CREATE INDEX idx_devops_adoption_segments_namespace_end_time ON analytics_devops_adoption_snapshots USING btree (namespace_id, end_time);
# frozen_string_literal: true
module ContainerRegistry
class Migration
module Migration
class << self
delegate :container_registry_import_max_tags_count, to: ::Gitlab::CurrentSettings
delegate :container_registry_import_max_retries, to: ::Gitlab::CurrentSettings
......
......@@ -37,12 +37,12 @@ RSpec.describe ContainerRepository, :aggregate_failures do
it { is_expected.to validate_presence_of(:migration_retries_count) }
it { is_expected.to validate_numericality_of(:migration_retries_count).is_greater_than_or_equal_to(0) }
it { is_expected.to validate_inclusion_of(:migration_aborted_in_state).in_array(ContainerRepository::ACTIVE_MIGRATION_STATES) }
it { is_expected.to validate_inclusion_of(:migration_aborted_in_state).in_array(described_class::ABORTABLE_MIGRATION_STATES) }
it { is_expected.to allow_value(nil).for(:migration_aborted_in_state) }
context 'migration_state' do
it { is_expected.to validate_presence_of(:migration_state) }
it { is_expected.to validate_inclusion_of(:migration_state).in_array(ContainerRepository::MIGRATION_STATES) }
it { is_expected.to validate_inclusion_of(:migration_state).in_array(described_class::MIGRATION_STATES) }
describe 'pre_importing' do
it 'validates expected attributes' do
......@@ -161,7 +161,7 @@ RSpec.describe ContainerRepository, :aggregate_failures do
end
shared_examples 'transitioning from allowed states' do |allowed_states|
ContainerRepository::MIGRATION_STATES.each do |state|
described_class::MIGRATION_STATES.each do |state|
result = allowed_states.include?(state)
context "when transitioning from #{state}" do
......@@ -283,7 +283,7 @@ RSpec.describe ContainerRepository, :aggregate_failures do
subject { repository.abort_import }
it_behaves_like 'transitioning from allowed states', %w[pre_importing importing]
it_behaves_like 'transitioning from allowed states', %w[pre_importing pre_import_done importing]
it 'sets migration_aborted_at and migration_aborted_at and increments the retry count' do
expect { subject }.to change { repository.migration_aborted_at }
......@@ -634,7 +634,7 @@ RSpec.describe ContainerRepository, :aggregate_failures do
let(:path) { ContainerRegistry::Path.new(project.full_path + '/some/image') }
it 'does not throw validation errors and only creates one repository' do
expect { repository_creation_race(path) }.to change { ContainerRepository.count }.by(1)
expect { repository_creation_race(path) }.to change { described_class.count }.by(1)
end
it 'retrieves a persisted repository for all concurrent calls' do
......@@ -652,7 +652,7 @@ RSpec.describe ContainerRepository, :aggregate_failures do
Thread.new do
true while wait_for_it
::ContainerRepository.find_or_create_from_path(path)
described_class.find_or_create_from_path(path)
end
end
wait_for_it = false
......@@ -788,6 +788,36 @@ RSpec.describe ContainerRepository, :aggregate_failures do
it { is_expected.to contain_exactly(repository1, repository2, repository4) }
end
describe '.with_migration_import_started_at_nil_or_before' do
  # Threshold is 7 minutes ago: one timestamp after it, one missing, one before it.
  let_it_be(:started_recently) { create(:container_repository, migration_import_started_at: 5.minutes.ago) }
  let_it_be(:never_started) { create(:container_repository, migration_import_started_at: nil) }
  let_it_be(:started_long_ago) { create(:container_repository, migration_import_started_at: 10.minutes.ago) }

  subject(:matching_repositories) do
    described_class.with_migration_import_started_at_nil_or_before(7.minutes.ago)
  end

  it 'returns the repositories with a nil timestamp or a timestamp before the threshold' do
    expect(matching_repositories).to contain_exactly(never_started, started_long_ago)
  end
end
describe '.with_migration_pre_import_started_at_nil_or_before' do
  # Threshold is 7 minutes ago: one timestamp after it, one missing, one before it.
  let_it_be(:started_recently) { create(:container_repository, migration_pre_import_started_at: 5.minutes.ago) }
  let_it_be(:never_started) { create(:container_repository, migration_pre_import_started_at: nil) }
  let_it_be(:started_long_ago) { create(:container_repository, migration_pre_import_started_at: 10.minutes.ago) }

  subject(:matching_repositories) do
    described_class.with_migration_pre_import_started_at_nil_or_before(7.minutes.ago)
  end

  it 'returns the repositories with a nil timestamp or a timestamp before the threshold' do
    expect(matching_repositories).to contain_exactly(never_started, started_long_ago)
  end
end
describe '.with_migration_pre_import_done_at_nil_or_before' do
  # Threshold is 7 minutes ago: one timestamp after it, one missing, one before it.
  let_it_be(:done_recently) { create(:container_repository, migration_pre_import_done_at: 5.minutes.ago) }
  let_it_be(:never_done) { create(:container_repository, migration_pre_import_done_at: nil) }
  let_it_be(:done_long_ago) { create(:container_repository, migration_pre_import_done_at: 10.minutes.ago) }

  subject(:matching_repositories) do
    described_class.with_migration_pre_import_done_at_nil_or_before(7.minutes.ago)
  end

  it 'returns the repositories with a nil timestamp or a timestamp before the threshold' do
    expect(matching_repositories).to contain_exactly(never_done, done_long_ago)
  end
end
describe '.with_stale_ongoing_cleanup' do
let_it_be(:repository1) { create(:container_repository, :cleanup_ongoing, expiration_policy_started_at: 1.day.ago) }
let_it_be(:repository2) { create(:container_repository, :cleanup_ongoing, expiration_policy_started_at: 25.minutes.ago) }
......@@ -837,7 +867,7 @@ RSpec.describe ContainerRepository, :aggregate_failures do
describe '#migration_in_active_state?' do
subject { container_repository.migration_in_active_state? }
ContainerRepository::MIGRATION_STATES.each do |state|
described_class::MIGRATION_STATES.each do |state|
context "when in #{state} migration_state" do
let(:container_repository) { create(:container_repository, state.to_sym)}
......@@ -849,7 +879,7 @@ RSpec.describe ContainerRepository, :aggregate_failures do
describe '#migration_importing?' do
subject { container_repository.migration_importing? }
ContainerRepository::MIGRATION_STATES.each do |state|
described_class::MIGRATION_STATES.each do |state|
context "when in #{state} migration_state" do
let(:container_repository) { create(:container_repository, state.to_sym)}
......@@ -861,7 +891,7 @@ RSpec.describe ContainerRepository, :aggregate_failures do
describe '#migration_pre_importing?' do
subject { container_repository.migration_pre_importing? }
ContainerRepository::MIGRATION_STATES.each do |state|
described_class::MIGRATION_STATES.each do |state|
context "when in #{state} migration_state" do
let(:container_repository) { create(:container_repository, state.to_sym)}
......@@ -922,4 +952,34 @@ RSpec.describe ContainerRepository, :aggregate_failures do
end
end
end
describe '.with_stale_migration' do
  # Threshold is 5 minutes ago. For each abortable state there is one
  # repository with an old timestamp, one with the timestamp cleared and one
  # with a recent timestamp; only the first two are expected back.
  let_it_be(:repository) { create(:container_repository) }

  let_it_be(:pre_importing_old) { create(:container_repository, :pre_importing, migration_pre_import_started_at: 10.minutes.ago) }
  let_it_be(:pre_importing_nil) { create(:container_repository, :pre_importing).tap { |repo| repo.update_column(:migration_pre_import_started_at, nil) } }
  let_it_be(:pre_importing_recent) { create(:container_repository, :pre_importing, migration_pre_import_started_at: 2.minutes.ago) }

  let_it_be(:pre_import_done_old) { create(:container_repository, :pre_import_done, migration_pre_import_done_at: 10.minutes.ago) }
  let_it_be(:pre_import_done_nil) { create(:container_repository, :pre_import_done).tap { |repo| repo.update_column(:migration_pre_import_done_at, nil) } }
  let_it_be(:pre_import_done_recent) { create(:container_repository, :pre_import_done, migration_pre_import_done_at: 2.minutes.ago) }

  let_it_be(:importing_old) { create(:container_repository, :importing, migration_import_started_at: 10.minutes.ago) }
  let_it_be(:importing_nil) { create(:container_repository, :importing).tap { |repo| repo.update_column(:migration_import_started_at, nil) } }
  let_it_be(:importing_recent) { create(:container_repository, :importing, migration_import_started_at: 2.minutes.ago) }

  subject(:stale_repositories) { described_class.with_stale_migration(5.minutes.ago) }

  it 'returns only the repositories with an old or missing timestamp for their state' do
    expect(stale_repositories).to contain_exactly(
      pre_importing_old,
      pre_importing_nil,
      pre_import_done_old,
      pre_import_done_nil,
      importing_old,
      importing_nil
    )
  end
end
end
# frozen_string_literal: true
require 'spec_helper'
# Specs for the migration guard worker: stale migrations in each abortable
# state must be aborted on GitLab.com, ongoing ones must be left untouched and
# the worker must be a no-op elsewhere.
RSpec.describe ContainerRegistry::Migration::GuardWorker, :aggregate_failures do
  include_context 'container registry client'

  let(:worker) { described_class.new }

  describe '#perform' do
    let(:pre_importing_migrations) { ::ContainerRepository.with_migration_states(:pre_importing) }
    let(:pre_import_done_migrations) { ::ContainerRepository.with_migration_states(:pre_import_done) }
    let(:importing_migrations) { ::ContainerRepository.with_migration_states(:importing) }
    let(:import_aborted_migrations) { ::ContainerRepository.with_migration_states(:import_aborted) }
    let(:import_done_migrations) { ::ContainerRepository.with_migration_states(:import_done) }

    subject { worker.perform }

    before do
      stub_container_registry_config(enabled: true, api_url: registry_api_url, key: 'spec/fixtures/x509_certificate_pk.key')
      # Shared by every context below: a step is stale after 5 minutes.
      allow(::ContainerRegistry::Migration).to receive(:max_step_duration).and_return(5.minutes)
    end

    context 'on gitlab.com' do
      before do
        allow(::Gitlab).to receive(:com?).and_return(true)
      end

      context 'with no stale migrations' do
        it_behaves_like 'an idempotent worker'

        it 'will not update any migration state' do
          expect(worker).to receive(:log_extra_metadata_on_done).with(:stale_migrations_count, 0)

          expect { subject }
            .to not_change(pre_importing_migrations, :count)
            .and not_change(pre_import_done_migrations, :count)
            .and not_change(importing_migrations, :count)
            .and not_change(import_aborted_migrations, :count)
        end
      end

      context 'with pre_importing stale migrations' do
        # Created eagerly so that the count matchers below do not depend on
        # the order in which the compound matchers are evaluated.
        let!(:ongoing_migration) { create(:container_repository, :pre_importing) }
        let!(:stale_migration) { create(:container_repository, :pre_importing, migration_pre_import_started_at: 10.minutes.ago) }

        it 'will abort the migration' do
          expect(worker).to receive(:log_extra_metadata_on_done).with(:stale_migrations_count, 1)

          # Both records are reloaded so the persisted state is compared, not
          # the in-memory attribute.
          expect { subject }
            .to change(pre_importing_migrations, :count).by(-1)
            .and not_change(pre_import_done_migrations, :count)
            .and not_change(importing_migrations, :count)
            .and not_change(import_done_migrations, :count)
            .and change(import_aborted_migrations, :count).by(1)
            .and change { stale_migration.reload.migration_state }.from('pre_importing').to('import_aborted')
            .and not_change { ongoing_migration.reload.migration_state }
        end
      end

      context 'with pre_import_done stale migrations' do
        let!(:ongoing_migration) { create(:container_repository, :pre_import_done) }
        let!(:stale_migration) { create(:container_repository, :pre_import_done, migration_pre_import_done_at: 10.minutes.ago) }

        it 'will abort the migration' do
          expect(worker).to receive(:log_extra_metadata_on_done).with(:stale_migrations_count, 1)

          expect { subject }
            .to not_change(pre_importing_migrations, :count)
            .and change(pre_import_done_migrations, :count).by(-1)
            .and not_change(importing_migrations, :count)
            .and not_change(import_done_migrations, :count)
            .and change(import_aborted_migrations, :count).by(1)
            .and change { stale_migration.reload.migration_state }.from('pre_import_done').to('import_aborted')
            .and not_change { ongoing_migration.reload.migration_state }
        end
      end

      context 'with importing stale migrations' do
        let!(:ongoing_migration) { create(:container_repository, :importing) }
        let!(:stale_migration) { create(:container_repository, :importing, migration_import_started_at: 10.minutes.ago) }

        it 'will abort the migration' do
          expect(worker).to receive(:log_extra_metadata_on_done).with(:stale_migrations_count, 1)

          expect { subject }
            .to not_change(pre_importing_migrations, :count)
            .and not_change(pre_import_done_migrations, :count)
            .and change(importing_migrations, :count).by(-1)
            .and not_change(import_done_migrations, :count)
            .and change(import_aborted_migrations, :count).by(1)
            .and change { stale_migration.reload.migration_state }.from('importing').to('import_aborted')
            .and not_change { ongoing_migration.reload.migration_state }
        end
      end
    end

    context 'not on gitlab.com' do
      before do
        allow(::Gitlab).to receive(:com?).and_return(false)
      end

      it 'is a no op' do
        expect(::ContainerRepository).not_to receive(:with_stale_migration)
        expect(worker).not_to receive(:log_extra_metadata_on_done)

        subject
      end
    end
  end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment