Commit 06a1cd85 authored by Mike Kozono's avatar Mike Kozono

Remove geo_lfs_registry_ssot_sync feature flag

Improve query performance and fix DB timeouts in certain cases, while
finding LFS objects to sync, especially on PostgreSQL 9.6, by using the
registry tables as the single source of truth, instead of doing
cross-database joins e.g. to exclude registry entries that are not
selected for syncing.
parent cb3b14bf
......@@ -14,16 +14,7 @@ class Geo::LfsObjectRegistry < Geo::BaseRegistry
scope :never, -> { where(success: false, retry_count: nil) }
def self.failed
if registry_consistency_worker_enabled?
where(success: false).where.not(retry_count: nil)
else
# Would do `super` except it doesn't work with an included scope
where(success: false)
end
end
def self.registry_consistency_worker_enabled?
Feature.enabled?(:geo_lfs_registry_ssot_sync)
where(success: false).where.not(retry_count: nil)
end
def self.finder_class
......
......@@ -12,13 +12,9 @@ module Geo
end
def find_unsynced_jobs(batch_size:)
if Geo::LfsObjectRegistry.registry_consistency_worker_enabled?
convert_registry_relation_to_job_args(
registry_finder.find_never_synced_registries(find_batch_params(batch_size))
)
else
super
end
convert_registry_relation_to_job_args(
registry_finder.find_never_synced_registries(find_batch_params(batch_size))
)
end
end
end
......
......@@ -48,7 +48,12 @@ module Geo
end
def registry_classes
@registry_classes ||= REGISTRY_CLASSES.select(&:registry_consistency_worker_enabled?)
@registry_classes ||= REGISTRY_CLASSES.select do |registry_class|
# Defaults on. This check gives registry classes the opportunity to
# disable this worker, e.g. with a feature flag.
!registry_class.respond_to?(:registry_consistency_worker_enabled?) ||
registry_class.registry_consistency_worker_enabled?
end
end
end
end
......
---
title: 'Geo: Make the LFS registry the SSOT to optimize query performance'
merge_request: 27154
author:
type: performance
......@@ -313,122 +313,48 @@ describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw, :use_sql_query_cache_f
stub_lfs_object_storage
end
context 'with geo_lfs_registry_ssot_sync feature enabled' do
context 'with files missing on the primary' do
let!(:lfs_object_file_missing_on_primary) { create(:lfs_object, :with_file) }
context 'with files missing on the primary' do
let!(:lfs_object_file_missing_on_primary) { create(:lfs_object, :with_file) }
context 'with lfs_object_registry entries' do
before do
create(:geo_lfs_object_registry, :never_synced, lfs_object: lfs_object_local_store)
create(:geo_lfs_object_registry, :failed, lfs_object: lfs_object_remote_store)
Geo::LfsObjectRegistry.create!(lfs_object_id: lfs_object_file_missing_on_primary.id, bytes: 1234, success: true, missing_on_primary: true)
end
it 'enqueues file downloads if there is spare capacity' do
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_local_store.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_file_missing_on_primary.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_remote_store.id)
subject.perform
end
it 'does not retry those files if there is no spare capacity' do
expect(subject).to receive(:db_retrieve_batch_size).and_return(1).twice
expect(Geo::FileDownloadWorker).to receive(:perform_async).once
subject.perform
end
it 'does not retry those files if they are already scheduled' do
scheduled_jobs = [{ type: 'lfs', id: lfs_object_file_missing_on_primary.id, job_id: 'foo' }]
expect(subject).to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(1)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_local_store.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_remote_store.id)
subject.perform
end
context 'with lfs_object_registry entries' do
before do
create(:geo_lfs_object_registry, :never_synced, lfs_object: lfs_object_local_store)
create(:geo_lfs_object_registry, :failed, lfs_object: lfs_object_remote_store)
Geo::LfsObjectRegistry.create!(lfs_object_id: lfs_object_file_missing_on_primary.id, bytes: 1234, success: true, missing_on_primary: true)
end
context 'with no lfs_object_registry entries' do
it 'does not enqueue file downloads' do
expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
it 'enqueues file downloads if there is spare capacity' do
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_local_store.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_file_missing_on_primary.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_remote_store.id)
subject.perform
end
subject.perform
end
end
end
context 'with geo_lfs_registry_ssot_sync feature disabled' do
before do
stub_feature_flags(geo_lfs_registry_ssot_sync: false)
end
context 'with files missing on the primary' do
let!(:lfs_object_file_missing_on_primary) { create(:lfs_object, :with_file) }
context 'with lfs_object_registry entries' do
before do
create(:geo_lfs_object_registry, :never_synced, lfs_object: lfs_object_local_store)
create(:geo_lfs_object_registry, :failed, lfs_object: lfs_object_remote_store)
Geo::LfsObjectRegistry.create!(lfs_object_id: lfs_object_file_missing_on_primary.id, bytes: 1234, success: true, missing_on_primary: true)
end
it 'enqueues file downloads if there is spare capacity' do
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_local_store.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_file_missing_on_primary.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_remote_store.id)
subject.perform
end
it 'does not enqueue file downloads if there is no spare capacity' do
expect(subject).to receive(:db_retrieve_batch_size).and_return(1).twice
expect(Geo::FileDownloadWorker).to receive(:perform_async).once
subject.perform
end
it 'does not enqueue file downloads if they are already scheduled' do
scheduled_jobs = [{ type: 'lfs', id: lfs_object_file_missing_on_primary.id, job_id: 'foo' }]
expect(subject).to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(1)
it 'does not retry those files if there is no spare capacity' do
expect(subject).to receive(:db_retrieve_batch_size).and_return(1).twice
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_local_store.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_remote_store.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).once
subject.perform
end
subject.perform
end
context 'with no lfs_object_registry entries' do
it 'enqueues file downloads if there is spare capacity' do
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_local_store.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_file_missing_on_primary.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_remote_store.id)
subject.perform
end
it 'does not enqueue file downloads if there is no spare capacity' do
expect(subject).to receive(:db_retrieve_batch_size).and_return(1).twice
expect(Geo::FileDownloadWorker).to receive(:perform_async).once
it 'does not retry those files if they are already scheduled' do
scheduled_jobs = [{ type: 'lfs', id: lfs_object_file_missing_on_primary.id, job_id: 'foo' }]
expect(subject).to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(1)
subject.perform
end
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_local_store.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_remote_store.id)
it 'does not enqueue file downloads if they are already scheduled' do
scheduled_jobs = [{ type: 'lfs', id: lfs_object_file_missing_on_primary.id, job_id: 'foo' }]
expect(subject).to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(1)
subject.perform
end
end
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_local_store.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_remote_store.id)
context 'with no lfs_object_registry entries' do
it 'does not enqueue file downloads' do
expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
subject.perform
end
subject.perform
end
end
end
......@@ -711,22 +637,6 @@ describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw, :use_sql_query_cache_f
allow(::GeoNode).to receive(:current_node).and_return(secondary)
end
context 'when geo_lfs_registry_ssot_sync feature is disabled' do
before do
stub_feature_flags(geo_lfs_registry_ssot_sync: false)
end
it 'does not perform Geo::FileDownloadWorker for LFS object that does not belong to selected namespaces to replicate' do
lfs_object_in_synced_group = create(:lfs_objects_project, project: project_in_synced_group)
create(:lfs_objects_project, project: unsynced_project)
expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with('lfs', lfs_object_in_synced_group.lfs_object_id).once.and_return(spy)
subject.perform
end
end
context 'when geo_job_artifact_registry_ssot_sync feature is disabled' do
before do
stub_feature_flags(geo_job_artifact_registry_ssot_sync: false)
......
......@@ -91,27 +91,6 @@ describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
expect(Geo::UploadRegistry.where(file_id: upload.id).count).to eq(1)
end
context 'when geo_lfs_registry_ssot_sync is disabled' do
let_it_be(:lfs_object) { create(:lfs_object) }
before do
stub_feature_flags(geo_lfs_registry_ssot_sync: false)
end
it 'returns false' do
expect(subject.perform).to be_falsey
end
it 'does not execute RegistryConsistencyService for LFS objects' do
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::JobArtifactRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::UploadRegistry, batch_size: 1000).and_call_original
expect(Geo::RegistryConsistencyService).not_to receive(:new).with(Geo::LfsObjectRegistry, batch_size: 1000)
subject.perform
end
end
context 'when geo_job_artifact_registry_ssot_sync is disabled' do
let_it_be(:job_artifact) { create(:ci_job_artifact) }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment