Commit a4fe627e authored by David Fernandez's avatar David Fernandez

Guard worker will check ongoing migrations

By pinging the Container Registry API. If the status returned is
coherent with an ongoing migration, the job will skip the migration.
The migration is aborted otherwise.
parent 88e71952
...@@ -274,9 +274,7 @@ class ContainerRepository < ApplicationRecord ...@@ -274,9 +274,7 @@ class ContainerRepository < ApplicationRecord
def retry_aborted_migration def retry_aborted_migration
return unless migration_state == 'import_aborted' return unless migration_state == 'import_aborted'
import_status = gitlab_api_client.import_status(self.path) case external_import_status
case import_status
when 'native' when 'native'
raise NativeImportError raise NativeImportError
when 'import_in_progress' when 'import_in_progress'
...@@ -322,6 +320,12 @@ class ContainerRepository < ApplicationRecord ...@@ -322,6 +320,12 @@ class ContainerRepository < ApplicationRecord
[migration_pre_import_done_at, migration_import_done_at, migration_aborted_at].compact.max [migration_pre_import_done_at, migration_import_done_at, migration_aborted_at].compact.max
end end
# Returns the import status that the Container Registry API client reports
# for this repository's path.
#
# The lookup is wrapped in `strong_memoize(:import_status)`, so repeated calls
# on the same record presumably reuse the first answer instead of issuing
# another API request (confirm against the StrongMemoize helper).
def external_import_status
  strong_memoize(:import_status) { gitlab_api_client.import_status(path) }
end
# rubocop: disable CodeReuse/ServiceClass # rubocop: disable CodeReuse/ServiceClass
def registry def registry
@registry ||= begin @registry ||= begin
......
...@@ -21,18 +21,68 @@ module ContainerRegistry ...@@ -21,18 +21,68 @@ module ContainerRegistry
repositories = ::ContainerRepository.with_stale_migration(step_before_timestamp) repositories = ::ContainerRepository.with_stale_migration(step_before_timestamp)
.limit(max_capacity) .limit(max_capacity)
aborts_count = 0
long_running_migration_ids = []
# the #to_a is safe as the amount of entries is limited. # the #to_a is safe as the amount of entries is limited.
# In addition, we're calling #each in the next line and we don't want two different SQL queries for these two lines # In addition, we're calling #each in the next line and we don't want two different SQL queries for these two lines
log_extra_metadata_on_done(:stale_migrations_count, repositories.to_a.size) log_extra_metadata_on_done(:stale_migrations_count, repositories.to_a.size)
repositories.each do |repository| repositories.each do |repository|
if abortable?(repository)
repository.abort_import repository.abort_import
aborts_count += 1
else
long_running_migration_ids << repository.id if long_running_migration?(repository)
end
end
log_extra_metadata_on_done(:aborted_stale_migrations_count, aborts_count)
if long_running_migration_ids.any?
log_extra_metadata_on_done(:long_running_stale_migration_container_repository_ids, long_running_migration_ids)
end end
end end
private private
# This can ping the Container Registry API.
# We loop over a set of repositories to call this function (see #perform).
# In the worst case scenario, we have an n+1 API calls situation here.
#
# This is reasonable because the maximum amount of repositories looped
# on is `25`. See ::ContainerRegistry::Migration.capacity.
#
# TODO We can remove this n+1 situation by having a Container Registry API
# endpoint that accepts multiple repository paths at once. This is issue
# https://gitlab.com/gitlab-org/container-registry/-/issues/582
# Tells whether the stale migration of +repository+ should be aborted.
#
# A repository that is neither importing nor pre-importing is always
# abortable; answering that first avoids one Container Registry API request.
# Otherwise the migration is abortable only when the registry does not report
# a matching import still in progress.
def abortable?(repository)
  in_active_step = repository.importing? || repository.pre_importing?
  return true unless in_active_step

  !external_migration_in_progress?(repository)
end
# Tells whether the current migration step of +repository+ started before
# the long running threshold (see #long_running_migration_threshold).
#
# Returns false when no start timestamp is recorded for the step: without a
# start time the migration cannot be classified as long running, and calling
# `before?` on nil would raise and interrupt the whole guard run.
def long_running_migration?(repository)
  started_at = migration_start_timestamp(repository)
  return false unless started_at

  started_at.before?(long_running_migration_threshold)
end
# Tells whether the status reported by the Container Registry matches the
# step the repository is currently in on the Rails side:
# 'pre_import_in_progress' while pre-importing, or 'import_in_progress'
# while importing. Any other combination is treated as not in progress.
def external_migration_in_progress?(repository)
  registry_status = repository.external_import_status
  pre_import_running = registry_status == 'pre_import_in_progress' && repository.pre_importing?

  pre_import_running || (registry_status == 'import_in_progress' && repository.importing?)
end
# Picks the start timestamp of the step +repository+ is in: the pre import
# start time while pre-importing, the import start time in every other case.
def migration_start_timestamp(repository)
  return repository.migration_pre_import_started_at if repository.pre_importing?

  repository.migration_import_started_at
end
def step_before_timestamp def step_before_timestamp
::ContainerRegistry::Migration.max_step_duration.seconds.ago ::ContainerRegistry::Migration.max_step_duration.seconds.ago
end end
...@@ -42,6 +92,10 @@ module ContainerRegistry ...@@ -42,6 +92,10 @@ module ContainerRegistry
# is not properly applied # is not properly applied
::ContainerRegistry::Migration.capacity * 2 ::ContainerRegistry::Migration.capacity * 2
end end
# Cut-off time used to flag long running migrations: 30 minutes before the
# first call. The value is kept in @threshold so that every repository
# checked through this instance is compared against the same instant.
def long_running_migration_threshold
  return @threshold if @threshold

  @threshold = 30.minutes.ago
end
end end
end end
end end
...@@ -25,6 +25,7 @@ module ContainerRegistry ...@@ -25,6 +25,7 @@ module ContainerRegistry
end end
end end
# https://gitlab.com/gitlab-org/container-registry/-/blob/master/docs-gitlab/api.md#compliance-check
def supports_gitlab_api? def supports_gitlab_api?
strong_memoize(:supports_gitlab_api) do strong_memoize(:supports_gitlab_api) do
registry_features = Gitlab::CurrentSettings.container_registry_features || [] registry_features = Gitlab::CurrentSettings.container_registry_features || []
...@@ -35,16 +36,19 @@ module ContainerRegistry ...@@ -35,16 +36,19 @@ module ContainerRegistry
end end
end end
# https://gitlab.com/gitlab-org/container-registry/-/blob/master/docs-gitlab/api.md#import-repository
def pre_import_repository(path) def pre_import_repository(path)
response = start_import_for(path, pre: true) response = start_import_for(path, pre: true)
IMPORT_RESPONSES.fetch(response.status, :error) IMPORT_RESPONSES.fetch(response.status, :error)
end end
# https://gitlab.com/gitlab-org/container-registry/-/blob/master/docs-gitlab/api.md#import-repository
def import_repository(path) def import_repository(path)
response = start_import_for(path, pre: false) response = start_import_for(path, pre: false)
IMPORT_RESPONSES.fetch(response.status, :error) IMPORT_RESPONSES.fetch(response.status, :error)
end end
# https://gitlab.com/gitlab-org/container-registry/-/blob/master/docs-gitlab/api.md#get-repository-import-status
def import_status(path) def import_status(path)
body_hash = response_body(faraday.get(import_url_for(path))) body_hash = response_body(faraday.get(import_url_for(path)))
body_hash['status'] || 'error' body_hash['status'] || 'error'
......
...@@ -34,6 +34,11 @@ module ContainerRegistry ...@@ -34,6 +34,11 @@ module ContainerRegistry
end end
def self.capacity def self.capacity
# Increasing capacity numbers will increase the n+1 API calls we can have
# in ContainerRegistry::Migration::GuardWorker#external_migration_in_progress?
#
# TODO: See https://gitlab.com/gitlab-org/container-registry/-/issues/582
#
return 25 if Feature.enabled?(:container_registry_migration_phase2_capacity_25) return 25 if Feature.enabled?(:container_registry_migration_phase2_capacity_25)
return 10 if Feature.enabled?(:container_registry_migration_phase2_capacity_10) return 10 if Feature.enabled?(:container_registry_migration_phase2_capacity_10)
return 1 if Feature.enabled?(:container_registry_migration_phase2_capacity_1) return 1 if Feature.enabled?(:container_registry_migration_phase2_capacity_1)
......
...@@ -1179,6 +1179,16 @@ RSpec.describe ContainerRepository, :aggregate_failures do ...@@ -1179,6 +1179,16 @@ RSpec.describe ContainerRepository, :aggregate_failures do
end end
end end
# Checks that #external_import_status hands back whatever the registry API
# client reports for the repository.
describe '#external_import_status' do
subject { repository.external_import_status }
it 'returns the response from the client' do
# The client must be queried with this repository's own path.
expect(repository.gitlab_api_client).to receive(:import_status).with(repository.path).and_return('test')
expect(subject).to eq('test')
end
end
describe '.with_stale_migration' do describe '.with_stale_migration' do
let_it_be(:repository) { create(:container_repository) } let_it_be(:repository) { create(:container_repository) }
let_it_be(:stale_pre_importing_old_timestamp) { create(:container_repository, :pre_importing, migration_pre_import_started_at: 10.minutes.ago) } let_it_be(:stale_pre_importing_old_timestamp) { create(:container_repository, :pre_importing, migration_pre_import_started_at: 10.minutes.ago) }
......
...@@ -26,11 +26,30 @@ RSpec.describe ContainerRegistry::Migration::GuardWorker, :aggregate_failures do ...@@ -26,11 +26,30 @@ RSpec.describe ContainerRegistry::Migration::GuardWorker, :aggregate_failures do
allow(::Gitlab).to receive(:com?).and_return(true) allow(::Gitlab).to receive(:com?).and_return(true)
end end
# Shared examples for a stale migration that must be left untouched.
# Expects the including context to define `stale_migration` and
# `ongoing_migration`, and uses `worker`, `subject` and the
# `*_migrations` helpers referenced below.
shared_examples 'not aborting any migration' do
it 'will not abort the migration' do
# One stale migration is found, none is aborted, and its id is logged as a
# long running migration.
expect(worker).to receive(:log_extra_metadata_on_done).with(:stale_migrations_count, 1)
expect(worker).to receive(:log_extra_metadata_on_done).with(:aborted_stale_migrations_count, 0)
expect(worker).to receive(:log_extra_metadata_on_done).with(:long_running_stale_migration_container_repository_ids, [stale_migration.id])
# No per-state count and no migration state may change.
expect { subject }
.to not_change(pre_importing_migrations, :count)
.and not_change(pre_import_done_migrations, :count)
.and not_change(importing_migrations, :count)
.and not_change(import_done_migrations, :count)
.and not_change(import_aborted_migrations, :count)
.and not_change { stale_migration.reload.migration_state }
.and not_change { ongoing_migration.migration_state }
end
end
context 'with no stale migrations' do context 'with no stale migrations' do
it_behaves_like 'an idempotent worker' it_behaves_like 'an idempotent worker'
it 'will not update any migration state' do it 'will not update any migration state' do
expect(worker).to receive(:log_extra_metadata_on_done).with(:stale_migrations_count, 0) expect(worker).to receive(:log_extra_metadata_on_done).with(:stale_migrations_count, 0)
expect(worker).to receive(:log_extra_metadata_on_done).with(:aborted_stale_migrations_count, 0)
expect { subject } expect { subject }
.to not_change(pre_importing_migrations, :count) .to not_change(pre_importing_migrations, :count)
.and not_change(pre_import_done_migrations, :count) .and not_change(pre_import_done_migrations, :count)
...@@ -41,10 +60,19 @@ RSpec.describe ContainerRegistry::Migration::GuardWorker, :aggregate_failures do ...@@ -41,10 +60,19 @@ RSpec.describe ContainerRegistry::Migration::GuardWorker, :aggregate_failures do
context 'with pre_importing stale migrations' do context 'with pre_importing stale migrations' do
let(:ongoing_migration) { create(:container_repository, :pre_importing) } let(:ongoing_migration) { create(:container_repository, :pre_importing) }
let(:stale_migration) { create(:container_repository, :pre_importing, migration_pre_import_started_at: 10.minutes.ago) } let(:stale_migration) { create(:container_repository, :pre_importing, migration_pre_import_started_at: 35.minutes.ago) }
let(:import_status) { 'test' }
before do
allow_next_instance_of(ContainerRegistry::GitlabApiClient) do |client|
allow(client).to receive(:import_status).and_return(import_status)
end
end
it 'will abort the migration' do it 'will abort the migration' do
expect(worker).to receive(:log_extra_metadata_on_done).with(:stale_migrations_count, 1) expect(worker).to receive(:log_extra_metadata_on_done).with(:stale_migrations_count, 1)
expect(worker).to receive(:log_extra_metadata_on_done).with(:aborted_stale_migrations_count, 1)
expect { subject } expect { subject }
.to change(pre_importing_migrations, :count).by(-1) .to change(pre_importing_migrations, :count).by(-1)
.and not_change(pre_import_done_migrations, :count) .and not_change(pre_import_done_migrations, :count)
...@@ -54,18 +82,26 @@ RSpec.describe ContainerRegistry::Migration::GuardWorker, :aggregate_failures do ...@@ -54,18 +82,26 @@ RSpec.describe ContainerRegistry::Migration::GuardWorker, :aggregate_failures do
.and change { stale_migration.reload.migration_state }.from('pre_importing').to('import_aborted') .and change { stale_migration.reload.migration_state }.from('pre_importing').to('import_aborted')
.and not_change { ongoing_migration.migration_state } .and not_change { ongoing_migration.migration_state }
end end
context 'the client returns pre_import_in_progress' do
let(:import_status) { 'pre_import_in_progress' }
it_behaves_like 'not aborting any migration'
end
end end
context 'with pre_import_done stale migrations' do context 'with pre_import_done stale migrations' do
let(:ongoing_migration) { create(:container_repository, :pre_import_done) } let(:ongoing_migration) { create(:container_repository, :pre_import_done) }
let(:stale_migration) { create(:container_repository, :pre_import_done, migration_pre_import_done_at: 10.minutes.ago) } let(:stale_migration) { create(:container_repository, :pre_import_done, migration_pre_import_done_at: 35.minutes.ago) }
before do before do
allow(::ContainerRegistry::Migration).to receive(:max_step_duration).and_return(5.minutes) allow(::ContainerRegistry::Migration).to receive(:max_step_duration).and_return(5.minutes)
expect(worker).to receive(:log_extra_metadata_on_done).with(:stale_migrations_count, 1)
end end
it 'will abort the migration' do it 'will abort the migration' do
expect(worker).to receive(:log_extra_metadata_on_done).with(:stale_migrations_count, 1)
expect(worker).to receive(:log_extra_metadata_on_done).with(:aborted_stale_migrations_count, 1)
expect { subject } expect { subject }
.to not_change(pre_importing_migrations, :count) .to not_change(pre_importing_migrations, :count)
.and change(pre_import_done_migrations, :count).by(-1) .and change(pre_import_done_migrations, :count).by(-1)
...@@ -79,14 +115,19 @@ RSpec.describe ContainerRegistry::Migration::GuardWorker, :aggregate_failures do ...@@ -79,14 +115,19 @@ RSpec.describe ContainerRegistry::Migration::GuardWorker, :aggregate_failures do
context 'with importing stale migrations' do context 'with importing stale migrations' do
let(:ongoing_migration) { create(:container_repository, :importing) } let(:ongoing_migration) { create(:container_repository, :importing) }
let(:stale_migration) { create(:container_repository, :importing, migration_import_started_at: 10.minutes.ago) } let(:stale_migration) { create(:container_repository, :importing, migration_import_started_at: 35.minutes.ago) }
let(:import_status) { 'test' }
before do before do
allow(::ContainerRegistry::Migration).to receive(:max_step_duration).and_return(5.minutes) allow_next_instance_of(ContainerRegistry::GitlabApiClient) do |client|
expect(worker).to receive(:log_extra_metadata_on_done).with(:stale_migrations_count, 1) allow(client).to receive(:import_status).and_return(import_status)
end
end end
it 'will abort the migration' do it 'will abort the migration' do
expect(worker).to receive(:log_extra_metadata_on_done).with(:stale_migrations_count, 1)
expect(worker).to receive(:log_extra_metadata_on_done).with(:aborted_stale_migrations_count, 1)
expect { subject } expect { subject }
.to not_change(pre_importing_migrations, :count) .to not_change(pre_importing_migrations, :count)
.and not_change(pre_import_done_migrations, :count) .and not_change(pre_import_done_migrations, :count)
...@@ -96,6 +137,12 @@ RSpec.describe ContainerRegistry::Migration::GuardWorker, :aggregate_failures do ...@@ -96,6 +137,12 @@ RSpec.describe ContainerRegistry::Migration::GuardWorker, :aggregate_failures do
.and change { stale_migration.reload.migration_state }.from('importing').to('import_aborted') .and change { stale_migration.reload.migration_state }.from('importing').to('import_aborted')
.and not_change { ongoing_migration.migration_state } .and not_change { ongoing_migration.migration_state }
end end
context 'the client returns import_in_progress' do
let(:import_status) { 'import_in_progress' }
it_behaves_like 'not aborting any migration'
end
end end
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment