Treat registry as SSOT to sync container registries

This feature is behind FF geo_container_registry_ssot_sync
parent 6fa0d0ed
......@@ -3,17 +3,15 @@
module Geo
class ContainerRepositoryRegistryFinder < RegistryFinder
def count_syncable
container_repositories.count
current_node_non_fdw.container_repositories.count
end
def count_synced
registries_for_container_repositories
.merge(Geo::ContainerRepositoryRegistry.synced).count
registries.merge(Geo::ContainerRepositoryRegistry.synced).count
end
def count_failed
registries_for_container_repositories
.merge(Geo::ContainerRepositoryRegistry.failed).count
registries.merge(Geo::ContainerRepositoryRegistry.failed).count
end
def count_registry
......@@ -21,7 +19,7 @@ module Geo
end
def find_registry_differences(range)
source_ids = Gitlab::Geo.current_node.container_repositories.id_in(range).pluck_primary_key
source_ids = current_node_non_fdw.container_repositories.id_in(range).pluck_primary_key
tracked_ids = Geo::ContainerRepositoryRegistry.pluck_model_ids_in_range(range)
untracked_ids = source_ids - tracked_ids
......@@ -68,26 +66,27 @@ module Geo
# Find limited amount of non replicated container repositories.
#
# You can pass a list with `except_repository_ids:` so you can exclude items you
# You can pass a list with `except_ids:` so you can exclude items you
# already scheduled but haven't finished and aren't persisted to the database yet
#
# @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_repository_ids ids that will be ignored from the query
# @param [Array<Integer>] except_ids ids that will be ignored from the query
# rubocop:disable CodeReuse/ActiveRecord
def find_unsynced(batch_size:, except_repository_ids: [])
container_repositories
def find_unsynced(batch_size:, except_ids: [])
current_node_fdw
.container_repositories
.missing_container_repository_registry
.id_not_in(except_repository_ids)
.id_not_in(except_ids)
.limit(batch_size)
end
# rubocop:enable CodeReuse/ActiveRecord
# rubocop:disable CodeReuse/ActiveRecord
def find_retryable_failed_ids(batch_size:, except_repository_ids: [])
def find_retryable_failed_ids(batch_size:, except_ids: [])
Geo::ContainerRepositoryRegistry
.failed
.retry_due
.model_id_not_in(except_repository_ids)
.model_id_not_in(except_ids)
.limit(batch_size)
.pluck_container_repository_key
end
......@@ -95,13 +94,12 @@ module Geo
private
def container_repositories
current_node.container_repositories
end
def registries_for_container_repositories
container_repositories
.inner_join_container_repository_registry
def registries
if Geo::ContainerRepositoryRegistry.registry_consistency_worker_enabled?
Geo::ContainerRepositoryRegistry.all
else
current_node_fdw.container_repositories.inner_join_container_repository_registry
end
end
end
end
......@@ -37,10 +37,28 @@ class Geo::ContainerRepositoryRegistry < Geo::BaseRegistry
end
end
def self.finder_class
::Geo::ContainerRepositoryRegistryFinder
end
def self.find_registry_differences(range)
finder_class.new(current_node_id: Gitlab::Geo.current_node.id).find_registry_differences(range)
end
def self.delete_for_model_ids(container_repository_ids)
where(container_repository_id: container_repository_ids).delete_all
container_repository_ids
end
def self.pluck_container_repository_key
where(nil).pluck(:container_repository_id)
end
def self.registry_consistency_worker_enabled?
Feature.enabled?(:geo_container_registry_ssot_sync, default_enabled: true)
end
def self.replication_enabled?
Gitlab.config.geo.registry_replication.enabled
end
......
......@@ -28,35 +28,49 @@ module Geo
{ id: repository_id, job_id: job_id } if job_id
end
def scheduled_repository_ids
scheduled_jobs.map { |data| data[:id] }
end
# Pools for new resources to be transferred
#
# @return [Array] resources to be transferred
def load_pending_resources
resources = find_unsynced_repositories(batch_size: db_retrieve_batch_size)
resources = find_container_repository_ids_not_synced(batch_size: db_retrieve_batch_size)
remaining_capacity = db_retrieve_batch_size - resources.size
if remaining_capacity.zero?
resources
else
resources + find_retryable_failed_repositories(batch_size: remaining_capacity)
resources + find_retryable_container_registry_ids(batch_size: remaining_capacity)
end
end
def find_unsynced_repositories(batch_size:)
Geo::ContainerRepositoryRegistryFinder
.new(current_node_id: current_node.id)
.find_unsynced(batch_size: batch_size, except_repository_ids: scheduled_repository_ids)
.pluck_primary_key
def find_container_repository_ids_not_synced(batch_size:)
if Geo::ContainerRepositoryRegistry.registry_consistency_worker_enabled?
registry_finder
.find_never_synced_registries(batch_size: batch_size, except_ids: scheduled_repository_ids)
.pluck_model_foreign_key
else
registry_finder
.find_unsynced(batch_size: batch_size, except_ids: scheduled_repository_ids)
.pluck_primary_key
end
end
def find_retryable_failed_repositories(batch_size:)
Geo::ContainerRepositoryRegistryFinder
.new(current_node_id: current_node.id)
.find_retryable_failed_ids(batch_size: batch_size, except_repository_ids: scheduled_repository_ids)
def find_retryable_container_registry_ids(batch_size:)
if Geo::ContainerRepositoryRegistry.registry_consistency_worker_enabled?
registry_finder
.find_retryable_dirty_registries(batch_size: batch_size, except_ids: scheduled_repository_ids)
.pluck_model_foreign_key
else
registry_finder
.find_retryable_failed_ids(batch_size: batch_size, except_ids: scheduled_repository_ids)
end
end
def scheduled_repository_ids
scheduled_jobs.map { |data| data[:id] }
def registry_finder
@registry_finder ||= Geo::ContainerRepositoryRegistryFinder.new(current_node_id: current_node.id)
end
end
end
......@@ -16,6 +16,7 @@ module Geo
feature_category :geo_replication
REGISTRY_CLASSES = [
Geo::ContainerRepositoryRegistry,
Geo::DesignRegistry,
Geo::JobArtifactRegistry,
Geo::LfsObjectRegistry,
......
# frozen_string_literal: true
FactoryBot.define do
factory :container_repository_registry, class: 'Geo::ContainerRepositoryRegistry' do
factory :geo_container_repository_registry, aliases: [:container_repository_registry], class: 'Geo::ContainerRepositoryRegistry' do
container_repository
last_sync_failure { nil }
last_synced_at { nil }
......
......@@ -4,12 +4,11 @@ require 'spec_helper'
RSpec.describe Geo::ContainerRepositoryRegistryFinder, :geo do
include ::EE::GeoHelpers
context 'count all the things', :geo_fdw do
context 'when geo_container_registry_ssot_sync is disabled', :geo_fdw do
let!(:secondary) { create(:geo_node) }
let!(:container_repository) { create(:container_repository) }
let!(:failed_registry) { create(:container_repository_registry, :sync_failed) }
let!(:synced_registry) { create(:container_repository_registry, :synced) }
let(:synced_group) { create(:group) }
let(:unsynced_group) { create(:group) }
let(:synced_project) { create(:project, group: synced_group) }
......@@ -19,57 +18,32 @@ RSpec.describe Geo::ContainerRepositoryRegistryFinder, :geo do
before do
stub_current_geo_node(secondary)
stub_feature_flags(geo_container_registry_ssot_sync: false)
end
describe '#count_syncable' do
it 'returns number of container repositories' do
result = subject.count_syncable
expect(result).to eq(3)
expect(subject.count_syncable).to eq(3)
end
end
describe '#count_synced' do
it 'returns only synced registry' do
result = subject.count_synced
expect(result).to eq(1)
expect(subject.count_synced).to eq(1)
end
end
describe '#count_failed' do
it 'returns only failed registry' do
result = subject.count_failed
expect(result).to eq(1)
expect(subject.count_failed).to eq(1)
end
end
describe '#count_registry' do
it 'returns number of all registries' do
result = subject.count_registry
expect(result).to eq(2)
expect(subject.count_registry).to eq(2)
end
end
end
context 'find all the things', :geo_fdw do
let!(:secondary) { create(:geo_node) }
let!(:container_repository) { create(:container_repository) }
let!(:failed_registry) { create(:container_repository_registry, :sync_failed) }
let!(:synced_registry) { create(:container_repository_registry, :synced) }
let(:synced_group) { create(:group) }
let(:unsynced_group) { create(:group) }
let(:synced_project) { create(:project, group: synced_group) }
let(:unsynced_project) { create(:project, :broken_storage, group: unsynced_group) }
subject { described_class.new(current_node_id: secondary.id) }
before do
stub_current_geo_node(secondary)
end
describe '#find_unsynced' do
it 'returns repositories without an entry in the tracking database' do
......@@ -80,7 +54,7 @@ RSpec.describe Geo::ContainerRepositoryRegistryFinder, :geo do
it 'returns repositories without an entry in the tracking database, excluding exception list' do
except_repository = create(:container_repository)
repositories = subject.find_unsynced(batch_size: 10, except_repository_ids: [except_repository.id])
repositories = subject.find_unsynced(batch_size: 10, except_ids: [except_repository.id])
expect(repositories).to match_ids(container_repository)
end
......@@ -92,7 +66,7 @@ RSpec.describe Geo::ContainerRepositoryRegistryFinder, :geo do
except_repository = create(:container_repository, project: synced_project)
repository = create(:container_repository, project: synced_project, name: 'second')
repositories = subject.find_unsynced(batch_size: 10, except_repository_ids: [except_repository.id])
repositories = subject.find_unsynced(batch_size: 10, except_ids: [except_repository.id])
expect(repositories).to match_ids(repository)
end
......@@ -122,14 +96,14 @@ RSpec.describe Geo::ContainerRepositoryRegistryFinder, :geo do
except_repository = create(:container_repository)
create(:container_repository_registry, :sync_failed, container_repository: except_repository)
result = subject.find_retryable_failed_ids(batch_size: 10, except_repository_ids: [except_repository.id])
result = subject.find_retryable_failed_ids(batch_size: 10, except_ids: [except_repository.id])
expect(result).to eq([failed_registry.container_repository_id])
end
end
end
context 'non-fdw queries' do
context 'when geo_container_registry_ssot_sync is enabled' do
let_it_be(:secondary) { create(:geo_node) }
let_it_be(:synced_group) { create(:group) }
let_it_be(:nested_group) { create(:group, parent: synced_group) }
......@@ -147,6 +121,40 @@ RSpec.describe Geo::ContainerRepositoryRegistryFinder, :geo do
before do
stub_current_geo_node(secondary)
stub_feature_flags(geo_container_registry_ssot_sync: true)
end
describe '#count_syncable' do
it 'returns number of container repositories' do
expect(subject.count_syncable).to eq(6)
end
end
describe '#count_synced' do
it 'returns only synced registry' do
create(:container_repository_registry, :synced, container_repository_id: container_repository_1.id)
create(:container_repository_registry, :sync_failed, container_repository_id: container_repository_3.id)
expect(subject.count_synced).to eq(1)
end
end
describe '#count_failed' do
it 'returns only failed registry' do
create(:container_repository_registry, :synced, container_repository_id: container_repository_1.id)
create(:container_repository_registry, :sync_failed, container_repository_id: container_repository_3.id)
expect(subject.count_failed).to eq(1)
end
end
describe '#count_registry' do
it 'returns number of all registries' do
create(:container_repository_registry, :synced, container_repository_id: container_repository_1.id)
create(:container_repository_registry, :sync_failed, container_repository_id: container_repository_3.id)
expect(subject.count_registry).to eq(2)
end
end
describe '#find_registry_differences' do
......
......@@ -2,20 +2,21 @@
require 'spec_helper'
RSpec.describe Geo::ContainerRepositorySyncDispatchWorker, :geo, :geo_fdw, :use_sql_query_cache_for_tracking_db do
RSpec.describe Geo::ContainerRepositorySyncDispatchWorker, :geo, :use_sql_query_cache_for_tracking_db do
include ::EE::GeoHelpers
include ExclusiveLeaseHelpers
let(:primary) { create(:geo_node, :primary) }
let(:primary) { create(:geo_node, :primary) }
let(:secondary) { create(:geo_node) }
before do
stub_current_geo_node(secondary)
stub_exclusive_lease(renew: true)
stub_registry_replication_config(enabled: true)
allow_next_instance_of(described_class) do |instance|
allow(instance).to receive(:over_time?).and_return(false)
end
stub_registry_replication_config(enabled: true)
end
it 'does not schedule anything when tracking database is not configured' do
......@@ -33,30 +34,39 @@ RSpec.describe Geo::ContainerRepositorySyncDispatchWorker, :geo, :geo_fdw, :use_
end
it 'does not schedule anything when node is disabled' do
secondary.update!(enabled: false)
create(:container_repository)
secondary.enabled = false
secondary.save
expect(Geo::ContainerRepositorySyncWorker).not_to receive(:perform_async)
subject.perform
end
context 'Sync condition' do
let(:container_repository) { create(:container_repository) }
it 'does not schedule anything when registry replication is disabled' do
stub_registry_replication_config(enabled: false)
create(:container_repository)
expect(Geo::ContainerRepositorySyncWorker).not_to receive(:perform_async)
end
context 'when geo_container_registry_ssot_sync is disabled', :geo_fdw do
before do
stub_feature_flags(geo_container_registry_ssot_sync: false)
end
it 'performs Geo::ContainerRepositorySyncWorker' do
container_repository = create(:container_repository)
expect(Geo::ContainerRepositorySyncWorker).to receive(:perform_async).with(container_repository.id)
subject.perform
end
it 'performs Geo::ContainerRepositorySyncWorker for failed syncs' do
container_repository_registry = create(:container_repository_registry, :sync_failed)
registry = create(:container_repository_registry, :sync_failed)
expect(Geo::ContainerRepositorySyncWorker).to receive(:perform_async)
.with(container_repository_registry.container_repository_id).once.and_return(spy)
.with(registry.container_repository_id).once.and_return(spy)
subject.perform
end
......@@ -70,15 +80,14 @@ RSpec.describe Geo::ContainerRepositorySyncDispatchWorker, :geo, :geo_fdw, :use_
end
context 'with a failed sync' do
let(:failed_registry) { create(:container_repository_registry, :sync_failed) }
it 'does not stall backfill' do
unsynced = create(:container_repository)
failed_registry = create(:container_repository_registry, :sync_failed)
unsynced_container_repository = create(:container_repository)
stub_const('Geo::Scheduler::SchedulerWorker::DB_RETRIEVE_BATCH_SIZE', 1)
expect(Geo::ContainerRepositorySyncWorker).not_to receive(:perform_async).with(failed_registry.container_repository_id)
expect(Geo::ContainerRepositorySyncWorker).to receive(:perform_async).with(unsynced.id)
expect(Geo::ContainerRepositorySyncWorker).to receive(:perform_async).with(unsynced_container_repository.id)
subject.perform
end
......@@ -101,30 +110,88 @@ RSpec.describe Geo::ContainerRepositorySyncDispatchWorker, :geo, :geo_fdw, :use_
subject.perform
end
end
end
context 'when node has namespace restrictions', :request_store do
let(:synced_group) { create(:group) }
let(:project_in_synced_group) { create(:project, group: synced_group) }
let(:unsynced_project) { create(:project) }
context 'when node has namespace restrictions', :request_store do
let(:synced_group) { create(:group) }
let(:project_in_synced_group) { create(:project, group: synced_group) }
let(:unsynced_project) { create(:project) }
before do
secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
end
it 'does not perform Geo::ContainerRepositorySyncWorker for repositories that does not belong to selected namespaces' do
container_repository = create(:container_repository, project: project_in_synced_group)
create(:container_repository, project: unsynced_project)
expect(Geo::ContainerRepositorySyncWorker).to receive(:perform_async)
.with(container_repository.id).once.and_return(spy)
subject.perform
end
end
end
context 'when geo_container_registry_ssot_sync is enabled' do
before do
secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
stub_feature_flags(geo_container_registry_ssot_sync: true)
end
it 'performs Geo::ContainerRepositorySyncWorker' do
registry = create(:container_repository_registry)
expect(Geo::ContainerRepositorySyncWorker).to receive(:perform_async).with(registry.container_repository_id)
allow(ProjectCacheWorker).to receive(:perform_async).and_return(true)
allow(::Gitlab::Geo).to receive(:current_node).and_call_original
Rails.cache.write(:current_node, secondary.to_json)
allow(::GeoNode).to receive(:current_node).and_return(secondary)
subject.perform
end
it 'does not perform Geo::ContainerRepositorySyncWorker for repositories that does not belong to selected namespaces ' do
container_repository = create(:container_repository, project: project_in_synced_group)
create(:container_repository, project: unsynced_project)
it 'performs Geo::ContainerRepositorySyncWorker for failed syncs' do
registry = create(:container_repository_registry, :sync_failed)
expect(Geo::ContainerRepositorySyncWorker).to receive(:perform_async)
.with(container_repository.id).once.and_return(spy)
.with(registry.container_repository_id).once.and_return(spy)
subject.perform
end
it 'does not perform Geo::ContainerRepositorySyncWorker for synced repositories' do
create(:container_repository_registry, :synced)
expect(Geo::ContainerRepositorySyncWorker).not_to receive(:perform_async)
subject.perform
end
context 'with a failed sync' do
it 'does not stall backfill' do
failed_registry = create(:container_repository_registry, :sync_failed)
unsynced_registry = create(:container_repository_registry)
stub_const('Geo::Scheduler::SchedulerWorker::DB_RETRIEVE_BATCH_SIZE', 1)
expect(Geo::ContainerRepositorySyncWorker).not_to receive(:perform_async).with(failed_registry.container_repository_id)
expect(Geo::ContainerRepositorySyncWorker).to receive(:perform_async).with(unsynced_registry.container_repository_id)
subject.perform
end
it 'does not retry failed files when retry_at is tomorrow' do
failed_registry = create(:container_repository_registry, :sync_failed, retry_at: Date.tomorrow)
expect(Geo::ContainerRepositorySyncWorker)
.not_to receive(:perform_async).with( failed_registry.container_repository_id)
subject.perform
end
it 'retries failed files when retry_at is in the past' do
failed_registry = create(:container_repository_registry, :sync_failed, retry_at: Date.yesterday)
expect(Geo::ContainerRepositorySyncWorker)
.to receive(:perform_async).with(failed_registry.container_repository_id)
subject.perform
end
end
end
end
......@@ -2,20 +2,20 @@
require 'spec_helper'
RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo do
include EE::GeoHelpers
include ExclusiveLeaseHelpers
let(:primary) { create(:geo_node, :primary) }
let(:secondary) { create(:geo_node) }
let_it_be(:primary) { create(:geo_node, :primary) }
let_it_be(:secondary) { create(:geo_node) }
let(:worker_class) { described_class }
let(:batch_size) { described_class::BATCH_SIZE }
before do
stub_current_geo_node(secondary)
end
let(:worker_class) { described_class }
let(:batch_size) { described_class::BATCH_SIZE }
it_behaves_like 'reenqueuer'
it 'uses a cronjob queue' do
......@@ -84,6 +84,7 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
upload = create(:upload)
package_file = create(:conan_package_file, :conan_package)
vulnerability_export = create(:vulnerability_export, :with_csv_file)
container_repository = create(:container_repository, project: project)
expect(Geo::LfsObjectRegistry.where(lfs_object_id: lfs_object.id).count).to eq(0)
expect(Geo::JobArtifactRegistry.where(artifact_id: job_artifact.id).count).to eq(0)
......@@ -92,6 +93,7 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
expect(Geo::UploadRegistry.where(file_id: upload.id).count).to eq(0)
expect(Geo::PackageFileRegistry.where(package_file_id: package_file.id).count).to eq(0)
expect(Geo::VulnerabilityExportRegistry.where(vulnerability_export_id: vulnerability_export.id).count).to eq(0)
expect(Geo::ContainerRepositoryRegistry.where(container_repository_id: container_repository.id).count).to eq(0)
subject.perform
......@@ -102,6 +104,7 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
expect(Geo::UploadRegistry.where(file_id: upload.id).count).to eq(1)
expect(Geo::PackageFileRegistry.where(package_file_id: package_file.id).count).to eq(1)
expect(Geo::VulnerabilityExportRegistry.where(vulnerability_export_id: vulnerability_export.id).count).to eq(1)
expect(Geo::ContainerRepositoryRegistry.where(container_repository_id: container_repository.id).count).to eq(1)
end
context 'when geo_design_registry_ssot_sync is disabled' do
......@@ -120,6 +123,7 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::UploadRegistry, batch_size: batch_size).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::ProjectRegistry, batch_size: batch_size).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::VulnerabilityExportRegistry, batch_size: batch_size).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::ContainerRepositoryRegistry, batch_size: batch_size).and_call_original
expect(Geo::RegistryConsistencyService).not_to receive(:new).with(Geo::DesignRegistry, batch_size: batch_size)
......@@ -127,6 +131,30 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
end
end
context 'when geo_container_registry_ssot_sync is disabled' do
before do
stub_feature_flags(geo_container_registry_ssot_sync: false)
end
it 'returns false' do
expect(subject.perform).to be_falsey
end
it 'does not execute RegistryConsistencyService for container repositories' do
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::JobArtifactRegistry, batch_size: batch_size).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::LfsObjectRegistry, batch_size: batch_size).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::PackageFileRegistry, batch_size: batch_size).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::UploadRegistry, batch_size: batch_size).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::ProjectRegistry, batch_size: batch_size).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::DesignRegistry, batch_size: batch_size).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::VulnerabilityExportRegistry, batch_size: batch_size).and_call_original
expect(Geo::RegistryConsistencyService).not_to receive(:new).with(Geo::ContainerRepositoryRegistry, batch_size: batch_size)
subject.perform
end
end
context 'when the current Geo node is disabled or primary' do
before do
stub_primary_node
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment