Commit 9ebf6383 authored by Steve Abrams's avatar Steve Abrams

Registry import enqueuer

Worker to queue container repository imports

Changelog: added
parent deafbf11
...@@ -5,14 +5,17 @@ class ContainerRepository < ApplicationRecord ...@@ -5,14 +5,17 @@ class ContainerRepository < ApplicationRecord
include Gitlab::SQL::Pattern include Gitlab::SQL::Pattern
include EachBatch include EachBatch
include Sortable include Sortable
include AfterCommitQueue
WAITING_CLEANUP_STATUSES = %i[cleanup_scheduled cleanup_unfinished].freeze WAITING_CLEANUP_STATUSES = %i[cleanup_scheduled cleanup_unfinished].freeze
REQUIRING_CLEANUP_STATUSES = %i[cleanup_unscheduled cleanup_scheduled].freeze REQUIRING_CLEANUP_STATUSES = %i[cleanup_unscheduled cleanup_scheduled].freeze
IDLE_MIGRATION_STATES = %w[default pre_import_done import_done import_aborted import_skipped].freeze IDLE_MIGRATION_STATES = %w[default pre_import_done import_done import_aborted import_skipped].freeze
ACTIVE_MIGRATION_STATES = %w[pre_importing importing].freeze ACTIVE_MIGRATION_STATES = %w[pre_importing importing].freeze
ABORTABLE_MIGRATION_STATES = (ACTIVE_MIGRATION_STATES + ['pre_import_done']).freeze ABORTABLE_MIGRATION_STATES = (ACTIVE_MIGRATION_STATES + %w[pre_import_done default]).freeze
MIGRATION_STATES = (IDLE_MIGRATION_STATES + ACTIVE_MIGRATION_STATES).freeze MIGRATION_STATES = (IDLE_MIGRATION_STATES + ACTIVE_MIGRATION_STATES).freeze
TooManyImportsError = Class.new(StandardError)
belongs_to :project belongs_to :project
validates :name, length: { minimum: 0, allow_nil: false } validates :name, length: { minimum: 0, allow_nil: false }
...@@ -48,6 +51,32 @@ class ContainerRepository < ApplicationRecord ...@@ -48,6 +51,32 @@ class ContainerRepository < ApplicationRecord
scope :with_migration_pre_import_started_at_nil_or_before, ->(timestamp) { where("COALESCE(migration_pre_import_started_at, '01-01-1970') < ?", timestamp) } scope :with_migration_pre_import_started_at_nil_or_before, ->(timestamp) { where("COALESCE(migration_pre_import_started_at, '01-01-1970') < ?", timestamp) }
scope :with_migration_pre_import_done_at_nil_or_before, ->(timestamp) { where("COALESCE(migration_pre_import_done_at, '01-01-1970') < ?", timestamp) } scope :with_migration_pre_import_done_at_nil_or_before, ->(timestamp) { where("COALESCE(migration_pre_import_done_at, '01-01-1970') < ?", timestamp) }
scope :with_stale_ongoing_cleanup, ->(threshold) { cleanup_ongoing.where('expiration_policy_started_at < ?', threshold) } scope :with_stale_ongoing_cleanup, ->(threshold) { cleanup_ongoing.where('expiration_policy_started_at < ?', threshold) }
scope :import_in_process, -> { where(migration_state: %w[pre_importing pre_import_done importing]) }
scope :recently_done_migration_step, -> do
where(migration_state: %w[import_done pre_import_done import_aborted])
.order(Arel.sql('GREATEST(migration_pre_import_done_at, migration_import_done_at, migration_aborted_at) DESC'))
end
scope :ready_for_import, -> do
# There is no yaml file for the container_registry_phase_2_deny_list
# feature flag since it is only accessed in this query.
# https://gitlab.com/gitlab-org/gitlab/-/issues/350543 tracks the rollout and
# removal of this feature flag.
joins(:project).where(
migration_state: [:default],
created_at: ...ContainerRegistry::Migration.created_before
).with_target_import_tier
.where(
"NOT EXISTS (
SELECT 1
FROM feature_gates
WHERE feature_gates.feature_key = 'container_registry_phase_2_deny_list'
AND feature_gates.key = 'actors'
AND feature_gates.value = concat('Group:', projects.namespace_id)
)"
)
end
state_machine :migration_state, initial: :default do state_machine :migration_state, initial: :default do
state :pre_importing do state :pre_importing do
...@@ -104,7 +133,7 @@ class ContainerRepository < ApplicationRecord ...@@ -104,7 +133,7 @@ class ContainerRepository < ApplicationRecord
end end
event :skip_import do event :skip_import do
transition %i[default pre_importing importing] => :import_skipped transition ABORTABLE_MIGRATION_STATES.map(&:to_sym) => :import_skipped
end end
event :retry_pre_import do event :retry_pre_import do
...@@ -121,7 +150,9 @@ class ContainerRepository < ApplicationRecord ...@@ -121,7 +150,9 @@ class ContainerRepository < ApplicationRecord
end end
after_transition any => :pre_importing do |container_repository| after_transition any => :pre_importing do |container_repository|
container_repository.abort_import unless container_repository.migration_pre_import == :ok container_repository.try_import do
container_repository.migration_pre_import
end
end end
before_transition pre_importing: :pre_import_done do |container_repository| before_transition pre_importing: :pre_import_done do |container_repository|
...@@ -134,7 +165,9 @@ class ContainerRepository < ApplicationRecord ...@@ -134,7 +165,9 @@ class ContainerRepository < ApplicationRecord
end end
after_transition any => :importing do |container_repository| after_transition any => :importing do |container_repository|
container_repository.abort_import unless container_repository.migration_import == :ok container_repository.try_import do
container_repository.migration_import
end
end end
before_transition importing: :import_done do |container_repository| before_transition importing: :import_done do |container_repository|
...@@ -156,9 +189,10 @@ class ContainerRepository < ApplicationRecord ...@@ -156,9 +189,10 @@ class ContainerRepository < ApplicationRecord
container_repository.migration_skipped_at = Time.zone.now container_repository.migration_skipped_at = Time.zone.now
end end
before_transition any => %i[import_done import_aborted] do before_transition any => %i[import_done import_aborted] do |container_repository|
# EnqueuerJob.enqueue perform_async or perform_in depending on the speed FF container_repository.run_after_commit do
# To be implemented in https://gitlab.com/gitlab-org/gitlab/-/issues/349744 ::ContainerRegistry::Migration::EnqueuerWorker.perform_async
end
end end
end end
...@@ -201,6 +235,14 @@ class ContainerRepository < ApplicationRecord ...@@ -201,6 +235,14 @@ class ContainerRepository < ApplicationRecord
from("(#{union.to_sql}) #{ContainerRepository.table_name}") from("(#{union.to_sql}) #{ContainerRepository.table_name}")
end end
def self.with_target_import_tier
# overridden in ee
#
# Repositories are being migrated by tier on Saas, so we need to
# filter by plan/subscription which is not available in FOSS
all
end
def skip_import(reason:) def skip_import(reason:)
self.migration_skipped_reason = reason self.migration_skipped_reason = reason
...@@ -230,6 +272,41 @@ class ContainerRepository < ApplicationRecord ...@@ -230,6 +272,41 @@ class ContainerRepository < ApplicationRecord
finish_pre_import && start_import finish_pre_import && start_import
end end
def retry_migration
return if migration_import_done_at
if migration_pre_import_done_at
retry_import
else
retry_pre_import
end
end
def try_import
raise ArgumentError, 'block not given' unless block_given?
try_count = 0
begin
try_count += 1
return true if yield == :ok
abort_import
false
rescue TooManyImportsError
if try_count <= ::ContainerRegistry::Migration.start_max_retries
sleep 0.1 * try_count
retry
else
abort_import
false
end
end
end
def last_import_step_done_at
[migration_pre_import_done_at, migration_import_done_at, migration_aborted_at].compact.max
end
# rubocop: disable CodeReuse/ServiceClass # rubocop: disable CodeReuse/ServiceClass
def registry def registry
@registry ||= begin @registry ||= begin
...@@ -327,13 +404,19 @@ class ContainerRepository < ApplicationRecord ...@@ -327,13 +404,19 @@ class ContainerRepository < ApplicationRecord
def migration_pre_import def migration_pre_import
return :error unless gitlab_api_client.supports_gitlab_api? return :error unless gitlab_api_client.supports_gitlab_api?
gitlab_api_client.pre_import_repository(self.path) response = gitlab_api_client.pre_import_repository(self.path)
raise TooManyImportsError if response == :too_many_imports
response
end end
def migration_import def migration_import
return :error unless gitlab_api_client.supports_gitlab_api? return :error unless gitlab_api_client.supports_gitlab_api?
gitlab_api_client.import_repository(self.path) response = gitlab_api_client.import_repository(self.path)
raise TooManyImportsError if response == :too_many_imports
response
end end
def self.build_from_path(path) def self.build_from_path(path)
......
...@@ -273,6 +273,15 @@ ...@@ -273,6 +273,15 @@
:weight: 1 :weight: 1
:idempotent: :idempotent:
:tags: [] :tags: []
- :name: cronjob:container_registry_migration_enqueuer
:worker_name: ContainerRegistry::Migration::EnqueuerWorker
:feature_category: :container_registry
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: cronjob:container_registry_migration_guard - :name: cronjob:container_registry_migration_guard
:worker_name: ContainerRegistry::Migration::GuardWorker :worker_name: ContainerRegistry::Migration::GuardWorker
:feature_category: :container_registry :feature_category: :container_registry
......
# frozen_string_literal: true
module ContainerRegistry
module Migration
class EnqueuerWorker
include ApplicationWorker
include CronjobQueue # rubocop:disable Scalability/CronWorkerContext
include Gitlab::Utils::StrongMemoize
data_consistency :always
feature_category :container_registry
urgency :low
deduplicate :until_executing, including_scheduled: true
idempotent!
def perform
return unless migration.enabled?
return unless below_capacity?
return unless waiting_time_passed?
re_enqueue_if_capacity if handle_aborted_migration || handle_next_migration
rescue StandardError => e
Gitlab::ErrorTracking.log_exception(
e,
next_repository_id: next_repository&.id,
next_aborted_repository_id: next_aborted_repository&.id
)
next_repository&.abort_import
end
private
def handle_aborted_migration
return unless next_aborted_repository&.retry_migration
log_extra_metadata_on_done(:container_repository_id, next_aborted_repository.id)
log_extra_metadata_on_done(:import_type, 'retry')
true
end
def handle_next_migration
return unless next_repository
# We return true because the repository was successfully processed (migration_state is changed)
return true if tag_count_too_high?
return unless next_repository.start_pre_import
log_extra_metadata_on_done(:container_repository_id, next_repository.id)
log_extra_metadata_on_done(:import_type, 'next')
true
end
def tag_count_too_high?
return false unless next_repository.tags_count > migration.max_tags_count
next_repository.skip_import(reason: :too_many_tags)
true
end
def below_capacity?
current_capacity <= maximum_capacity
end
def waiting_time_passed?
delay = migration.enqueue_waiting_time
return true if delay == 0
return true unless last_step_completed_repository
last_step_completed_repository.last_import_step_done_at < Time.zone.now - delay
end
def current_capacity
strong_memoize(:current_capacity) do
ContainerRepository.with_migration_states(
%w[pre_importing pre_import_done importing]
).count
end
end
def maximum_capacity
migration.capacity
end
def next_repository
strong_memoize(:next_repository) do
ContainerRepository.ready_for_import.take # rubocop:disable CodeReuse/ActiveRecord
end
end
def next_aborted_repository
strong_memoize(:next_aborted_repository) do
ContainerRepository.with_migration_state('import_aborted').take # rubocop:disable CodeReuse/ActiveRecord
end
end
def last_step_completed_repository
strong_memoize(:last_step_completed_repository) do
ContainerRepository.recently_done_migration_step.first
end
end
def migration
::ContainerRegistry::Migration
end
def re_enqueue_if_capacity
return unless current_capacity < maximum_capacity
self.class.perform_async
end
end
end
end
---
name: container_registry_migration_limit_gitlab_org
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/78613
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/350543
milestone: '14.8'
type: development
group: group::package
default_enabled: false
...@@ -545,6 +545,9 @@ Settings.cron_jobs['container_registry_migration_guard_worker']['job_class'] = ' ...@@ -545,6 +545,9 @@ Settings.cron_jobs['container_registry_migration_guard_worker']['job_class'] = '
Settings.cron_jobs['container_registry_migration_observer_worker'] ||= Settingslogic.new({}) Settings.cron_jobs['container_registry_migration_observer_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['container_registry_migration_observer_worker']['cron'] ||= '*/30 * * * *' Settings.cron_jobs['container_registry_migration_observer_worker']['cron'] ||= '*/30 * * * *'
Settings.cron_jobs['container_registry_migration_observer_worker']['job_class'] = 'ContainerRegistry::Migration::ObserverWorker' Settings.cron_jobs['container_registry_migration_observer_worker']['job_class'] = 'ContainerRegistry::Migration::ObserverWorker'
Settings.cron_jobs['container_registry_migration_enqueuer_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['container_registry_migration_enqueuer_worker']['cron'] ||= '45 */1 * * *'
Settings.cron_jobs['container_registry_migration_enqueuer_worker']['job_class'] = 'ContainerRegistry::Migration::EnqueuerWorker'
Settings.cron_jobs['image_ttl_group_policy_worker'] ||= Settingslogic.new({}) Settings.cron_jobs['image_ttl_group_policy_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['image_ttl_group_policy_worker']['cron'] ||= '40 0 * * *' Settings.cron_jobs['image_ttl_group_policy_worker']['cron'] ||= '40 0 * * *'
Settings.cron_jobs['image_ttl_group_policy_worker']['job_class'] = 'DependencyProxy::ImageTtlGroupPolicyWorker' Settings.cron_jobs['image_ttl_group_policy_worker']['job_class'] = 'DependencyProxy::ImageTtlGroupPolicyWorker'
......
# frozen_string_literal: true
class AddIndexOnMigrationStateAndImportDoneAtToContainerRepositories < Gitlab::Database::Migration[1.0]
INDEX_NAME = 'index_container_repositories_on_migration_state_import_done_at'
disable_ddl_transaction!
def up
add_concurrent_index :container_repositories, [:migration_state, :migration_import_done_at], name: INDEX_NAME
end
def down
remove_concurrent_index_by_name :container_repositories, INDEX_NAME
end
end
# frozen_string_literal: true
class AddIndexOnGreatestDoneAtToContainerRepositories < Gitlab::Database::Migration[1.0]
INDEX_NAME = 'index_container_repositories_on_greatest_done_at'
disable_ddl_transaction!
def up
add_concurrent_index :container_repositories,
'GREATEST(migration_pre_import_done_at, migration_import_done_at, migration_aborted_at)',
where: "migration_state IN ('import_done', 'pre_import_done', 'import_aborted')",
name: INDEX_NAME
end
def down
remove_concurrent_index_by_name :container_repositories, INDEX_NAME
end
end
087338f0b438d2aa33bc22bd3973d818c5d1f40948525d95181751722158605b
\ No newline at end of file
efecc3c6468d8a5036352f5b62e8d70de835d1beb4e45ba6d3906906d0317848
\ No newline at end of file
...@@ -26066,6 +26066,10 @@ CREATE INDEX index_composer_cache_files_where_namespace_id_is_null ON packages_c ...@@ -26066,6 +26066,10 @@ CREATE INDEX index_composer_cache_files_where_namespace_id_is_null ON packages_c
CREATE INDEX index_container_expiration_policies_on_next_run_at_and_enabled ON container_expiration_policies USING btree (next_run_at, enabled); CREATE INDEX index_container_expiration_policies_on_next_run_at_and_enabled ON container_expiration_policies USING btree (next_run_at, enabled);
CREATE INDEX index_container_repositories_on_greatest_done_at ON container_repositories USING btree (GREATEST(migration_pre_import_done_at, migration_import_done_at, migration_aborted_at)) WHERE (migration_state = ANY (ARRAY['import_done'::text, 'pre_import_done'::text, 'import_aborted'::text]));
CREATE INDEX index_container_repositories_on_migration_state_import_done_at ON container_repositories USING btree (migration_state, migration_import_done_at);
CREATE INDEX index_container_repositories_on_project_id ON container_repositories USING btree (project_id); CREATE INDEX index_container_repositories_on_project_id ON container_repositories USING btree (project_id);
CREATE INDEX index_container_repositories_on_project_id_and_id ON container_repositories USING btree (project_id, id); CREATE INDEX index_container_repositories_on_project_id_and_id ON container_repositories USING btree (project_id, id);
...@@ -4,11 +4,15 @@ module EE ...@@ -4,11 +4,15 @@ module EE
module ContainerRepository module ContainerRepository
extend ActiveSupport::Concern extend ActiveSupport::Concern
GITLAB_ORG_NAMESPACE = 'gitlab-org'
prepended do prepended do
scope :project_id_in, ->(ids) { joins(:project).merge(::Project.id_in(ids)) } scope :project_id_in, ->(ids) { joins(:project).merge(::Project.id_in(ids)) }
end end
class_methods do class_methods do
extend ::Gitlab::Utils::Override
# @param primary_key_in [Range, ContainerRepository] arg to pass to primary_key_in scope # @param primary_key_in [Range, ContainerRepository] arg to pass to primary_key_in scope
# @return [ActiveRecord::Relation<ContainerRepository>] everything that should be synced to this node, restricted by primary key # @return [ActiveRecord::Relation<ContainerRepository>] everything that should be synced to this node, restricted by primary key
def replicables_for_current_secondary(primary_key_in) def replicables_for_current_secondary(primary_key_in)
...@@ -16,6 +20,21 @@ module EE ...@@ -16,6 +20,21 @@ module EE
node.container_repositories.primary_key_in(primary_key_in) node.container_repositories.primary_key_in(primary_key_in)
end end
override :with_target_import_tier
def with_target_import_tier
# self-managed instances are singlular plans, so they do not need
# these filters
return all unless ::Gitlab.com?
if ::ContainerRegistry::Migration.limit_gitlab_org?
joins(project: [:namespace]).where(namespaces: { path: GITLAB_ORG_NAMESPACE })
else
joins(
project: [namespace: [gitlab_subscription: [:hosted_plan]]]
).where(plans: { id: ::ContainerRegistry::Migration.target_plan.id })
end
end
end end
def push_blob(digest, file_path) def push_blob(digest, file_path)
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe ContainerRepository, :saas do
describe '.with_target_import_tier' do
let_it_be(:project) { create(:project) }
let_it_be(:valid_container_repository) { create(:container_repository, project: project) }
let_it_be(:gitlab_namespace) { create(:namespace, path: 'gitlab-org') }
let_it_be(:gitlab_project) { create(:project, namespace: gitlab_namespace) }
let_it_be(:gitlab_container_repository) { create(:container_repository, project: gitlab_project) }
let_it_be(:ultimate_project) { create(:project) }
let_it_be(:ultimate_container_repository) { create(:container_repository, project: ultimate_project) }
let(:subscription) { create(:gitlab_subscription, :premium, namespace: project.namespace) }
let(:ultimate_subscription) { create(:gitlab_subscription, :ultimate, namespace: ultimate_project.namespace) }
subject { described_class.with_target_import_tier }
before do
stub_application_setting(container_registry_import_target_plan: subscription.hosted_plan.name)
end
context 'limit_gitlab_org enabled' do
it { is_expected.to contain_exactly(gitlab_container_repository) }
end
context 'limit_gitlab_org disabled' do
before do
stub_feature_flags(container_registry_migration_limit_gitlab_org: false)
end
it { is_expected.to contain_exactly(valid_container_repository) }
end
end
describe '.ready_for_import' do
include_context 'importable repositories'
let_it_be(:ultimate_project) { create(:project) }
let_it_be(:ultimate_container_repository) { create(:container_repository, project: ultimate_project, created_at: 2.days.ago) }
let_it_be(:subscription) { create(:gitlab_subscription, :premium, namespace: project.namespace) }
let_it_be(:denied_subscription) { create(:gitlab_subscription, :premium, namespace: denied_project.namespace) }
let_it_be(:ultimate_subscription) { create(:gitlab_subscription, :ultimate, namespace: ultimate_project.namespace) }
subject { described_class.ready_for_import }
before do
stub_application_setting(container_registry_import_target_plan: subscription.hosted_plan.name)
end
it { is_expected.to contain_exactly(valid_container_repository, valid_container_repository2) }
end
end
...@@ -22,6 +22,10 @@ module ContainerRegistry ...@@ -22,6 +22,10 @@ module ContainerRegistry
Feature.enabled?(:container_registry_migration_phase2_enabled) Feature.enabled?(:container_registry_migration_phase2_enabled)
end end
def self.limit_gitlab_org?
Feature.enabled?(:container_registry_migration_limit_gitlab_org)
end
def self.enqueue_waiting_time def self.enqueue_waiting_time
return 0 if Feature.enabled?(:container_registry_migration_phase2_enqueue_speed_fast) return 0 if Feature.enabled?(:container_registry_migration_phase2_enqueue_speed_fast)
return 6.hours if Feature.enabled?(:container_registry_migration_phase2_enqueue_speed_slow) return 6.hours if Feature.enabled?(:container_registry_migration_phase2_enqueue_speed_slow)
...@@ -36,5 +40,9 @@ module ContainerRegistry ...@@ -36,5 +40,9 @@ module ContainerRegistry
0 0
end end
def self.target_plan
Plan.find_by_name(target_plan_name)
end
end end
end end
...@@ -19,6 +19,20 @@ RSpec.describe ContainerRegistry::Migration do ...@@ -19,6 +19,20 @@ RSpec.describe ContainerRegistry::Migration do
end end
end end
describe '.limit_gitlab_org?' do
subject { described_class.limit_gitlab_org? }
it { is_expected.to eq(true) }
context 'feature flag disabled' do
before do
stub_feature_flags(container_registry_migration_limit_gitlab_org: false)
end
it { is_expected.to eq(false) }
end
end
describe '.enqueue_waiting_time' do describe '.enqueue_waiting_time' do
subject { described_class.enqueue_waiting_time } subject { described_class.enqueue_waiting_time }
...@@ -139,4 +153,16 @@ RSpec.describe ContainerRegistry::Migration do ...@@ -139,4 +153,16 @@ RSpec.describe ContainerRegistry::Migration do
expect(described_class.created_before).to eq(value) expect(described_class.created_before).to eq(value)
end end
end end
describe '.target_plan' do
let_it_be(:plan) { create(:plan) }
before do
stub_application_setting(container_registry_import_target_plan: plan.name)
end
it 'returns the matching application_setting' do
expect(described_class.target_plan).to eq(plan)
end
end
end end
...@@ -174,6 +174,14 @@ RSpec.describe ContainerRepository, :aggregate_failures do ...@@ -174,6 +174,14 @@ RSpec.describe ContainerRepository, :aggregate_failures do
end end
end end
shared_examples 'queueing the next import' do
it 'starts the worker' do
expect(::ContainerRegistry::Migration::EnqueuerWorker).to receive(:perform_async)
subject
end
end
describe '#start_pre_import' do describe '#start_pre_import' do
let_it_be_with_reload(:repository) { create(:container_repository) } let_it_be_with_reload(:repository) { create(:container_repository) }
...@@ -256,8 +264,9 @@ RSpec.describe ContainerRepository, :aggregate_failures do ...@@ -256,8 +264,9 @@ RSpec.describe ContainerRepository, :aggregate_failures do
subject { repository.finish_import } subject { repository.finish_import }
it_behaves_like 'transitioning from allowed states', %w[importing] it_behaves_like 'transitioning from allowed states', %w[importing]
it_behaves_like 'queueing the next import'
it 'sets migration_import_done_at' do it 'sets migration_import_done_at and queues the next import' do
expect { subject }.to change { repository.reload.migration_import_done_at } expect { subject }.to change { repository.reload.migration_import_done_at }
expect(repository).to be_import_done expect(repository).to be_import_done
...@@ -283,9 +292,10 @@ RSpec.describe ContainerRepository, :aggregate_failures do ...@@ -283,9 +292,10 @@ RSpec.describe ContainerRepository, :aggregate_failures do
subject { repository.abort_import } subject { repository.abort_import }
it_behaves_like 'transitioning from allowed states', %w[pre_importing pre_import_done importing] it_behaves_like 'transitioning from allowed states', ContainerRepository::ABORTABLE_MIGRATION_STATES
it_behaves_like 'queueing the next import'
it 'sets migration_aborted_at and migration_aborted_at and increments the retry count' do it 'sets migration_aborted_at and migration_aborted_at, increments the retry count, and queues the next import' do
expect { subject }.to change { repository.migration_aborted_at } expect { subject }.to change { repository.migration_aborted_at }
.and change { repository.reload.migration_retries_count }.by(1) .and change { repository.reload.migration_retries_count }.by(1)
...@@ -299,7 +309,7 @@ RSpec.describe ContainerRepository, :aggregate_failures do ...@@ -299,7 +309,7 @@ RSpec.describe ContainerRepository, :aggregate_failures do
subject { repository.skip_import(reason: :too_many_retries) } subject { repository.skip_import(reason: :too_many_retries) }
it_behaves_like 'transitioning from allowed states', %w[default pre_importing importing] it_behaves_like 'transitioning from allowed states', ContainerRepository::ABORTABLE_MIGRATION_STATES
it 'sets migration_skipped_at and migration_skipped_reason' do it 'sets migration_skipped_at and migration_skipped_reason' do
expect { subject }.to change { repository.reload.migration_skipped_at } expect { subject }.to change { repository.reload.migration_skipped_at }
...@@ -329,6 +339,43 @@ RSpec.describe ContainerRepository, :aggregate_failures do ...@@ -329,6 +339,43 @@ RSpec.describe ContainerRepository, :aggregate_failures do
end end
end end
describe '#retry_migration' do
subject { repository.retry_migration }
it 'retries the pre_import' do
expect(repository).to receive(:retry_pre_import).and_return(true)
expect(repository).not_to receive(:retry_import)
expect(subject).to eq(true)
end
context 'when migration is done pre-importing' do
before do
repository.update_columns(migration_pre_import_done_at: Time.zone.now)
end
it 'returns' do
expect(repository).to receive(:retry_import).and_return(true)
expect(repository).not_to receive(:retry_pre_import)
expect(subject).to eq(true)
end
end
context 'when migration is already complete' do
before do
repository.update_columns(migration_import_done_at: Time.zone.now)
end
it 'returns' do
expect(repository).not_to receive(:retry_pre_import)
expect(repository).not_to receive(:retry_import)
expect(subject).to eq(nil)
end
end
end
describe '#tag' do describe '#tag' do
it 'has a test tag' do it 'has a test tag' do
expect(repository.tag('test')).not_to be_nil expect(repository.tag('test')).not_to be_nil
...@@ -524,6 +571,14 @@ RSpec.describe ContainerRepository, :aggregate_failures do ...@@ -524,6 +571,14 @@ RSpec.describe ContainerRepository, :aggregate_failures do
expect(subject).to eq(:error) expect(subject).to eq(:error)
end end
end end
context 'too many imports' do
it 'raises an error when it receives too_many_imports as a response' do
expect(repository.gitlab_api_client)
.to receive(step).with(repository.path).and_return(:too_many_imports)
expect { subject }.to raise_error(described_class::TooManyImportsError)
end
end
end end
describe '#migration_pre_import' do describe '#migration_pre_import' do
...@@ -900,6 +955,61 @@ RSpec.describe ContainerRepository, :aggregate_failures do ...@@ -900,6 +955,61 @@ RSpec.describe ContainerRepository, :aggregate_failures do
end end
end end
describe '#try_import' do
let_it_be_with_reload(:container_repository) { create(:container_repository) }
let(:response) { nil }
subject do
container_repository.try_import do
container_repository.foo
end
end
before do
allow(container_repository).to receive(:foo).and_return(response)
end
context 'successful request' do
let(:response) { :ok }
it { is_expected.to eq(true) }
end
context 'TooManyImportsError' do
before do
stub_application_setting(container_registry_import_start_max_retries: 3)
allow(container_repository).to receive(:foo).and_raise(described_class::TooManyImportsError)
end
it 'tries again exponentially and aborts the migration' do
expect(container_repository).to receive(:sleep).with(a_value_within(0.01).of(0.1))
expect(container_repository).to receive(:sleep).with(a_value_within(0.01).of(0.2))
expect(container_repository).to receive(:sleep).with(a_value_within(0.01).of(0.3))
expect(subject).to eq(false)
expect(container_repository).to be_import_aborted
end
end
context 'other response' do
let(:response) { :error }
it 'aborts the migration' do
expect(subject).to eq(false)
expect(container_repository).to be_import_aborted
end
end
context 'with no block given' do
it 'raises an error' do
expect { container_repository.try_import }.to raise_error(ArgumentError)
end
end
end
context 'with repositories' do context 'with repositories' do
let_it_be_with_reload(:repository) { create(:container_repository, :cleanup_unscheduled) } let_it_be_with_reload(:repository) { create(:container_repository, :cleanup_unscheduled) }
let_it_be(:other_repository) { create(:container_repository, :cleanup_unscheduled) } let_it_be(:other_repository) { create(:container_repository, :cleanup_unscheduled) }
...@@ -951,6 +1061,48 @@ RSpec.describe ContainerRepository, :aggregate_failures do ...@@ -951,6 +1061,48 @@ RSpec.describe ContainerRepository, :aggregate_failures do
it { is_expected.to eq([repository]) } it { is_expected.to eq([repository]) }
end end
end end
describe '.recently_done_migration_step' do
let_it_be(:import_done_repository) { create(:container_repository, :import_done, migration_pre_import_done_at: 3.days.ago, migration_import_done_at: 2.days.ago) }
let_it_be(:import_aborted_repository) { create(:container_repository, :import_aborted, migration_pre_import_done_at: 5.days.ago, migration_aborted_at: 1.day.ago) }
let_it_be(:pre_import_done_repository) { create(:container_repository, :pre_import_done, migration_pre_import_done_at: 1.hour.ago) }
subject { described_class.recently_done_migration_step }
it 'returns completed imports by done_at date' do
expect(subject.to_a).to eq([pre_import_done_repository, import_aborted_repository, import_done_repository])
end
end
describe '.ready_for_import' do
include_context 'importable repositories'
subject { described_class.ready_for_import }
before do
stub_application_setting(container_registry_import_target_plan: project.namespace.actual_plan_name)
end
it 'works' do
expect(subject).to contain_exactly(valid_container_repository, valid_container_repository2)
end
end
describe '#last_import_step_done_at' do
let_it_be(:aborted_at) { Time.zone.now - 1.hour }
let_it_be(:pre_import_done_at) { Time.zone.now - 2.hours }
subject { repository.last_import_step_done_at }
before do
repository.update_columns(
migration_pre_import_done_at: pre_import_done_at,
migration_aborted_at: aborted_at
)
end
it { is_expected.to eq(aborted_at) }
end
end end
describe '.with_stale_migration' do describe '.with_stale_migration' do
......
# frozen_string_literal: true
RSpec.shared_context 'importable repositories' do
let_it_be(:project) { create(:project) }
let_it_be(:valid_container_repository) { create(:container_repository, project: project, created_at: 2.days.ago) }
let_it_be(:valid_container_repository2) { create(:container_repository, project: project, created_at: 1.year.ago) }
let_it_be(:importing_container_repository) { create(:container_repository, :importing, project: project, created_at: 2.days.ago) }
let_it_be(:new_container_repository) { create(:container_repository, project: project) }
let_it_be(:denied_group) { create(:group) }
let_it_be(:denied_project) { create(:project, group: denied_group) }
let_it_be(:denied_container_repository) { create(:container_repository, project: denied_project, created_at: 2.days.ago) }
before do
stub_application_setting(container_registry_import_created_before: 1.day.ago)
stub_feature_flags(
container_registry_phase_2_deny_list: false,
container_registry_migration_limit_gitlab_org: false
)
Feature::FlipperGate.create!(
feature_key: 'container_registry_phase_2_deny_list',
key: 'actors',
value: "Group:#{denied_group.id}"
)
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe ContainerRegistry::Migration::EnqueuerWorker, :aggregate_failures do
let_it_be_with_reload(:container_repository) { create(:container_repository, created_at: 2.days.ago) }
let(:worker) { described_class.new }
before do
stub_container_registry_config(enabled: true)
stub_application_setting(container_registry_import_created_before: 1.day.ago)
stub_container_registry_tags(repository: container_repository.path, tags: %w(tag1 tag2 tag3), with_manifest: true)
end
describe '#perform' do
subject { worker.perform }
shared_examples 'no action' do
it 'does not queue or change any repositories' do
subject
expect(container_repository.reload).to be_default
end
end
shared_examples 're-enqueuing based on capacity' do
context 'below capacity' do
before do
allow(ContainerRegistry::Migration).to receive(:capacity).and_return(9999)
end
it 're-enqueues the worker' do
expect(ContainerRegistry::Migration::EnqueuerWorker).to receive(:perform_async)
subject
end
end
context 'above capacity' do
before do
allow(ContainerRegistry::Migration).to receive(:capacity).and_return(-1)
end
it 'does not re-enqueue the worker' do
expect(ContainerRegistry::Migration::EnqueuerWorker).not_to receive(:perform_async)
subject
end
end
end
context 'with qualified repository' do
it 'starts the pre-import for the next qualified repository' do
method = worker.method(:next_repository)
allow(worker).to receive(:next_repository) do
next_qualified_repository = method.call
allow(next_qualified_repository).to receive(:migration_pre_import).and_return(:ok)
next_qualified_repository
end
expect(worker).to receive(:log_extra_metadata_on_done)
.with(:container_repository_id, container_repository.id)
expect(worker).to receive(:log_extra_metadata_on_done)
.with(:import_type, 'next')
subject
expect(container_repository.reload).to be_pre_importing
end
it_behaves_like 're-enqueuing based on capacity'
end
context 'migrations are disabled' do
before do
allow(ContainerRegistry::Migration).to receive(:enabled?).and_return(false)
end
it_behaves_like 'no action'
end
context 'above capacity' do
before do
create(:container_repository, :importing)
create(:container_repository, :importing)
allow(ContainerRegistry::Migration).to receive(:capacity).and_return(1)
end
it_behaves_like 'no action'
it 'does not re-enqueue the worker' do
expect(ContainerRegistry::Migration::EnqueuerWorker).not_to receive(:perform_async)
subject
end
end
context 'too soon before previous completed import step' do
before do
create(:container_repository, :import_done, migration_import_done_at: 1.minute.ago)
allow(ContainerRegistry::Migration).to receive(:enqueue_waiting_time).and_return(1.hour)
end
it_behaves_like 'no action'
end
context 'when an aborted import is available' do
let_it_be(:aborted_repository) { create(:container_repository, :import_aborted) }
it 'retries the import for the aborted repository' do
method = worker.method(:next_aborted_repository)
allow(worker).to receive(:next_aborted_repository) do
next_aborted_repository = method.call
allow(next_aborted_repository).to receive(:migration_import).and_return(:ok)
next_aborted_repository
end
expect(worker).to receive(:log_extra_metadata_on_done)
.with(:container_repository_id, aborted_repository.id)
expect(worker).to receive(:log_extra_metadata_on_done)
.with(:import_type, 'retry')
subject
expect(aborted_repository.reload).to be_importing
expect(container_repository.reload).to be_default
end
it_behaves_like 're-enqueuing based on capacity'
end
context 'when no repository qualifies' do
include_examples 'an idempotent worker' do
before do
allow(ContainerRepository).to receive(:ready_for_import).and_return(ContainerRepository.none)
end
it_behaves_like 'no action'
end
end
context 'over max tag count' do
before do
stub_application_setting(container_registry_import_max_tags_count: 2)
end
it 'skips the repository' do
subject
expect(container_repository.reload).to be_import_skipped
expect(container_repository.migration_skipped_reason).to eq('too_many_tags')
expect(container_repository.migration_skipped_at).not_to be_nil
end
it_behaves_like 're-enqueuing based on capacity'
end
context 'when an error occurs' do
before do
allow(ContainerRegistry::Migration).to receive(:max_tags_count).and_raise(StandardError)
end
it 'aborts the import' do
expect(Gitlab::ErrorTracking).to receive(:log_exception).with(
instance_of(StandardError),
next_repository_id: container_repository.id,
next_aborted_repository_id: nil
)
subject
expect(container_repository.reload).to be_import_aborted
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment