Commit 442bb2f9 authored by Valery Sizov's avatar Valery Sizov Committed by Michael Kozono

Add PackageFileRegistry to RegistryConsistencyWorker

Implement find_registry_differences for PackageFileRegistry.
Hook up RegistryConsistencyWorker with Package Files.
parent 1fbf76cc
......@@ -51,4 +51,18 @@ class Geo::BaseRegistry < Geo::TrackingBase
def model_record_id
read_attribute(self.class::MODEL_FOREIGN_KEY)
end
def self.find_registry_differences(range)
source_ids = self::MODEL_CLASS
.replicables_for_geo_node
.id_in(range)
.pluck(self::MODEL_CLASS.arel_table[:id])
tracked_ids = self.pluck_model_ids_in_range(range)
untracked_ids = source_ids - tracked_ids
unused_tracked_ids = tracked_ids - source_ids
[untracked_ids, unused_tracked_ids]
end
end
......@@ -29,6 +29,10 @@ class Geo::JobArtifactRegistry < Geo::BaseRegistry
::Geo::FileRegistryRemovalWorker
end
def self.find_registry_differences(range)
finder_class.new(current_node_id: Gitlab::Geo.current_node.id).find_registry_differences(range)
end
# When false, RegistryConsistencyService will frequently check the end of the
# table to quickly handle new replicables.
def self.has_create_events?
......
......@@ -25,6 +25,10 @@ class Geo::LfsObjectRegistry < Geo::BaseRegistry
::Geo::FileRegistryRemovalWorker
end
def self.find_registry_differences(range)
finder_class.new(current_node_id: Gitlab::Geo.current_node.id).find_registry_differences(range)
end
# If false, RegistryConsistencyService will frequently check the end of the
# table to quickly handle new replicables.
def self.has_create_events?
......
......@@ -4,6 +4,7 @@ class Geo::PackageFileRegistry < Geo::BaseRegistry
include ::Delay
include ShaAttribute
MODEL_CLASS = ::Packages::PackageFile
MODEL_FOREIGN_KEY = :package_file_id
def self.declarative_policy_class
......@@ -81,6 +82,10 @@ class Geo::PackageFileRegistry < Geo::BaseRegistry
STATE_VALUES[state_string]
end
def self.has_create_events?
true
end
# Override state machine failed! event method to record a failure message at
# the same time.
#
......@@ -92,4 +97,9 @@ class Geo::PackageFileRegistry < Geo::BaseRegistry
super()
end
def self.delete_for_model_ids(package_file_ids)
# TODO: https://gitlab.com/gitlab-org/gitlab/-/issues/222635
[]
end
end
......@@ -26,6 +26,10 @@ class Geo::UploadRegistry < Geo::BaseRegistry
::Geo::FileRegistryRemovalWorker
end
def self.find_registry_differences(range)
finder_class.new(current_node_id: Gitlab::Geo.current_node.id).find_registry_differences(range)
end
# If false, RegistryConsistencyService will frequently check the end of the
# table to quickly handle new replicables.
def self.has_create_events?
......
......@@ -40,6 +40,25 @@ class Packages::PackageFile < ApplicationRecord
update_project_statistics project_statistics_name: :packages_size
def self.replicables_for_geo_node
return self.all unless Gitlab::Geo.current_node.selective_sync?
query = ::Packages::Package.where(project_id: Gitlab::Geo.current_node.projects).select(:id)
cte = Gitlab::SQL::CTE.new(:restricted_packages, query)
replicable_table = self.arel_table
inner_join_restricted_packages =
cte.table
.join(replicable_table, Arel::Nodes::InnerJoin)
.on(cte.table[:id].eq(replicable_table[:package_id]))
.join_sources
self
.with(cte.to_arel)
.from(cte.table)
.joins(inner_join_restricted_packages)
end
def update_file_metadata
# The file.object_store is set during `uploader.store!`
# which happens after object is inserted/updated
......
......@@ -40,14 +40,6 @@ module Geo
"registry_consistency:#{registry_class.name.parameterize}"
end
def find_registry_differences(range)
finder.find_registry_differences(range)
end
def finder
@finder ||= registry_class.finder_class.new(current_node_id: Gitlab::Geo.current_node.id)
end
def handle_differences_in_range(range)
untracked, unused = find_registry_differences(range)
......@@ -74,6 +66,10 @@ module Geo
registry_class.delete_for_model_ids(delete_unused_in_range)
end
def find_registry_differences(range)
registry_class.find_registry_differences(range)
end
# This hack is used to sync new files soon after they are created.
#
# This is not needed for replicables that have already implemented
......
......@@ -153,7 +153,7 @@
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent:
:idempotent: true
:tags: []
- :name: cronjob:geo_sidekiq_cron_config
:feature_category: :geo_replication
......
......@@ -5,7 +5,7 @@ module Geo
# Iterates over syncable records and creates the corresponding registry
# records which are missing. Then, the workers that actually schedule the
# sync work only have to query the registry table for never-synced records.
class RegistryConsistencyWorker # rubocop:disable Scalability/IdempotentWorker
class RegistryConsistencyWorker
include ApplicationWorker
prepend Reenqueuer
include ::Gitlab::Geo::LogHelpers
......@@ -15,10 +15,17 @@ module Geo
feature_category :geo_replication
# This is probably not the best place to "register" replicables for this functionality
REGISTRY_CLASSES = [Geo::JobArtifactRegistry, Geo::LfsObjectRegistry, Geo::UploadRegistry].freeze
REGISTRY_CLASSES = [
Geo::JobArtifactRegistry,
Geo::LfsObjectRegistry,
Geo::UploadRegistry,
Geo::PackageFileRegistry
].freeze
BATCH_SIZE = 1000
idempotent!
# @return [Boolean] true if at least 1 registry was created, else false
def perform
return false unless registry_classes.any? # May as well remove this check after one registry no longer feature flags this
......
# frozen_string_literal: true
FactoryBot.define do
factory :package_file_registry, class: 'Geo::PackageFileRegistry' do
factory :geo_package_file_registry, class: 'Geo::PackageFileRegistry' do
association :package_file, factory: [:package_file, :npm]
state { Geo::PackageFileRegistry.state_value(:pending) }
......
......@@ -249,6 +249,8 @@ FactoryBot.define do
trait(:checksum_failure) do
verification_failure { 'Could not calculate the checksum' }
end
factory :package_file_with_file, traits: [:jar]
end
factory :maven_metadatum, class: 'Packages::Maven::Metadatum' do
......
......@@ -3,5 +3,5 @@
require 'spec_helper'
RSpec.describe Geo::PackageFileRegistryFinder do
it_behaves_like 'a framework registry finder', :package_file_registry
it_behaves_like 'a framework registry finder', :geo_package_file_registry
end
......@@ -3,5 +3,5 @@
require 'spec_helper'
RSpec.describe Resolvers::Geo::PackageFileRegistriesResolver do
it_behaves_like 'a Geo registries resolver', :package_file_registry
it_behaves_like 'a Geo registries resolver', :geo_package_file_registry
end
......@@ -3,10 +3,80 @@
require 'spec_helper'
RSpec.describe Geo::PackageFileRegistry, :geo, type: :model do
include ::EE::GeoHelpers
it_behaves_like 'a BulkInsertSafe model', Geo::PackageFileRegistry do
let(:valid_items_for_bulk_insertion) { build_list(:package_file_registry, 10, created_at: Time.zone.now) }
let(:valid_items_for_bulk_insertion) { build_list(:geo_package_file_registry, 10, created_at: Time.zone.now) }
let(:invalid_items_for_bulk_insertion) { [] } # class does not have any validations defined
end
include_examples 'a Geo framework registry'
describe '.find_registry_differences' do
let(:synced_group) { create(:group) }
let(:synced_subgroup) { create(:group, parent: synced_group) }
let(:unsynced_group) { create(:group) }
let(:synced_project) { create(:project, group: synced_group) }
let(:synced_project_in_nested_group) { create(:project, group: synced_subgroup) }
let(:project_on_broken_shard) { create(:project, :broken_storage, group: unsynced_group) }
let!(:package_file) { create(:conan_package_file, :conan_package) }
subject { described_class }
before do
stub_current_geo_node(secondary)
create(:geo_package_file_registry, package_file_id: package_file.id)
create(:geo_package_file_registry, package_file_id: non_existing_record_id)
end
context 'with selective sync disabled' do
let(:secondary) { create(:geo_node) }
it 'finds unused and untracked items' do
package_file1 = create(:conan_package_file, :conan_package)
range = 1..non_existing_record_id
untracked, unused = subject.find_registry_differences(range)
expect(untracked).to match_array([package_file1.id])
expect(unused).to match_array([non_existing_record_id])
end
end
context 'with selective sync by shard' do
let(:secondary) { create(:geo_node, selective_sync_type: 'shards', selective_sync_shards: ['broken']) }
it 'finds unused and untracked items' do
package = create(:conan_package, without_package_files: true, project: project_on_broken_shard)
package_file1 = create(:conan_package_file, :conan_package, package: package)
range = 1..non_existing_record_id
untracked, unused = subject.find_registry_differences(range)
expect(untracked).to match_array([package_file1.id])
expect(unused).to match_array([non_existing_record_id, package_file.id])
end
end
context 'with selective sync by namespace' do
let(:secondary) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) }
it 'finds unused and untracked items' do
package = create(:conan_package, without_package_files: true, project: synced_project)
package_file1 = create(:conan_package_file, :conan_package, package: package)
range = 1..non_existing_record_id
untracked, unused = subject.find_registry_differences(range)
expect(untracked).to match_array([package_file1.id])
expect(unused).to match_array([package_file.id, non_existing_record_id])
end
end
end
end
......@@ -1252,9 +1252,9 @@ RSpec.describe GeoNodeStatus, :geo, :geo_fdw do
describe 'package files secondary counters' do
context 'when package registries available' do
before do
create(:package_file_registry, :failed)
create(:package_file_registry, :failed)
create(:package_file_registry, :synced)
create(:geo_package_file_registry, :failed)
create(:geo_package_file_registry, :failed)
create(:geo_package_file_registry, :synced)
end
it 'returns the right number of failed and synced repos' do
......
......@@ -3,7 +3,7 @@
require 'spec_helper'
RSpec.describe Geo::RegistryPolicy do
let!(:registry) { create(:package_file_registry) }
let!(:registry) { create(:geo_package_file_registry) }
subject(:policy) { described_class.new(current_user, registry) }
......
......@@ -7,5 +7,5 @@ RSpec.describe Geo::PackageFileReplicator do
include_examples 'a blob replicator'
include_examples 'secondary counters', :package_file_registry
include_examples 'secondary counters', :geo_package_file_registry
end
......@@ -6,7 +6,7 @@ RSpec.describe 'Gets registries' do
it_behaves_like 'gets registries for', {
field_name: 'packageFileRegistries',
registry_class_name: 'PackageFileRegistry',
registry_factory: :package_file_registry,
registry_factory: :geo_package_file_registry,
registry_foreign_key_field_name: 'packageFileId'
}
end
......@@ -14,7 +14,7 @@ RSpec.describe Geo::BlobVerificationSecondaryService, :geo do
describe '#execute' do
let_it_be(:package_file) { create(:conan_package_file, :conan_recipe_file, verification_checksum: '62fc1ec4ce60') }
let_it_be(:registry) { create(:package_file_registry, :synced, package_file: package_file) }
let_it_be(:registry) { create(:geo_package_file_registry, :synced, package_file: package_file) }
subject(:service) { described_class.new(package_file.replicator) }
......
......@@ -11,11 +11,19 @@ RSpec.describe Geo::RegistryConsistencyService, :geo, :use_clean_rails_memory_st
stub_current_geo_node(secondary)
end
::Geo::Secondary::RegistryConsistencyWorker::REGISTRY_CLASSES.each do |klass|
def model_class_factory_name(model_class)
if model_class == ::Packages::PackageFile
:package_file_with_file
else
model_class.underscore.tr('/', '_').to_sym
end
end
shared_examples 'registry consistency service' do |klass|
let(:registry_class) { klass }
let(:registry_class_factory) { registry_class.underscore.tr('/', '_').to_sym }
let(:model_class) { registry_class::MODEL_CLASS }
let(:model_class_factory) { model_class.underscore.tr('/', '_').to_sym }
let(:model_class_factory) { model_class_factory_name(model_class) }
let(:model_foreign_key) { registry_class::MODEL_FOREIGN_KEY }
let(:batch_size) { 2 }
......@@ -38,10 +46,6 @@ RSpec.describe Geo::RegistryConsistencyService, :geo, :use_clean_rails_memory_st
expect(registry_class).to respond_to(:delete_for_model_ids)
end
it 'responds to .finder_class' do
expect(registry_class).to respond_to(:finder_class)
end
it 'responds to .has_create_events?' do
expect(registry_class).to respond_to(:has_create_events?)
end
......@@ -148,79 +152,81 @@ RSpec.describe Geo::RegistryConsistencyService, :geo, :use_clean_rails_memory_st
end
end
context 'when there are unused registries' do
context 'with no replicable records' do
let(:records) { create_list(model_class_factory, batch_size) }
let(:unused_model_ids) { records.map(&:id) }
unless klass == Geo::PackageFileRegistry
context 'when there are unused registries' do
context 'with no replicable records' do
let(:records) { create_list(model_class_factory, batch_size) }
let(:unused_model_ids) { records.map(&:id) }
let!(:registries) do
records.map do |record|
create(registry_class_factory, model_foreign_key => record.id)
let!(:registries) do
records.map do |record|
create(registry_class_factory, model_foreign_key => record.id)
end
end
end
before do
model_class.where(id: unused_model_ids).delete_all
end
before do
model_class.where(id: unused_model_ids).delete_all
end
it 'deletes unused registries', :sidekiq_inline do
subject.execute
it 'deletes unused registries', :sidekiq_inline do
subject.execute
expect(registry_class.where(model_foreign_key => unused_model_ids)).to be_empty
end
expect(registry_class.where(model_foreign_key => unused_model_ids)).to be_empty
end
it 'returns truthy' do
expect(subject.execute).to be_truthy
it 'returns truthy' do
expect(subject.execute).to be_truthy
end
end
end
context 'when the unused registry foreign key ids are lower than the first replicable model id' do
let(:records) { create_list(model_class_factory, batch_size) }
let(:unused_registry_ids) { [records.first].map(&:id) }
context 'when the unused registry foreign key ids are lower than the first replicable model id' do
let(:records) { create_list(model_class_factory, batch_size) }
let(:unused_registry_ids) { [records.first].map(&:id) }
let!(:registries) do
records.map do |record|
create(registry_class_factory, model_foreign_key => record.id)
let!(:registries) do
records.map do |record|
create(registry_class_factory, model_foreign_key => record.id)
end
end
end
before do
model_class.where(id: unused_registry_ids).delete_all
end
before do
model_class.where(id: unused_registry_ids).delete_all
end
it 'deletes unused registries', :sidekiq_inline do
subject.execute
it 'deletes unused registries', :sidekiq_inline do
subject.execute
expect(registry_class.where(model_foreign_key => unused_registry_ids)).to be_empty
end
expect(registry_class.where(model_foreign_key => unused_registry_ids)).to be_empty
end
it 'returns truthy' do
expect(subject.execute).to be_truthy
it 'returns truthy' do
expect(subject.execute).to be_truthy
end
end
end
context 'when the unused registry foreign key ids are greater than the last replicable model id' do
let(:records) { create_list(model_class_factory, batch_size) }
let(:unused_registry_ids) { [records.last].map(&:id) }
context 'when the unused registry foreign key ids are greater than the last replicable model id' do
let(:records) { create_list(model_class_factory, batch_size) }
let(:unused_registry_ids) { [records.last].map(&:id) }
let!(:registries) do
records.map do |record|
create(registry_class_factory, model_foreign_key => record.id)
let!(:registries) do
records.map do |record|
create(registry_class_factory, model_foreign_key => record.id)
end
end
end
before do
model_class.where(id: unused_registry_ids).delete_all
end
before do
model_class.where(id: unused_registry_ids).delete_all
end
it 'deletes unused registries', :sidekiq_inline do
subject.execute
it 'deletes unused registries', :sidekiq_inline do
subject.execute
expect(registry_class.where(model_foreign_key => unused_registry_ids)).to be_empty
end
expect(registry_class.where(model_foreign_key => unused_registry_ids)).to be_empty
end
it 'returns truthy' do
expect(subject.execute).to be_truthy
it 'returns truthy' do
expect(subject.execute).to be_truthy
end
end
end
end
......@@ -258,4 +264,8 @@ RSpec.describe Geo::RegistryConsistencyService, :geo, :use_clean_rails_memory_st
end
end
end
::Geo::Secondary::RegistryConsistencyWorker::REGISTRY_CLASSES.each do |klass|
it_behaves_like 'registry consistency service', klass
end
end
# frozen_string_literal: true
shared_examples 'a Geo framework registry' do
let(:registry_class_factory) { described_class.underscore.tr('/', '_').sub('geo_', '').to_sym }
let(:registry_class_factory) { described_class.underscore.tr('/', '_').to_sym }
let!(:failed_item1) { create(registry_class_factory, :failed) }
let!(:failed_item2) { create(registry_class_factory, :failed) }
let!(:unsynced_item1) { create(registry_class_factory) }
let!(:unsynced_item2) { create(registry_class_factory) }
context 'finders' do
let!(:failed_item1) { create(registry_class_factory, :failed) }
let!(:failed_item2) { create(registry_class_factory, :failed) }
let!(:unsynced_item1) { create(registry_class_factory) }
let!(:unsynced_item2) { create(registry_class_factory) }
describe '.find_unsynced_registries' do
it 'returns unsynced items' do
result = described_class.find_unsynced_registries(batch_size: 10)
describe '.find_unsynced_registries' do
it 'returns unsynced items' do
result = described_class.find_unsynced_registries(batch_size: 10)
expect(result).to include(unsynced_item1, unsynced_item2)
end
expect(result).to include(unsynced_item1, unsynced_item2)
end
it 'returns unsynced items except some specific item ID' do
except_id = unsynced_item1.model_record_id
it 'returns unsynced items except some specific item ID' do
except_id = unsynced_item1.model_record_id
result = described_class.find_unsynced_registries(batch_size: 10, except_ids: [except_id])
result = described_class.find_unsynced_registries(batch_size: 10, except_ids: [except_id])
expect(result).to include(unsynced_item2)
expect(result).not_to include(unsynced_item1)
expect(result).to include(unsynced_item2)
expect(result).not_to include(unsynced_item1)
end
end
end
describe '.find_failed_registries' do
it 'returns failed items' do
result = described_class.find_failed_registries(batch_size: 10)
describe '.find_failed_registries' do
it 'returns failed items' do
result = described_class.find_failed_registries(batch_size: 10)
expect(result).to include(failed_item1, failed_item2)
end
expect(result).to include(failed_item1, failed_item2)
end
it 'returns failed items except some specific item ID' do
except_id = failed_item1.model_record_id
it 'returns failed items except some specific item ID' do
except_id = failed_item1.model_record_id
result = described_class.find_failed_registries(batch_size: 10, except_ids: [except_id])
result = described_class.find_failed_registries(batch_size: 10, except_ids: [except_id])
expect(result).to include(failed_item2)
expect(result).not_to include(failed_item1)
expect(result).to include(failed_item2)
expect(result).not_to include(failed_item1)
end
end
end
end
......@@ -19,7 +19,7 @@ describe Geo::RegistrySyncWorker, :geo, :use_sql_query_cache_for_tracking_db do
end
it 'does not schedule anything when tracking database is not configured' do
create(:package_file_registry)
create(:geo_package_file_registry)
expect(::Geo::EventWorker).not_to receive(:perform_async)
......@@ -29,7 +29,7 @@ describe Geo::RegistrySyncWorker, :geo, :use_sql_query_cache_for_tracking_db do
end
it 'does not schedule anything when node is disabled' do
create(:package_file_registry)
create(:geo_package_file_registry)
secondary.enabled = false
secondary.save!
......@@ -40,8 +40,8 @@ describe Geo::RegistrySyncWorker, :geo, :use_sql_query_cache_for_tracking_db do
end
it 'does not schedule duplicated jobs' do
package_file_1 = create(:package_file_registry)
package_file_2 = create(:package_file_registry)
package_file_1 = create(:geo_package_file_registry)
package_file_2 = create(:geo_package_file_registry)
stub_const('Geo::Scheduler::SchedulerWorker::DB_RETRIEVE_BATCH_SIZE', 5)
secondary.update!(files_max_capacity: 8)
......@@ -63,8 +63,8 @@ describe Geo::RegistrySyncWorker, :geo, :use_sql_query_cache_for_tracking_db do
end
it 'does not schedule duplicated jobs because of query cache' do
package_file_1 = create(:package_file_registry)
package_file_2 = create(:package_file_registry)
package_file_1 = create(:geo_package_file_registry)
package_file_2 = create(:geo_package_file_registry)
# We retrieve all the items in a single batch
stub_const('Geo::Scheduler::SchedulerWorker::DB_RETRIEVE_BATCH_SIZE', 2)
......@@ -105,7 +105,7 @@ describe Geo::RegistrySyncWorker, :geo, :use_sql_query_cache_for_tracking_db do
allow_any_instance_of(::Gitlab::Geo::Replication::BlobDownloader).to receive(:execute).and_return(result_object)
create_list(:package_file_registry, 10)
create_list(:geo_package_file_registry, 10)
expect(::Geo::EventWorker).to receive(:perform_async).exactly(10).times.and_call_original
# For 10 downloads, we expect four database reloads:
......
......@@ -79,16 +79,19 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
lfs_object = create(:lfs_object)
job_artifact = create(:ci_job_artifact)
upload = create(:upload)
package_file = create(:conan_package_file, :conan_package)
expect(Geo::LfsObjectRegistry.where(lfs_object_id: lfs_object.id).count).to eq(0)
expect(Geo::JobArtifactRegistry.where(artifact_id: job_artifact.id).count).to eq(0)
expect(Geo::UploadRegistry.where(file_id: upload.id).count).to eq(0)
expect(Geo::PackageFileRegistry.where(package_file_id: package_file.id).count).to eq(0)
subject.perform
expect(Geo::LfsObjectRegistry.where(lfs_object_id: lfs_object.id).count).to eq(1)
expect(Geo::JobArtifactRegistry.where(artifact_id: job_artifact.id).count).to eq(1)
expect(Geo::UploadRegistry.where(file_id: upload.id).count).to eq(1)
expect(Geo::PackageFileRegistry.where(package_file_id: package_file.id).count).to eq(1)
end
context 'when geo_job_artifact_registry_ssot_sync is disabled' do
......@@ -105,6 +108,7 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
it 'does not execute RegistryConsistencyService for Job Artifacts' do
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::LfsObjectRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::UploadRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::PackageFileRegistry, batch_size: 1000).and_call_original
expect(Geo::RegistryConsistencyService).not_to receive(:new).with(Geo::JobArtifactRegistry, batch_size: 1000)
......@@ -126,6 +130,7 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
it 'does not execute RegistryConsistencyService for Uploads' do
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::JobArtifactRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::LfsObjectRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::PackageFileRegistry, batch_size: 1000).and_call_original
expect(Geo::RegistryConsistencyService).not_to receive(:new).with(Geo::UploadRegistry, batch_size: 1000)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment