Commit 957212b5 authored by Michael Kozono's avatar Michael Kozono

Merge branch '212720-geo-make-registry-tables-as-ssot-for-designs' into 'master'

Geo - Make Registry tables as SSOT for Designs

Closes #212720

See merge request gitlab-org/gitlab!36587
parents 97f71074 757137a3
......@@ -3,30 +3,77 @@
module Geo
class DesignRegistryFinder < RegistryFinder
def count_syncable
GeoNode.find(current_node_id).projects.with_designs.count
current_node(fdw: false).designs.count
end
def count_synced
registries
.merge(Geo::DesignRegistry.synced).count
registries.merge(Geo::DesignRegistry.synced).count
end
def count_failed
registries
.merge(Geo::DesignRegistry.failed).count
registries.merge(Geo::DesignRegistry.failed).count
end
def count_registry
registries.count
end
def find_registry_differences(range)
source_ids = Gitlab::Geo.current_node.designs.id_in(range).pluck_primary_key
tracked_ids = Geo::DesignRegistry.pluck_model_ids_in_range(range)
untracked_ids = source_ids - tracked_ids
unused_tracked_ids = tracked_ids - source_ids
[untracked_ids, unused_tracked_ids]
end
# Returns Geo::DesignRegistry records that have never been synced.
#
# Does not care about selective sync, because it considers the Registry
# table to be the single source of truth. The contract is that other
# processes need to ensure that the table only contains records that should
# be synced.
#
# Any registries that have ever been synced that currently need to be
# resynced will be handled by other find methods (like
# #find_retryable_dirty_registries)
#
# You can pass a list with `except_ids:` so you can exclude items you
# already scheduled but haven't finished and aren't persisted to the database yet
#
# @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_ids ids that will be ignored from the query
# rubocop:disable CodeReuse/ActiveRecord
def find_never_synced_registries(batch_size:, except_ids: [])
Geo::DesignRegistry
.never_synced
.model_id_not_in(except_ids)
.limit(batch_size)
end
# rubocop:enable CodeReuse/ActiveRecord
# rubocop:disable CodeReuse/ActiveRecord
def find_retryable_dirty_registries(batch_size:, except_ids: [])
Geo::DesignRegistry
.updated_recently
.model_id_not_in(except_ids)
.order(Gitlab::Database.nulls_first_order(:last_synced_at))
.limit(batch_size)
end
# rubocop:enable CodeReuse/ActiveRecord
private
def registries
current_node
.projects
.with_designs
.inner_join_design_registry
if Geo::DesignRegistry.registry_consistency_worker_enabled?
Geo::DesignRegistry.all
else
current_node(fdw: true)
.projects
.with_designs
.inner_join_design_registry
end
end
end
end
......@@ -140,7 +140,7 @@ module EE
scope :aimed_for_deletion, -> (date) { where('marked_for_deletion_at <= ?', date).without_deleted }
scope :with_repos_templates, -> { where(namespace_id: ::Gitlab::CurrentSettings.current_application_settings.custom_project_templates_group_id) }
scope :with_groups_level_repos_templates, -> { joins("INNER JOIN namespaces ON projects.namespace_id = namespaces.custom_project_templates_group_id") }
scope :with_designs, -> { where(id: ::DesignManagement::Design.select(:project_id)) }
scope :with_designs, -> { where(id: ::DesignManagement::Design.select(:project_id).distinct) }
scope :with_deleting_user, -> { includes(:deleting_user) }
scope :with_compliance_framework_settings, -> { preload(:compliance_framework_setting) }
scope :has_vulnerabilities, -> { joins(:vulnerabilities).group(:id) }
......
......@@ -10,6 +10,7 @@ class Geo::DesignRegistry < Geo::BaseRegistry
belongs_to :project
scope :never_synced, -> { with_state(:pending).where(last_synced_at: nil) }
scope :pending, -> { with_state(:pending) }
scope :failed, -> { with_state(:failed) }
scope :synced, -> { with_state(:synced) }
......@@ -39,6 +40,28 @@ class Geo::DesignRegistry < Geo::BaseRegistry
end
end
def self.registry_consistency_worker_enabled?
Feature.enabled?(:geo_design_registry_ssot_sync)
end
def self.delete_for_model_ids(project_ids)
# We only need to delete the registry entries here. The design
# repository deletion should happen when a project is destroyed.
#
# See: https://gitlab.com/gitlab-org/gitlab/-/issues/13429
where(project_id: project_ids).delete_all
project_ids
end
def self.finder_class
::Geo::DesignRegistryFinder
end
def self.find_registry_differences(range)
finder_class.new(current_node_id: Gitlab::Geo.current_node.id).find_registry_differences(range)
end
# Search for a list of projects associated with registries,
# based on the query given in `query`.
#
......
......@@ -262,6 +262,10 @@ class GeoNode < ApplicationRecord
ContainerRepository.project_id_in(projects)
end
def designs
projects.with_designs
end
def lfs_objects
return LfsObject.all unless selective_sync?
......
......@@ -10,16 +10,50 @@ module Geo
{ project_id: project_id, job_id: job_id } if job_id
end
# rubocop: disable CodeReuse/ActiveRecord
def find_project_ids_not_synced(except_ids:, batch_size:)
Geo::DesignUnsyncedFinder
.new(scheduled_project_ids: except_ids, shard_name: shard_name, batch_size: batch_size)
.execute
if Geo::DesignRegistry.registry_consistency_worker_enabled?
project_ids =
find_never_synced_project_ids(batch_size: batch_size, except_ids: except_ids)
find_project_ids_within_shard(project_ids, direction: :desc)
else
Geo::DesignUnsyncedFinder
.new(scheduled_project_ids: except_ids, shard_name: shard_name, batch_size: batch_size)
.execute
end
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def find_project_ids_updated_recently(except_ids:, batch_size:)
Geo::DesignUpdatedRecentlyFinder
.new(scheduled_project_ids: except_ids, shard_name: shard_name, batch_size: batch_size)
.execute
if Geo::DesignRegistry.registry_consistency_worker_enabled?
project_ids =
find_retryable_dirty_project_ids(batch_size: batch_size, except_ids: except_ids)
find_project_ids_within_shard(project_ids, direction: :asc)
else
Geo::DesignUpdatedRecentlyFinder
.new(scheduled_project_ids: except_ids, shard_name: shard_name, batch_size: batch_size)
.execute
end
end
# rubocop: enable CodeReuse/ActiveRecord
def find_never_synced_project_ids(batch_size:, except_ids:)
registry_finder
.find_never_synced_registries(batch_size: batch_size, except_ids: except_ids)
.pluck_model_foreign_key
end
def find_retryable_dirty_project_ids(batch_size:, except_ids:)
registry_finder
.find_retryable_dirty_registries(batch_size: batch_size, except_ids: except_ids)
.pluck_model_foreign_key
end
def registry_finder
@registry_finder ||= Geo::DesignRegistryFinder.new
end
end
end
......@@ -16,6 +16,7 @@ module Geo
feature_category :geo_replication
REGISTRY_CLASSES = [
Geo::DesignRegistry,
Geo::JobArtifactRegistry,
Geo::LfsObjectRegistry,
Geo::PackageFileRegistry,
......
......@@ -7,8 +7,9 @@ FactoryBot.define do
last_synced_at { nil }
state { :pending }
after(:create) do |registry, evaluator|
create(:design, project: registry.project)
after(:create) do |registry|
issue = create(:issue, project: registry.project)
create(:design, project: registry.project, issue: issue)
end
trait :synced do
......
......@@ -8,8 +8,6 @@ RSpec.describe Geo::DesignRegistry, :geo do
let(:invalid_items_for_bulk_insertion) { [] } # class does not have any validations defined
end
let!(:design_registry) { create(:geo_design_registry) }
describe 'relationships' do
it { is_expected.to belong_to(:project) }
end
......@@ -19,6 +17,7 @@ RSpec.describe Geo::DesignRegistry, :geo do
end
describe '#search', :geo_fdw do
let!(:design_registry) { create(:geo_design_registry) }
let!(:failed_registry) { create(:geo_design_registry, :sync_failed) }
let!(:synced_registry) { create(:geo_design_registry, :synced) }
......@@ -94,6 +93,8 @@ RSpec.describe Geo::DesignRegistry, :geo do
end
describe '#should_be_redownloaded?' do
let_it_be(:design_registry) { create(:geo_design_registry) }
context 'when force_to_redownload is false' do
it 'returns false' do
expect(design_registry.should_be_redownloaded?).to be false
......
......@@ -11,19 +11,18 @@ RSpec.describe Geo::RegistryConsistencyService, :geo, :use_clean_rails_memory_st
stub_current_geo_node(secondary)
end
def model_class_factory_name(model_class)
if model_class == ::Packages::PackageFile
:package_file_with_file
else
model_class.underscore.tr('/', '_').to_sym
end
def model_class_factory_name(registry_class)
return :project_with_design if registry_class == ::Geo::DesignRegistry
return :package_file_with_file if registry_class == ::Geo::PackageFileRegistry
registry_class::MODEL_CLASS.underscore.tr('/', '_').to_sym
end
shared_examples 'registry consistency service' do |klass|
let(:registry_class) { klass }
let(:registry_class_factory) { registry_factory_name(registry_class) }
let(:model_class) { registry_class::MODEL_CLASS }
let(:model_class_factory) { model_class_factory_name(model_class) }
let(:model_class_factory) { model_class_factory_name(registry_class) }
let(:model_foreign_key) { registry_class::MODEL_FOREIGN_KEY }
let(:batch_size) { 2 }
......
......@@ -17,7 +17,6 @@ RSpec.describe Geo::DesignRepositoryShardSyncWorker, :geo, :geo_fdw, :clean_gitl
describe '#perform' do
let(:restricted_group) { create(:group) }
let(:unsynced_project_in_restricted_group) { create(:project, group: restricted_group) }
let(:unsynced_project) { create(:project) }
......@@ -30,21 +29,6 @@ RSpec.describe Geo::DesignRepositoryShardSyncWorker, :geo, :geo_fdw, :clean_gitl
create(:design, project: unsynced_project)
end
it 'performs Geo::DesignRepositorySyncWorker for each project' do
expect(Geo::DesignRepositorySyncWorker).to receive(:perform_async).twice.and_return(spy)
subject.perform(shard_name)
end
it 'performs Geo::DesignRepositorySyncWorker for designs where last attempt to sync failed' do
create(:geo_design_registry, :sync_failed, project: unsynced_project_in_restricted_group)
create(:geo_design_registry, :synced, project: unsynced_project)
expect(Geo::DesignRepositorySyncWorker).to receive(:perform_async).once.and_return(spy)
subject.perform(shard_name)
end
it 'does not perform Geo::DesignRepositorySyncWorker when shard becomes unhealthy' do
Gitlab::ShardHealthCache.update([])
......@@ -53,28 +37,6 @@ RSpec.describe Geo::DesignRepositoryShardSyncWorker, :geo, :geo_fdw, :clean_gitl
subject.perform(shard_name)
end
it 'performs Geo::DesignRepositorySyncWorker for designs updated recently' do
create(:geo_design_registry, project: unsynced_project_in_restricted_group)
create(:geo_design_registry, :synced, project: unsynced_project)
create(:geo_design_registry)
expect(Geo::DesignRepositorySyncWorker).to receive(:perform_async).twice.and_return(spy)
subject.perform(shard_name)
end
it 'does not schedule a job twice for the same project' do
scheduled_jobs = [
{ job_id: 1, project_id: unsynced_project.id },
{ job_id: 2, project_id: unsynced_project_in_restricted_group.id }
]
is_expected.to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(:once)
is_expected.not_to receive(:schedule_job)
Sidekiq::Testing.inline! { subject.perform(shard_name) }
end
it 'does not perform Geo::DesignRepositorySyncWorker when no geo database is configured' do
allow(Gitlab::Geo).to receive(:geo_database_configured?) { false }
......@@ -103,46 +65,152 @@ RSpec.describe Geo::DesignRepositoryShardSyncWorker, :geo, :geo_fdw, :clean_gitl
subject.perform(shard_name)
end
context 'multiple shards' do
it 'uses two loops to schedule jobs', :sidekiq_might_not_need_inline do
expect(subject).to receive(:schedule_jobs).twice.and_call_original
context 'when geo_design_registry_ssot_sync is disabled' do
before do
stub_feature_flags(geo_design_registry_ssot_sync: false)
end
it 'performs Geo::DesignRepositorySyncWorker for each project' do
expect(Geo::DesignRepositorySyncWorker).to receive(:perform_async).twice.and_return(spy)
subject.perform(shard_name)
end
it 'performs Geo::DesignRepositorySyncWorker for designs where last attempt to sync failed' do
create(:geo_design_registry, :sync_failed, project: unsynced_project_in_restricted_group)
create(:geo_design_registry, :synced, project: unsynced_project)
Gitlab::ShardHealthCache.update([shard_name, 'shard2', 'shard3', 'shard4', 'shard5'])
secondary.update!(repos_max_capacity: 5)
expect(Geo::DesignRepositorySyncWorker).to receive(:perform_async).once.and_return(spy)
subject.perform(shard_name)
end
it 'performs Geo::DesignRepositorySyncWorker for designs updated recently' do
create(:geo_design_registry, project: unsynced_project_in_restricted_group)
create(:geo_design_registry, :synced, project: unsynced_project)
create(:geo_design_registry)
expect(Geo::DesignRepositorySyncWorker).to receive(:perform_async).twice.and_return(spy)
subject.perform(shard_name)
end
it 'does not schedule a job twice for the same project' do
scheduled_jobs = [
{ job_id: 1, project_id: unsynced_project.id },
{ job_id: 2, project_id: unsynced_project_in_restricted_group.id }
]
is_expected.to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(:once)
is_expected.not_to receive(:schedule_job)
Sidekiq::Testing.inline! { subject.perform(shard_name) }
end
context 'multiple shards' do
it 'uses two loops to schedule jobs', :sidekiq_might_not_need_inline do
expect(subject).to receive(:schedule_jobs).twice.and_call_original
Gitlab::ShardHealthCache.update([shard_name, 'shard2', 'shard3', 'shard4', 'shard5'])
secondary.update!(repos_max_capacity: 5)
subject.perform(shard_name)
end
end
context 'when node has namespace restrictions', :request_store do
before do
secondary.update!(selective_sync_type: 'namespaces', namespaces: [restricted_group])
allow(::Gitlab::Geo).to receive(:current_node).and_call_original
Rails.cache.write(:current_node, secondary.to_json)
allow(::GeoNode).to receive(:current_node).and_return(secondary)
end
it 'does not perform Geo::DesignRepositorySyncWorker for projects that do not belong to selected namespaces to replicate' do
expect(Geo::DesignRepositorySyncWorker).to receive(:perform_async)
.with(unsynced_project_in_restricted_group.id)
.once
.and_return(spy)
subject.perform(shard_name)
end
it 'does not perform Geo::DesignRepositorySyncWorker for synced projects updated recently that do not belong to selected namespaces to replicate' do
create(:geo_design_registry, project: unsynced_project_in_restricted_group)
create(:geo_design_registry, project: unsynced_project)
expect(Geo::DesignRepositorySyncWorker).to receive(:perform_async)
.with(unsynced_project_in_restricted_group.id)
.once
.and_return(spy)
subject.perform(shard_name)
end
end
end
context 'when node has namespace restrictions', :request_store do
context 'when geo_design_registry_ssot_sync is enabled' do
before do
secondary.update!(selective_sync_type: 'namespaces', namespaces: [restricted_group])
stub_feature_flags(geo_design_registry_ssot_sync: true)
end
it 'performs Geo::DesignRepositorySyncWorker for each registry' do
create(:geo_design_registry, project: unsynced_project_in_restricted_group)
create(:geo_design_registry, project: unsynced_project)
expect(Geo::DesignRepositorySyncWorker).to receive(:perform_async).twice.and_return(spy)
allow(::Gitlab::Geo).to receive(:current_node).and_call_original
Rails.cache.write(:current_node, secondary.to_json)
allow(::GeoNode).to receive(:current_node).and_return(secondary)
subject.perform(shard_name)
end
it 'does not perform Geo::DesignRepositorySyncWorker for projects that do not belong to selected namespaces to replicate' do
expect(Geo::DesignRepositorySyncWorker).to receive(:perform_async)
.with(unsynced_project_in_restricted_group.id)
.once
.and_return(spy)
it 'performs Geo::DesignRepositorySyncWorker for designs where last attempt to sync failed' do
create(:geo_design_registry, :sync_failed, project: unsynced_project_in_restricted_group)
create(:geo_design_registry, :synced, project: unsynced_project)
expect(Geo::DesignRepositorySyncWorker).to receive(:perform_async).once.and_return(spy)
subject.perform(shard_name)
end
it 'does not perform Geo::DesignRepositorySyncWorker for synced projects updated recently that do not belong to selected namespaces to replicate' do
it 'performs Geo::DesignRepositorySyncWorker for designs updated recently' do
create(:geo_design_registry, project: unsynced_project_in_restricted_group)
create(:geo_design_registry, project: unsynced_project)
create(:geo_design_registry, :synced, project: unsynced_project)
create(:geo_design_registry)
expect(Geo::DesignRepositorySyncWorker).to receive(:perform_async)
.with(unsynced_project_in_restricted_group.id)
.once
.and_return(spy)
expect(Geo::DesignRepositorySyncWorker).to receive(:perform_async).twice.and_return(spy)
subject.perform(shard_name)
end
it 'does not schedule a job twice for the same project' do
create(:geo_design_registry, project: unsynced_project_in_restricted_group)
create(:geo_design_registry, project: unsynced_project)
scheduled_jobs = [
{ job_id: 1, project_id: unsynced_project.id },
{ job_id: 2, project_id: unsynced_project_in_restricted_group.id }
]
is_expected.to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(:once)
is_expected.not_to receive(:schedule_job)
Sidekiq::Testing.inline! { subject.perform(shard_name) }
end
context 'multiple shards' do
it 'uses two loops to schedule jobs', :sidekiq_might_not_need_inline do
expect(subject).to receive(:schedule_jobs).twice.and_call_original
Gitlab::ShardHealthCache.update([shard_name, 'shard2', 'shard3', 'shard4', 'shard5'])
secondary.update!(repos_max_capacity: 5)
create(:geo_design_registry, project: unsynced_project_in_restricted_group)
create(:geo_design_registry, project: unsynced_project)
subject.perform(shard_name)
end
end
end
end
end
......@@ -79,12 +79,14 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
job_artifact = create(:ci_job_artifact)
lfs_object = create(:lfs_object)
project = create(:project)
create(:design, project: project)
upload = create(:upload)
package_file = create(:conan_package_file, :conan_package)
expect(Geo::LfsObjectRegistry.where(lfs_object_id: lfs_object.id).count).to eq(0)
expect(Geo::JobArtifactRegistry.where(artifact_id: job_artifact.id).count).to eq(0)
expect(Geo::ProjectRegistry.where(project_id: project.id).count).to eq(0)
expect(Geo::DesignRegistry.where(project_id: project.id).count).to eq(0)
expect(Geo::UploadRegistry.where(file_id: upload.id).count).to eq(0)
expect(Geo::PackageFileRegistry.where(package_file_id: package_file.id).count).to eq(0)
......@@ -93,6 +95,7 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
expect(Geo::LfsObjectRegistry.where(lfs_object_id: lfs_object.id).count).to eq(1)
expect(Geo::JobArtifactRegistry.where(artifact_id: job_artifact.id).count).to eq(1)
expect(Geo::ProjectRegistry.where(project_id: project.id).count).to eq(1)
expect(Geo::DesignRegistry.where(project_id: project.id).count).to eq(1)
expect(Geo::UploadRegistry.where(file_id: upload.id).count).to eq(1)
expect(Geo::PackageFileRegistry.where(package_file_id: package_file.id).count).to eq(1)
end
......@@ -111,6 +114,7 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::LfsObjectRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::PackageFileRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::UploadRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::DesignRegistry, batch_size: 1000).and_call_original
expect(Geo::RegistryConsistencyService).not_to receive(:new).with(Geo::ProjectRegistry, batch_size: 1000)
......@@ -118,6 +122,28 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
end
end
context 'when geo_design_registry_ssot_sync is disabled' do
before do
stub_feature_flags(geo_design_registry_ssot_sync: false)
end
it 'returns false' do
expect(subject.perform).to be_falsey
end
it 'does not execute RegistryConsistencyService for designs' do
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::JobArtifactRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::LfsObjectRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::PackageFileRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::UploadRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::ProjectRegistry, batch_size: 1000).and_call_original
expect(Geo::RegistryConsistencyService).not_to receive(:new).with(Geo::DesignRegistry, batch_size: 1000)
subject.perform
end
end
context 'when the current Geo node is disabled or primary' do
before do
stub_primary_node
......
......@@ -382,4 +382,11 @@ FactoryBot.define do
)
end
end
factory :project_with_design, parent: :project do
after(:create) do |project|
issue = create(:issue, project: project)
create(:design, project: project, issue: issue)
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment