Commit 0f99117d authored by Douglas Barbosa Alexandre's avatar Douglas Barbosa Alexandre

Merge branch 'mk/fix-geo-db-timeouts-finding-lfs-on-pg-9' into 'master'

Geo: Fix tracking DB timeouts while finding LFS objects to sync, especially on PostgreSQL 9.6

See merge request gitlab-org/gitlab!27154
parents 71fe9d07 06a1cd85
...@@ -14,16 +14,7 @@ class Geo::LfsObjectRegistry < Geo::BaseRegistry ...@@ -14,16 +14,7 @@ class Geo::LfsObjectRegistry < Geo::BaseRegistry
scope :never, -> { where(success: false, retry_count: nil) } scope :never, -> { where(success: false, retry_count: nil) }
def self.failed def self.failed
if registry_consistency_worker_enabled? where(success: false).where.not(retry_count: nil)
where(success: false).where.not(retry_count: nil)
else
# Would do `super` except it doesn't work with an included scope
where(success: false)
end
end
def self.registry_consistency_worker_enabled?
Feature.enabled?(:geo_lfs_registry_ssot_sync)
end end
def self.finder_class def self.finder_class
......
...@@ -12,13 +12,9 @@ module Geo ...@@ -12,13 +12,9 @@ module Geo
end end
def find_unsynced_jobs(batch_size:) def find_unsynced_jobs(batch_size:)
if Geo::LfsObjectRegistry.registry_consistency_worker_enabled? convert_registry_relation_to_job_args(
convert_registry_relation_to_job_args( registry_finder.find_never_synced_registries(find_batch_params(batch_size))
registry_finder.find_never_synced_registries(find_batch_params(batch_size)) )
)
else
super
end
end end
end end
end end
......
...@@ -48,7 +48,12 @@ module Geo ...@@ -48,7 +48,12 @@ module Geo
end end
def registry_classes def registry_classes
@registry_classes ||= REGISTRY_CLASSES.select(&:registry_consistency_worker_enabled?) @registry_classes ||= REGISTRY_CLASSES.select do |registry_class|
# Defaults on. This check gives registry classes the opportunity to
# disable this worker, e.g. with a feature flag.
!registry_class.respond_to?(:registry_consistency_worker_enabled?) ||
registry_class.registry_consistency_worker_enabled?
end
end end
end end
end end
......
---
title: 'Geo: Make the LFS registry the SSOT to optimize query performance'
merge_request: 27154
author:
type: performance
...@@ -313,122 +313,48 @@ describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw, :use_sql_query_cache_f ...@@ -313,122 +313,48 @@ describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw, :use_sql_query_cache_f
stub_lfs_object_storage stub_lfs_object_storage
end end
context 'with geo_lfs_registry_ssot_sync feature enabled' do context 'with files missing on the primary' do
context 'with files missing on the primary' do let!(:lfs_object_file_missing_on_primary) { create(:lfs_object, :with_file) }
let!(:lfs_object_file_missing_on_primary) { create(:lfs_object, :with_file) }
context 'with lfs_object_registry entries' do context 'with lfs_object_registry entries' do
before do before do
create(:geo_lfs_object_registry, :never_synced, lfs_object: lfs_object_local_store) create(:geo_lfs_object_registry, :never_synced, lfs_object: lfs_object_local_store)
create(:geo_lfs_object_registry, :failed, lfs_object: lfs_object_remote_store) create(:geo_lfs_object_registry, :failed, lfs_object: lfs_object_remote_store)
Geo::LfsObjectRegistry.create!(lfs_object_id: lfs_object_file_missing_on_primary.id, bytes: 1234, success: true, missing_on_primary: true) Geo::LfsObjectRegistry.create!(lfs_object_id: lfs_object_file_missing_on_primary.id, bytes: 1234, success: true, missing_on_primary: true)
end
it 'enqueues file downloads if there is spare capacity' do
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_local_store.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_file_missing_on_primary.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_remote_store.id)
subject.perform
end
it 'does not retry those files if there is no spare capacity' do
expect(subject).to receive(:db_retrieve_batch_size).and_return(1).twice
expect(Geo::FileDownloadWorker).to receive(:perform_async).once
subject.perform
end
it 'does not retry those files if they are already scheduled' do
scheduled_jobs = [{ type: 'lfs', id: lfs_object_file_missing_on_primary.id, job_id: 'foo' }]
expect(subject).to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(1)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_local_store.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_remote_store.id)
subject.perform
end
end end
context 'with no lfs_object_registry entries' do it 'enqueues file downloads if there is spare capacity' do
it 'does not enqueue file downloads' do expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_local_store.id)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async) expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_file_missing_on_primary.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_remote_store.id)
subject.perform subject.perform
end
end end
end
end
context 'with geo_lfs_registry_ssot_sync feature disabled' do
before do
stub_feature_flags(geo_lfs_registry_ssot_sync: false)
end
context 'with files missing on the primary' do
let!(:lfs_object_file_missing_on_primary) { create(:lfs_object, :with_file) }
context 'with lfs_object_registry entries' do
before do
create(:geo_lfs_object_registry, :never_synced, lfs_object: lfs_object_local_store)
create(:geo_lfs_object_registry, :failed, lfs_object: lfs_object_remote_store)
Geo::LfsObjectRegistry.create!(lfs_object_id: lfs_object_file_missing_on_primary.id, bytes: 1234, success: true, missing_on_primary: true)
end
it 'enqueues file downloads if there is spare capacity' do
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_local_store.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_file_missing_on_primary.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_remote_store.id)
subject.perform
end
it 'does not enqueue file downloads if there is no spare capacity' do
expect(subject).to receive(:db_retrieve_batch_size).and_return(1).twice
expect(Geo::FileDownloadWorker).to receive(:perform_async).once
subject.perform
end
it 'does not enqueue file downloads if they are already scheduled' do it 'does not retry those files if there is no spare capacity' do
scheduled_jobs = [{ type: 'lfs', id: lfs_object_file_missing_on_primary.id, job_id: 'foo' }] expect(subject).to receive(:db_retrieve_batch_size).and_return(1).twice
expect(subject).to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(1)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_local_store.id) expect(Geo::FileDownloadWorker).to receive(:perform_async).once
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_remote_store.id)
subject.perform subject.perform
end
end end
context 'with no lfs_object_registry entries' do it 'does not retry those files if they are already scheduled' do
it 'enqueues file downloads if there is spare capacity' do scheduled_jobs = [{ type: 'lfs', id: lfs_object_file_missing_on_primary.id, job_id: 'foo' }]
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_local_store.id) expect(subject).to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(1)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_file_missing_on_primary.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_remote_store.id)
subject.perform
end
it 'does not enqueue file downloads if there is no spare capacity' do
expect(subject).to receive(:db_retrieve_batch_size).and_return(1).twice
expect(Geo::FileDownloadWorker).to receive(:perform_async).once
subject.perform expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_local_store.id)
end expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_remote_store.id)
it 'does not enqueue file downloads if they are already scheduled' do subject.perform
scheduled_jobs = [{ type: 'lfs', id: lfs_object_file_missing_on_primary.id, job_id: 'foo' }] end
expect(subject).to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(1) end
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_local_store.id) context 'with no lfs_object_registry entries' do
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_remote_store.id) it 'does not enqueue file downloads' do
expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
subject.perform subject.perform
end
end end
end end
end end
...@@ -711,22 +637,6 @@ describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw, :use_sql_query_cache_f ...@@ -711,22 +637,6 @@ describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw, :use_sql_query_cache_f
allow(::GeoNode).to receive(:current_node).and_return(secondary) allow(::GeoNode).to receive(:current_node).and_return(secondary)
end end
context 'when geo_lfs_registry_ssot_sync feature is disabled' do
before do
stub_feature_flags(geo_lfs_registry_ssot_sync: false)
end
it 'does not perform Geo::FileDownloadWorker for LFS object that does not belong to selected namespaces to replicate' do
lfs_object_in_synced_group = create(:lfs_objects_project, project: project_in_synced_group)
create(:lfs_objects_project, project: unsynced_project)
expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with('lfs', lfs_object_in_synced_group.lfs_object_id).once.and_return(spy)
subject.perform
end
end
context 'when geo_job_artifact_registry_ssot_sync feature is disabled' do context 'when geo_job_artifact_registry_ssot_sync feature is disabled' do
before do before do
stub_feature_flags(geo_job_artifact_registry_ssot_sync: false) stub_feature_flags(geo_job_artifact_registry_ssot_sync: false)
......
...@@ -91,27 +91,6 @@ describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do ...@@ -91,27 +91,6 @@ describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
expect(Geo::UploadRegistry.where(file_id: upload.id).count).to eq(1) expect(Geo::UploadRegistry.where(file_id: upload.id).count).to eq(1)
end end
context 'when geo_lfs_registry_ssot_sync is disabled' do
let_it_be(:lfs_object) { create(:lfs_object) }
before do
stub_feature_flags(geo_lfs_registry_ssot_sync: false)
end
it 'returns false' do
expect(subject.perform).to be_falsey
end
it 'does not execute RegistryConsistencyService for LFS objects' do
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::JobArtifactRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::UploadRegistry, batch_size: 1000).and_call_original
expect(Geo::RegistryConsistencyService).not_to receive(:new).with(Geo::LfsObjectRegistry, batch_size: 1000)
subject.perform
end
end
context 'when geo_job_artifact_registry_ssot_sync is disabled' do context 'when geo_job_artifact_registry_ssot_sync is disabled' do
let_it_be(:job_artifact) { create(:ci_job_artifact) } let_it_be(:job_artifact) { create(:ci_job_artifact) }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment