Create missing registries for untracked Job Artifacts

The new finder method `find_registry_differences` also
returns tracked IDs that should not be synced, in
anticipation of a future where RegistryConsistencyService
cleans up registries that should not exist.
parent 7ba3b7b9
...@@ -31,6 +31,33 @@ module Geo ...@@ -31,6 +31,33 @@ module Geo
Ci::JobArtifact.not_expired Ci::JobArtifact.not_expired
end end
# Returns untracked IDs as well as tracked IDs that are unused.
#
# Untracked IDs are model IDs that are supposed to be synced but don't yet
# have a registry entry.
#
# Unused tracked IDs are model IDs that are not supposed to be synced but
# already have a registry entry. For example:
#
# - orphaned registries
# - records that became excluded from selective sync
# - records that are in object storage, and `sync_object_storage` became
# disabled
#
# We compute both sets in this method to reduce the number of DB queries
# performed.
#
# @return [Array] the first element is an Array of untracked IDs, and the second element is an Array of tracked IDs that are unused
def find_registry_differences(range)
source_ids = job_artifacts(fdw: false).where(id: range).pluck(::Ci::JobArtifact.arel_table[:id]) # rubocop:disable CodeReuse/ActiveRecord
tracked_ids = Geo::JobArtifactRegistry.pluck_model_ids_in_range(range)
untracked_ids = source_ids - tracked_ids
unused_tracked_ids = tracked_ids - source_ids
[untracked_ids, unused_tracked_ids]
end
# Returns Geo::JobArtifactRegistry records that have never been synced. # Returns Geo::JobArtifactRegistry records that have never been synced.
# #
# Does not care about selective sync, because it considers the Registry # Does not care about selective sync, because it considers the Registry
...@@ -57,7 +84,7 @@ module Geo ...@@ -57,7 +84,7 @@ module Geo
# rubocop:enable CodeReuse/ActiveRecord # rubocop:enable CodeReuse/ActiveRecord
# Deprecated in favor of the process using # Deprecated in favor of the process using
# #find_missing_registry_ids and #find_never_synced_registries # #find_registry_differences and #find_never_synced_registries
# #
# Find limited amount of non replicated job artifacts. # Find limited amount of non replicated job artifacts.
# #
...@@ -112,12 +139,12 @@ module Geo ...@@ -112,12 +139,12 @@ module Geo
private private
def job_artifacts def job_artifacts(fdw: true)
local_storage_only? ? all_job_artifacts.with_files_stored_locally : all_job_artifacts local_storage_only?(fdw: fdw) ? all_job_artifacts(fdw: fdw).with_files_stored_locally : all_job_artifacts(fdw: fdw)
end end
def all_job_artifacts def all_job_artifacts(fdw: true)
current_node.job_artifacts current_node(fdw: fdw).job_artifacts
end end
def registries_for_job_artifacts def registries_for_job_artifacts
......
...@@ -83,7 +83,7 @@ module Geo ...@@ -83,7 +83,7 @@ module Geo
# rubocop:enable CodeReuse/ActiveRecord # rubocop:enable CodeReuse/ActiveRecord
# Deprecated in favor of the process using # Deprecated in favor of the process using
# #find_missing_registry_ids and #find_never_synced_registries # #find_registry_differences and #find_never_synced_registries
# #
# Find limited amount of non replicated lfs objects. # Find limited amount of non replicated lfs objects.
# #
......
...@@ -20,7 +20,7 @@ module EE ...@@ -20,7 +20,7 @@ module EE
DAST_REPORT_TYPES = %w[dast].freeze DAST_REPORT_TYPES = %w[dast].freeze
scope :not_expired, -> { where('expire_at IS NULL OR expire_at > ?', Time.current) } scope :not_expired, -> { where('expire_at IS NULL OR expire_at > ?', Time.current) }
scope :project_id_in, ->(ids) { joins(:project).merge(::Project.id_in(ids)) } scope :project_id_in, ->(ids) { where(project_id: ids) }
scope :with_files_stored_remotely, -> { where(file_store: ::JobArtifactUploader::Store::REMOTE) } scope :with_files_stored_remotely, -> { where(file_store: ::JobArtifactUploader::Store::REMOTE) }
scope :security_reports, -> do scope :security_reports, -> do
......
...@@ -9,11 +9,25 @@ class Geo::JobArtifactRegistry < Geo::BaseRegistry ...@@ -9,11 +9,25 @@ class Geo::JobArtifactRegistry < Geo::BaseRegistry
scope :never, -> { where(success: false, retry_count: nil) } scope :never, -> { where(success: false, retry_count: nil) }
def self.failed def self.failed
if Feature.enabled?(:geo_job_artifact_registry_ssot_sync) if registry_consistency_worker_enabled?
where(success: false).where.not(retry_count: nil) where(success: false).where.not(retry_count: nil)
else else
# Would do `super` except it doesn't work with an included scope # Would do `super` except it doesn't work with an included scope
where(success: false) where(success: false)
end end
end end
def self.registry_consistency_worker_enabled?
Feature.enabled?(:geo_job_artifact_registry_ssot_sync)
end
def self.finder_class
::Geo::JobArtifactRegistryFinder
end
# When false, RegistryConsistencyService will frequently check the end of the
# table to quickly handle new replicables.
def self.has_create_events?
false
end
end end
...@@ -221,7 +221,20 @@ class GeoNode < ApplicationRecord ...@@ -221,7 +221,20 @@ class GeoNode < ApplicationRecord
def job_artifacts def job_artifacts
return Ci::JobArtifact.all unless selective_sync? return Ci::JobArtifact.all unless selective_sync?
Ci::JobArtifact.project_id_in(projects) query = Ci::JobArtifact.project_id_in(projects).select(:id)
cte = Gitlab::SQL::CTE.new(:restricted_job_artifacts, query)
job_artifact_table = Ci::JobArtifact.arel_table
inner_join_restricted_job_artifacts =
cte.table
.join(job_artifact_table, Arel::Nodes::InnerJoin)
.on(cte.table[:id].eq(job_artifact_table[:id]))
.join_sources
Ci::JobArtifact
.with(cte.to_arel)
.from(cte.table)
.joins(inner_join_restricted_job_artifacts)
end end
def container_repositories def container_repositories
......
...@@ -12,7 +12,7 @@ module Geo ...@@ -12,7 +12,7 @@ module Geo
end end
def find_unsynced_jobs(batch_size:) def find_unsynced_jobs(batch_size:)
if Feature.enabled?(:geo_job_artifact_registry_ssot_sync) if Geo::JobArtifactRegistry.registry_consistency_worker_enabled?
convert_registry_relation_to_job_args( convert_registry_relation_to_job_args(
registry_finder.find_never_synced_registries(find_batch_params(batch_size)) registry_finder.find_never_synced_registries(find_batch_params(batch_size))
) )
......
...@@ -16,7 +16,7 @@ module Geo ...@@ -16,7 +16,7 @@ module Geo
feature_category :geo_replication feature_category :geo_replication
# This is probably not the best place to "register" replicables for this functionality # This is probably not the best place to "register" replicables for this functionality
REGISTRY_CLASSES = [Geo::LfsObjectRegistry].freeze REGISTRY_CLASSES = [Geo::JobArtifactRegistry, Geo::LfsObjectRegistry].freeze
BATCH_SIZE = 1000 BATCH_SIZE = 1000
# @return [Boolean] true if at least 1 registry was created, else false # @return [Boolean] true if at least 1 registry was created, else false
...@@ -40,13 +40,15 @@ module Geo ...@@ -40,13 +40,15 @@ module Geo
def backfill def backfill
log_info("Backfill registries", registry_classes: registry_classes.map(&:to_s), batch_size: BATCH_SIZE) log_info("Backfill registries", registry_classes: registry_classes.map(&:to_s), batch_size: BATCH_SIZE)
registry_classes.any? do |registry_class| registry_classes.map { |registry_class| registry_service(registry_class).execute }.any?
Geo::RegistryConsistencyService.new(registry_class, batch_size: BATCH_SIZE).execute end
end
def registry_service(registry_class)
Geo::RegistryConsistencyService.new(registry_class, batch_size: BATCH_SIZE)
end end
def registry_classes def registry_classes
@registry_classes = REGISTRY_CLASSES.select(&:registry_consistency_worker_enabled?) @registry_classes ||= REGISTRY_CLASSES.select(&:registry_consistency_worker_enabled?)
end end
end end
end end
......
...@@ -229,6 +229,157 @@ describe Geo::JobArtifactRegistryFinder, :geo_fdw do ...@@ -229,6 +229,157 @@ describe Geo::JobArtifactRegistryFinder, :geo_fdw do
end end
context 'finds all the things' do context 'finds all the things' do
describe '#find_registry_differences' do
context 'untracked IDs' do
before do
create(:geo_job_artifact_registry, artifact_id: job_artifact_synced_project.id)
create(:geo_job_artifact_registry, :failed, artifact_id: job_artifact_broken_storage_1.id)
create(:geo_job_artifact_registry, artifact_id: job_artifact_remote_unsynced_project.id)
create(:geo_job_artifact_registry, artifact_id: job_artifact_expired_broken_storage.id)
end
it 'includes Job Artifact IDs without an entry on the tracking database' do
untracked_ids, _ = subject.find_registry_differences(Ci::JobArtifact.first.id..Ci::JobArtifact.last.id)
expect(untracked_ids).to match_array(
[job_artifact_unsynced_project.id, job_artifact_remote_synced_project.id,
job_artifact_broken_storage_2.id, job_artifact_expired_synced_project.id,
job_artifact_remote_broken_storage.id])
end
it 'excludes Job Artifacts outside the ID range' do
untracked_ids, _ = subject.find_registry_differences(job_artifact_unsynced_project.id..job_artifact_broken_storage_2.id)
expect(untracked_ids).to match_array(
[job_artifact_unsynced_project.id, job_artifact_broken_storage_2.id])
end
context 'with selective sync by namespace' do
let(:secondary) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) }
it 'excludes Job Artifacts that are not in selectively synced projects' do
untracked_ids, _ = subject.find_registry_differences(Ci::JobArtifact.first.id..Ci::JobArtifact.last.id)
expect(untracked_ids).to match_array([job_artifact_expired_synced_project.id, job_artifact_remote_synced_project.id])
end
end
context 'with selective sync by shard' do
let(:secondary) { create(:geo_node, selective_sync_type: 'shards', selective_sync_shards: ['broken']) }
it 'excludes Job Artifacts that are not in selectively synced projects' do
untracked_ids, _ = subject.find_registry_differences(Ci::JobArtifact.first.id..Ci::JobArtifact.last.id)
expect(untracked_ids).to match_array([job_artifact_broken_storage_2.id, job_artifact_remote_broken_storage.id])
end
end
context 'with object storage sync disabled' do
let(:secondary) { create(:geo_node, :local_storage_only) }
it 'excludes Job Artifacts in object storage' do
untracked_ids, _ = subject.find_registry_differences(Ci::JobArtifact.first.id..Ci::JobArtifact.last.id)
expect(untracked_ids).to match_array(
[job_artifact_unsynced_project.id, job_artifact_broken_storage_2.id,
job_artifact_expired_synced_project.id])
end
end
end
context 'unused tracked IDs' do
context 'with an orphaned registry' do
let!(:orphaned) { create(:geo_job_artifact_registry, artifact_id: 1234567) }
it 'includes tracked IDs that do not exist in the model table' do
_, unused_tracked_ids = subject.find_registry_differences(1234567..1234567)
expect(unused_tracked_ids).to match_array([1234567])
end
it 'excludes IDs outside the ID range' do
_, unused_tracked_ids = subject.find_registry_differences(1..1000)
expect(unused_tracked_ids).to be_empty
end
end
context 'with selective sync by namespace' do
let(:secondary) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) }
context 'with a tracked Job Artifact' do
it 'includes tracked Job Artifact IDs that exist but are not in a selectively synced project' do
create(:geo_job_artifact_registry, artifact_id: job_artifact_synced_project.id)
create(:geo_job_artifact_registry, artifact_id: job_artifact_unsynced_project.id)
_, unused_tracked_ids = subject.find_registry_differences(job_artifact_synced_project.id..job_artifact_unsynced_project.id)
expect(unused_tracked_ids).to match_array([job_artifact_unsynced_project.id])
end
end
context 'without a tracked Job Artifact' do
it 'returns empty' do
_, unused_tracked_ids = subject.find_registry_differences(job_artifact_synced_project.id..job_artifact_unsynced_project.id)
expect(unused_tracked_ids).to be_empty
end
end
end
context 'with selective sync by shard' do
let(:secondary) { create(:geo_node, selective_sync_type: 'shards', selective_sync_shards: ['broken']) }
context 'with a tracked Job Artifact' do
it 'includes tracked Job Artifact IDs that exist but are not in a selectively synced project' do
create(:geo_job_artifact_registry, artifact_id: job_artifact_synced_project.id)
create(:geo_job_artifact_registry, artifact_id: job_artifact_broken_storage_1.id)
_, unused_tracked_ids = subject.find_registry_differences(job_artifact_synced_project.id..job_artifact_broken_storage_1.id)
expect(unused_tracked_ids).to match_array([job_artifact_synced_project.id])
end
end
context 'without a tracked Job Artifact' do
it 'returns empty' do
_, unused_tracked_ids = subject.find_registry_differences(job_artifact_synced_project.id..job_artifact_broken_storage_1.id)
expect(unused_tracked_ids).to be_empty
end
end
end
context 'with object storage sync disabled' do
let(:secondary) { create(:geo_node, :local_storage_only) }
context 'with a tracked Job Artifact' do
context 'in object storage' do
it 'includes tracked Job Artifact IDs that are in object storage' do
create(:geo_job_artifact_registry, artifact_id: job_artifact_remote_synced_project.id)
range = job_artifact_remote_synced_project.id..job_artifact_remote_synced_project.id
_, unused_tracked_ids = subject.find_registry_differences(range)
expect(unused_tracked_ids).to match_array([job_artifact_remote_synced_project.id])
end
end
context 'not in object storage' do
it 'excludes tracked Job Artifact IDs that are not in object storage' do
create(:geo_lfs_object_registry, lfs_object_id: job_artifact_synced_project.id)
range = job_artifact_synced_project.id..job_artifact_synced_project.id
_, unused_tracked_ids = subject.find_registry_differences(range)
expect(unused_tracked_ids).to be_empty
end
end
end
end
end
end
describe '#find_never_synced_registries' do describe '#find_never_synced_registries' do
let!(:registry_job_artifact_1) { create(:geo_job_artifact_registry, :never_synced, artifact_id: job_artifact_synced_project.id) } let!(:registry_job_artifact_1) { create(:geo_job_artifact_registry, :never_synced, artifact_id: job_artifact_synced_project.id) }
let!(:registry_job_artifact_2) { create(:geo_job_artifact_registry, :never_synced, artifact_id: job_artifact_unsynced_project.id) } let!(:registry_job_artifact_2) { create(:geo_job_artifact_registry, :never_synced, artifact_id: job_artifact_unsynced_project.id) }
...@@ -236,14 +387,13 @@ describe Geo::JobArtifactRegistryFinder, :geo_fdw do ...@@ -236,14 +387,13 @@ describe Geo::JobArtifactRegistryFinder, :geo_fdw do
let!(:registry_job_artifact_4) { create(:geo_job_artifact_registry, :failed, artifact_id: job_artifact_broken_storage_2.id) } let!(:registry_job_artifact_4) { create(:geo_job_artifact_registry, :failed, artifact_id: job_artifact_broken_storage_2.id) }
let!(:registry_job_artifact_remote_1) { create(:geo_job_artifact_registry, :never_synced, artifact_id: job_artifact_remote_synced_project.id) } let!(:registry_job_artifact_remote_1) { create(:geo_job_artifact_registry, :never_synced, artifact_id: job_artifact_remote_synced_project.id) }
it 'returns registries for LFS objects that have never been synced' do it 'returns registries for Job Artifacts that have never been synced' do
registries = subject.find_never_synced_registries(batch_size: 10) registries = subject.find_never_synced_registries(batch_size: 10)
expect(registries).to match_ids(registry_job_artifact_1, registry_job_artifact_2, registry_job_artifact_remote_1) expect(registries).to match_ids(registry_job_artifact_1, registry_job_artifact_2, registry_job_artifact_remote_1)
end end
end end
describe '#find_unsynced' do describe '#find_unsynced' do
before do before do
create(:geo_job_artifact_registry, artifact_id: job_artifact_synced_project.id, success: false) create(:geo_job_artifact_registry, artifact_id: job_artifact_synced_project.id, success: false)
......
...@@ -747,22 +747,18 @@ describe GeoNode, :request_store, :geo, type: :model do ...@@ -747,22 +747,18 @@ describe GeoNode, :request_store, :geo, type: :model do
describe '#job_artifacts' do describe '#job_artifacts' do
context 'when selective sync is enabled' do context 'when selective sync is enabled' do
it 'applies project restriction' do it 'applies a CTE statement' do
node.update!(selective_sync_type: 'namespaces') node.update!(selective_sync_type: 'namespaces')
expect(Ci::JobArtifact).to receive(:project_id_in).once.and_call_original expect(node.job_artifacts.to_sql).to match(/WITH .+restricted_job_artifacts/)
node.job_artifacts
end end
end end
context 'when selective sync is disabled' do context 'when selective sync is disabled' do
it 'does not apply project restriction' do it 'doest not apply a CTE statement' do
node.update!(selective_sync_type: nil) node.update!(selective_sync_type: nil)
expect(Ci::JobArtifact).not_to receive(:project_id_in) expect(node.job_artifacts.to_sql).not_to match(/WITH .+restricted_job_artifacts/)
node.job_artifacts
end end
end end
end end
......
...@@ -79,13 +79,20 @@ describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do ...@@ -79,13 +79,20 @@ describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
# Somewhat of an integration test # Somewhat of an integration test
it 'creates missing registries for each registry class' do it 'creates missing registries for each registry class' do
lfs_object = create(:lfs_object) lfs_object = create(:lfs_object)
job_artifact = create(:ci_job_artifact)
expect do expect(Geo::LfsObjectRegistry.where(lfs_object_id: lfs_object.id).count).to eq(0)
subject.perform expect(Geo::JobArtifactRegistry.where(artifact_id: job_artifact.id).count).to eq(0)
end.to change { Geo::LfsObjectRegistry.where(lfs_object_id: lfs_object.id).count }.from(0).to(1)
subject.perform
expect(Geo::LfsObjectRegistry.where(lfs_object_id: lfs_object.id).count).to eq(1)
expect(Geo::JobArtifactRegistry.where(artifact_id: job_artifact.id).count).to eq(1)
end end
context 'when geo_lfs_registry_ssot_sync is disabled' do context 'when geo_lfs_registry_ssot_sync is disabled' do
let_it_be(:lfs_object) { create(:lfs_object) }
before do before do
stub_feature_flags(geo_lfs_registry_ssot_sync: false) stub_feature_flags(geo_lfs_registry_ssot_sync: false)
end end
...@@ -94,8 +101,30 @@ describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do ...@@ -94,8 +101,30 @@ describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
expect(subject.perform).to be_falsey expect(subject.perform).to be_falsey
end end
it 'does not execute RegistryConsistencyService' do it 'does not execute RegistryConsistencyService for LFS objects' do
expect(Geo::RegistryConsistencyService).not_to receive(:new) allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::JobArtifactRegistry, batch_size: 1000).and_call_original
expect(Geo::RegistryConsistencyService).not_to receive(:new).with(Geo::LfsObjectRegistry, batch_size: 1000)
subject.perform
end
end
context 'when geo_job_artifact_registry_ssot_sync is disabled' do
let_it_be(:job_artifact) { create(:ci_job_artifact) }
before do
stub_feature_flags(geo_job_artifact_registry_ssot_sync: false)
end
it 'returns false' do
expect(subject.perform).to be_falsey
end
it 'does not execute RegistryConsistencyService for Job Artifacts' do
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::LfsObjectRegistry, batch_size: 1000).and_call_original
expect(Geo::RegistryConsistencyService).not_to receive(:new).with(Geo::JobArtifactRegistry, batch_size: 1000)
subject.perform subject.perform
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment