Commit b4d2e1f3 authored by Toon Claes's avatar Toon Claes Committed by Nick Thomas

Geo: Ignore remote stored objects when calculating counts

parent ddf6889a
...@@ -12,6 +12,7 @@ class Upload < ActiveRecord::Base ...@@ -12,6 +12,7 @@ class Upload < ActiveRecord::Base
validates :uploader, presence: true validates :uploader, presence: true
scope :with_files_stored_locally, -> { where(store: [nil, ObjectStorage::Store::LOCAL]) } scope :with_files_stored_locally, -> { where(store: [nil, ObjectStorage::Store::LOCAL]) }
scope :with_files_stored_remotely, -> { where(store: ObjectStorage::Store::REMOTE) }
before_save :calculate_checksum!, if: :foreground_checksummable? before_save :calculate_checksum!, if: :foreground_checksummable?
after_commit :schedule_checksum, if: :checksummable? after_commit :schedule_checksum, if: :checksummable?
......
module Geo module Geo
class AttachmentRegistryFinder < FileRegistryFinder class AttachmentRegistryFinder < FileRegistryFinder
def attachments def attachments
relation = if selective_sync?
if selective_sync? Upload.where(group_uploads.or(project_uploads).or(other_uploads))
Upload.where(group_uploads.or(project_uploads).or(other_uploads)) else
else Upload.all
Upload.all end
end end
relation.with_files_stored_locally def local_attachments
attachments.with_files_stored_locally
end end
def count_attachments def count_local_attachments
attachments.count local_attachments.count
end end
def count_synced_attachments def count_synced_attachments
...@@ -49,20 +50,20 @@ module Geo ...@@ -49,20 +50,20 @@ module Geo
# Find limited amount of non replicated attachments. # Find limited amount of non replicated attachments.
# #
# You can pass a list with `except_registry_ids:` so you can exclude items you # You can pass a list with `except_file_ids:` so you can exclude items you
# already scheduled but haven't finished and persisted to the database yet # already scheduled but haven't finished and aren't persisted to the database yet
# #
# TODO: Alternative here is to use some sort of window function with a cursor instead # TODO: Alternative here is to use some sort of window function with a cursor instead
# of simply limiting the query and passing a list of items we don't want # of simply limiting the query and passing a list of items we don't want
# #
# @param [Integer] batch_size used to limit the results returned # @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_registry_ids ids that will be ignored from the query # @param [Array<Integer>] except_file_ids ids that will be ignored from the query
def find_unsynced_attachments(batch_size:, except_registry_ids: []) def find_unsynced_attachments(batch_size:, except_file_ids: [])
relation = relation =
if use_legacy_queries? if use_legacy_queries?
legacy_find_unsynced_attachments(except_registry_ids: except_registry_ids) legacy_find_unsynced_attachments(except_file_ids: except_file_ids)
else else
fdw_find_unsynced_attachments(except_registry_ids: except_registry_ids) fdw_find_unsynced_attachments(except_file_ids: except_file_ids)
end end
relation.limit(batch_size) relation.limit(batch_size)
...@@ -106,31 +107,40 @@ module Geo ...@@ -106,31 +107,40 @@ module Geo
# #
def fdw_find_synced_attachments def fdw_find_synced_attachments
fdw_find_attachments.merge(Geo::FileRegistry.synced) fdw_find_local_attachments.merge(Geo::FileRegistry.synced)
end end
def fdw_find_failed_attachments def fdw_find_failed_attachments
fdw_find_attachments.merge(Geo::FileRegistry.failed) fdw_find_local_attachments.merge(Geo::FileRegistry.failed)
end end
def fdw_find_attachments def fdw_find_local_attachments
fdw_table = Geo::Fdw::Upload.table_name fdw_attachments.joins("INNER JOIN file_registry ON file_registry.file_id = #{fdw_attachments_table}.id")
Geo::Fdw::Upload.joins("INNER JOIN file_registry ON file_registry.file_id = #{fdw_table}.id")
.with_files_stored_locally .with_files_stored_locally
.merge(Geo::FileRegistry.attachments) .merge(Geo::FileRegistry.attachments)
end end
def fdw_find_unsynced_attachments(except_registry_ids:) def fdw_find_unsynced_attachments(except_file_ids:)
fdw_table = Geo::Fdw::Upload.table_name
upload_types = Geo::FileService::DEFAULT_OBJECT_TYPES.map { |val| "'#{val}'" }.join(',') upload_types = Geo::FileService::DEFAULT_OBJECT_TYPES.map { |val| "'#{val}'" }.join(',')
Geo::Fdw::Upload.joins("LEFT OUTER JOIN file_registry fdw_attachments.joins("LEFT OUTER JOIN file_registry
ON file_registry.file_id = #{fdw_table}.id ON file_registry.file_id = #{fdw_attachments_table}.id
AND file_registry.file_type IN (#{upload_types})") AND file_registry.file_type IN (#{upload_types})")
.with_files_stored_locally .with_files_stored_locally
.where(file_registry: { id: nil }) .where(file_registry: { id: nil })
.where.not(id: except_registry_ids) .where.not(id: except_file_ids)
end
def fdw_attachments
if selective_sync?
Geo::Fdw::Upload.where(group_uploads.or(project_uploads).or(other_uploads))
else
Geo::Fdw::Upload.all
end
end
def fdw_attachments_table
Geo::Fdw::Upload.table_name
end end
# #
...@@ -139,7 +149,7 @@ module Geo ...@@ -139,7 +149,7 @@ module Geo
def legacy_find_synced_attachments def legacy_find_synced_attachments
legacy_inner_join_registry_ids( legacy_inner_join_registry_ids(
attachments, local_attachments,
Geo::FileRegistry.attachments.synced.pluck(:file_id), Geo::FileRegistry.attachments.synced.pluck(:file_id),
Upload Upload
) )
...@@ -147,18 +157,18 @@ module Geo ...@@ -147,18 +157,18 @@ module Geo
def legacy_find_failed_attachments def legacy_find_failed_attachments
legacy_inner_join_registry_ids( legacy_inner_join_registry_ids(
attachments, local_attachments,
Geo::FileRegistry.attachments.failed.pluck(:file_id), Geo::FileRegistry.attachments.failed.pluck(:file_id),
Upload Upload
) )
end end
def legacy_find_unsynced_attachments(except_registry_ids:) def legacy_find_unsynced_attachments(except_file_ids:)
registry_ids = legacy_pluck_registry_ids(file_types: Geo::FileService::DEFAULT_OBJECT_TYPES, except_registry_ids: except_registry_ids) registry_file_ids = legacy_pluck_registry_file_ids(file_types: Geo::FileService::DEFAULT_OBJECT_TYPES) | except_file_ids
legacy_left_outer_join_registry_ids( legacy_left_outer_join_registry_ids(
attachments, local_attachments,
registry_ids, registry_file_ids,
Upload Upload
) )
end end
......
...@@ -6,9 +6,8 @@ module Geo ...@@ -6,9 +6,8 @@ module Geo
protected protected
def legacy_pluck_registry_ids(file_types:, except_registry_ids:) def legacy_pluck_registry_file_ids(file_types:)
ids = Geo::FileRegistry.where(file_type: file_types).pluck(:file_id) Geo::FileRegistry.where(file_type: file_types).pluck(:file_id)
(ids + except_registry_ids).uniq
end end
end end
end end
module Geo module Geo
class JobArtifactRegistryFinder < FileRegistryFinder class JobArtifactRegistryFinder < FileRegistryFinder
def count_job_artifacts def count_job_artifacts
job_artifacts.count local_job_artifacts.count
end end
def count_synced_job_artifacts def count_synced_job_artifacts
relation = if aggregate_pushdown_supported?
if selective_sync? find_synced_job_artifacts.count
legacy_find_synced_job_artifacts else
else legacy_find_synced_job_artifacts.count
find_synced_job_artifacts_registries end
end
relation.count
end end
def count_failed_job_artifacts def count_failed_job_artifacts
relation = if aggregate_pushdown_supported?
if selective_sync? find_failed_job_artifacts.count
legacy_find_failed_job_artifacts else
else legacy_find_failed_job_artifacts.count
find_failed_job_artifacts_registries end
end
relation.count
end end
# Find limited amount of non replicated lfs objects. # Find limited amount of non replicated lfs objects.
# #
# You can pass a list with `except_registry_ids:` so you can exclude items you # You can pass a list with `except_file_ids:` so you can exclude items you
# already scheduled but haven't finished and persisted to the database yet # already scheduled but haven't finished and aren't persisted to the database yet
# #
# TODO: Alternative here is to use some sort of window function with a cursor instead # TODO: Alternative here is to use some sort of window function with a cursor instead
# of simply limiting the query and passing a list of items we don't want # of simply limiting the query and passing a list of items we don't want
# #
# @param [Integer] batch_size used to limit the results returned # @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_registry_ids ids that will be ignored from the query # @param [Array<Integer>] except_file_ids ids that will be ignored from the query
def find_unsynced_job_artifacts(batch_size:, except_registry_ids: []) def find_unsynced_job_artifacts(batch_size:, except_file_ids: [])
relation = relation =
if use_legacy_queries? if use_legacy_queries?
legacy_find_unsynced_job_artifacts(except_registry_ids: except_registry_ids) legacy_find_unsynced_job_artifacts(except_file_ids: except_file_ids)
else else
fdw_find_unsynced_job_artifacts(except_registry_ids: except_registry_ids) fdw_find_unsynced_job_artifacts(except_file_ids: except_file_ids)
end end
relation.limit(batch_size) relation.limit(batch_size)
end end
def job_artifacts def job_artifacts
relation = if selective_sync?
if selective_sync? Ci::JobArtifact.joins(:project).where(projects: { id: current_node.projects })
Ci::JobArtifact.joins(:project).where(projects: { id: current_node.projects }) else
else Ci::JobArtifact.all
Ci::JobArtifact.all end
end end
relation.with_files_stored_locally def local_job_artifacts
job_artifacts.with_files_stored_locally
end end
private private
def find_synced_job_artifacts_registries def find_synced_job_artifacts
Geo::FileRegistry.job_artifacts.synced if use_legacy_queries?
legacy_find_synced_job_artifacts
else
fdw_find_job_artifacts.merge(Geo::FileRegistry.synced)
end
end end
def find_failed_job_artifacts_registries def find_failed_job_artifacts
Geo::FileRegistry.job_artifacts.failed if use_legacy_queries?
legacy_find_failed_job_artifacts
else
fdw_find_job_artifacts.merge(Geo::FileRegistry.failed)
end
end end
# #
# FDW accessors # FDW accessors
# #
def fdw_find_unsynced_job_artifacts(except_registry_ids:) def fdw_find_job_artifacts
fdw_table = Geo::Fdw::Ci::JobArtifact.table_name fdw_job_artifacts.joins("INNER JOIN file_registry ON file_registry.file_id = #{fdw_jobs_artifacts_table}.id")
.with_files_stored_locally
.merge(Geo::FileRegistry.job_artifacts)
end
Geo::Fdw::Ci::JobArtifact.joins("LEFT OUTER JOIN file_registry def fdw_find_unsynced_job_artifacts(except_file_ids:)
ON file_registry.file_id = #{fdw_table}.id fdw_job_artifacts.joins("LEFT OUTER JOIN file_registry
AND file_registry.file_type = 'job_artifact'") ON file_registry.file_id = #{fdw_job_artifacts_table}.id
AND file_registry.file_type = 'job_artifact'")
.with_files_stored_locally .with_files_stored_locally
.where(file_registry: { id: nil }) .where(file_registry: { id: nil })
.where.not(id: except_registry_ids) .where.not(id: except_file_ids)
end
def fdw_job_artifacts
if selective_sync?
Geo::Fdw::Ci::JobArtifact.joins(:project).where(projects: { id: current_node.projects })
else
Geo::Fdw::Ci::JobArtifact.all
end
end
def fdw_job_artifacts_table
Geo::Fdw::Ci::JobArtifact.table_name
end end
# #
...@@ -89,26 +108,26 @@ module Geo ...@@ -89,26 +108,26 @@ module Geo
def legacy_find_synced_job_artifacts def legacy_find_synced_job_artifacts
legacy_inner_join_registry_ids( legacy_inner_join_registry_ids(
job_artifacts, local_job_artifacts,
find_synced_job_artifacts_registries.pluck(:file_id), Geo::FileRegistry.job_artifacts.synced.pluck(:file_id),
Ci::JobArtifact Ci::JobArtifact
) )
end end
def legacy_find_failed_job_artifacts def legacy_find_failed_job_artifacts
legacy_inner_join_registry_ids( legacy_inner_join_registry_ids(
job_artifacts, local_job_artifacts,
find_failed_job_artifacts_registries.pluck(:file_id), Geo::FileRegistry.job_artifacts.failed.pluck(:file_id),
Ci::JobArtifact Ci::JobArtifact
) )
end end
def legacy_find_unsynced_job_artifacts(except_registry_ids:) def legacy_find_unsynced_job_artifacts(except_file_ids:)
registry_ids = legacy_pluck_registry_ids(file_types: :job_artifact, except_registry_ids: except_registry_ids) registry_file_ids = legacy_pluck_registry_file_ids(file_types: :job_artifact) | except_file_ids
legacy_left_outer_join_registry_ids( legacy_left_outer_join_registry_ids(
job_artifacts, local_job_artifacts,
registry_ids, registry_file_ids,
Ci::JobArtifact Ci::JobArtifact
) )
end end
......
module Geo module Geo
class LfsObjectRegistryFinder < FileRegistryFinder class LfsObjectRegistryFinder < FileRegistryFinder
def count_lfs_objects def count_lfs_objects
lfs_objects.count local_lfs_objects.count
end end
def count_synced_lfs_objects def count_synced_lfs_objects
relation = if aggregate_pushdown_supported?
if selective_sync? find_synced_lfs_objects.count
legacy_find_synced_lfs_objects else
else legacy_find_synced_lfs_objects.count
find_synced_lfs_objects_registries end
end
relation.count
end end
def count_failed_lfs_objects def count_failed_lfs_objects
relation = if aggregate_pushdown_supported?
if selective_sync? find_failed_lfs_objects.count
legacy_find_failed_lfs_objects else
else legacy_find_failed_lfs_objects.count
find_failed_lfs_objects_registries end
end
relation.count
end end
# Find limited amount of non replicated lfs objects. # Find limited amount of non replicated lfs objects.
# #
# You can pass a list with `except_registry_ids:` so you can exclude items you # You can pass a list with `except_file_ids:` so you can exclude items you
# already scheduled but haven't finished and persisted to the database yet # already scheduled but haven't finished and aren't persisted to the database yet
# #
# TODO: Alternative here is to use some sort of window function with a cursor instead # TODO: Alternative here is to use some sort of window function with a cursor instead
# of simply limiting the query and passing a list of items we don't want # of simply limiting the query and passing a list of items we don't want
# #
# @param [Integer] batch_size used to limit the results returned # @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_registry_ids ids that will be ignored from the query # @param [Array<Integer>] except_file_ids ids that will be ignored from the query
def find_unsynced_lfs_objects(batch_size:, except_registry_ids: []) def find_unsynced_lfs_objects(batch_size:, except_file_ids: [])
relation = relation =
if use_legacy_queries? if use_legacy_queries?
legacy_find_unsynced_lfs_objects(except_registry_ids: except_registry_ids) legacy_find_unsynced_lfs_objects(except_file_ids: except_file_ids)
else else
fdw_find_unsynced_lfs_objects(except_registry_ids: except_registry_ids) fdw_find_unsynced_lfs_objects(except_file_ids: except_file_ids)
end end
relation.limit(batch_size) relation.limit(batch_size)
end end
def lfs_objects def lfs_objects
relation = if selective_sync?
if selective_sync? LfsObject.joins(:projects).where(projects: { id: current_node.projects })
LfsObject.joins(:projects).where(projects: { id: current_node.projects }) else
else LfsObject.all
LfsObject.all end
end end
relation.with_files_stored_locally def local_lfs_objects
lfs_objects.with_files_stored_locally
end end
private private
def find_synced_lfs_objects_registries def find_synced_lfs_objects
Geo::FileRegistry.lfs_objects.synced if use_legacy_queries?
legacy_find_synced_lfs_objects
else
fdw_find_lfs_objects.merge(Geo::FileRegistry.synced)
end
end end
def find_failed_lfs_objects_registries def find_failed_lfs_objects
Geo::FileRegistry.lfs_objects.failed if use_legacy_queries?
legacy_find_failed_lfs_objects
else
fdw_find_lfs_objects.merge(Geo::FileRegistry.failed)
end
end end
# #
# FDW accessors # FDW accessors
# #
def fdw_find_unsynced_lfs_objects(except_registry_ids:) def fdw_find_lfs_objects
fdw_table = Geo::Fdw::LfsObject.table_name fdw_lfs_objects.joins("INNER JOIN file_registry ON file_registry.file_id = #{fdw_lfs_objects_table}.id")
.with_files_stored_locally
.merge(Geo::FileRegistry.lfs_objects)
end
# Filter out objects in object storage (this is done in GeoNode#lfs_objects) def fdw_find_unsynced_lfs_objects(except_file_ids:)
Geo::Fdw::LfsObject.joins("LEFT OUTER JOIN file_registry fdw_lfs_objects.joins("LEFT OUTER JOIN file_registry
ON file_registry.file_id = #{fdw_table}.id ON file_registry.file_id = #{fdw_lfs_objects_table}.id
AND file_registry.file_type = 'lfs'") AND file_registry.file_type = 'lfs'")
.with_files_stored_locally .with_files_stored_locally
.where(file_registry: { id: nil }) .where(file_registry: { id: nil })
.where.not(id: except_registry_ids) .where.not(id: except_file_ids)
end
def fdw_lfs_objects
if selective_sync?
Geo::Fdw::LfsObject.joins(:project).where(projects: { id: current_node.projects })
else
Geo::Fdw::LfsObject.all
end
end
def fdw_lfs_objects_table
Geo::Fdw::LfsObject.table_name
end end
# #
...@@ -90,26 +108,26 @@ module Geo ...@@ -90,26 +108,26 @@ module Geo
def legacy_find_synced_lfs_objects def legacy_find_synced_lfs_objects
legacy_inner_join_registry_ids( legacy_inner_join_registry_ids(
lfs_objects, local_lfs_objects,
find_synced_lfs_objects_registries.pluck(:file_id), Geo::FileRegistry.lfs_objects.synced.pluck(:file_id),
LfsObject LfsObject
) )
end end
def legacy_find_failed_lfs_objects def legacy_find_failed_lfs_objects
legacy_inner_join_registry_ids( legacy_inner_join_registry_ids(
lfs_objects, local_lfs_objects,
find_failed_lfs_objects_registries.pluck(:file_id), Geo::FileRegistry.lfs_objects.failed.pluck(:file_id),
LfsObject LfsObject
) )
end end
def legacy_find_unsynced_lfs_objects(except_registry_ids:) def legacy_find_unsynced_lfs_objects(except_file_ids:)
registry_ids = legacy_pluck_registry_ids(file_types: :lfs, except_registry_ids: except_registry_ids) registry_file_ids = legacy_pluck_registry_file_ids(file_types: :lfs) | except_file_ids
legacy_left_outer_join_registry_ids( legacy_left_outer_join_registry_ids(
lfs_objects, local_lfs_objects,
registry_ids, registry_file_ids,
LfsObject LfsObject
) )
end end
......
...@@ -12,6 +12,7 @@ module EE ...@@ -12,6 +12,7 @@ module EE
after_destroy :log_geo_event after_destroy :log_geo_event
scope :with_files_stored_locally, -> { where(file_store: [nil, LfsObjectUploader::Store::LOCAL]) } scope :with_files_stored_locally, -> { where(file_store: [nil, LfsObjectUploader::Store::LOCAL]) }
scope :with_files_stored_remotely, -> { where(file_store: ObjectStorage::Store::REMOTE) }
end end
def local_store? def local_store?
......
...@@ -4,6 +4,7 @@ module Geo ...@@ -4,6 +4,7 @@ module Geo
self.table_name = Gitlab::Geo::Fdw.table('lfs_objects') self.table_name = Gitlab::Geo::Fdw.table('lfs_objects')
scope :with_files_stored_locally, -> { where(file_store: [nil, LfsObjectUploader::Store::LOCAL]) } scope :with_files_stored_locally, -> { where(file_store: [nil, LfsObjectUploader::Store::LOCAL]) }
scope :with_files_stored_remotely, -> { where(file_store: LfsObjectUploader::Store::REMOTE) }
end end
end end
end end
...@@ -4,6 +4,7 @@ module Geo ...@@ -4,6 +4,7 @@ module Geo
self.table_name = Gitlab::Geo::Fdw.table('uploads') self.table_name = Gitlab::Geo::Fdw.table('uploads')
scope :with_files_stored_locally, -> { where(store: [nil, ObjectStorage::Store::LOCAL]) } scope :with_files_stored_locally, -> { where(store: [nil, ObjectStorage::Store::LOCAL]) }
scope :with_files_stored_remotely, -> { where(store: ObjectStorage::Store::REMOTE) }
end end
end end
end end
...@@ -5,4 +5,5 @@ class Geo::FileRegistry < Geo::BaseRegistry ...@@ -5,4 +5,5 @@ class Geo::FileRegistry < Geo::BaseRegistry
scope :lfs_objects, -> { where(file_type: :lfs) } scope :lfs_objects, -> { where(file_type: :lfs) }
scope :job_artifacts, -> { where(file_type: :job_artifact) } scope :job_artifacts, -> { where(file_type: :job_artifact) }
scope :attachments, -> { where(file_type: Geo::FileService::DEFAULT_OBJECT_TYPES) } scope :attachments, -> { where(file_type: Geo::FileService::DEFAULT_OBJECT_TYPES) }
scope :stored_locally, -> { where(store: [nil, ObjectStorage::Store::LOCAL]) }
end end
...@@ -105,7 +105,7 @@ class GeoNodeStatus < ActiveRecord::Base ...@@ -105,7 +105,7 @@ class GeoNodeStatus < ActiveRecord::Base
self.wikis_count = projects_finder.count_wikis self.wikis_count = projects_finder.count_wikis
self.lfs_objects_count = lfs_objects_finder.count_lfs_objects self.lfs_objects_count = lfs_objects_finder.count_lfs_objects
self.job_artifacts_count = job_artifacts_finder.count_job_artifacts self.job_artifacts_count = job_artifacts_finder.count_job_artifacts
self.attachments_count = attachments_finder.count_attachments self.attachments_count = attachments_finder.count_local_attachments
self.last_successful_status_check_at = Time.now self.last_successful_status_check_at = Time.now
self.storage_shards = StorageShard.all self.storage_shards = StorageShard.all
......
...@@ -53,19 +53,19 @@ module Geo ...@@ -53,19 +53,19 @@ module Geo
end end
def find_unsynced_lfs_objects_ids(batch_size:) def find_unsynced_lfs_objects_ids(batch_size:)
lfs_objects_finder.find_unsynced_lfs_objects(batch_size: batch_size, except_registry_ids: scheduled_file_ids(:lfs)) lfs_objects_finder.find_unsynced_lfs_objects(batch_size: batch_size, except_file_ids: scheduled_file_ids(:lfs))
.pluck(:id) .pluck(:id)
.map { |id| [id, :lfs] } .map { |id| [id, :lfs] }
end end
def find_unsynced_attachments_ids(batch_size:) def find_unsynced_attachments_ids(batch_size:)
attachments_finder.find_unsynced_attachments(batch_size: batch_size, except_registry_ids: scheduled_file_ids(Geo::FileService::DEFAULT_OBJECT_TYPES)) attachments_finder.find_unsynced_attachments(batch_size: batch_size, except_file_ids: scheduled_file_ids(Geo::FileService::DEFAULT_OBJECT_TYPES))
.pluck(:id, :uploader) .pluck(:id, :uploader)
.map { |id, uploader| [id, uploader.sub(/Uploader\z/, '').underscore] } .map { |id, uploader| [id, uploader.sub(/Uploader\z/, '').underscore] }
end end
def find_unsynced_job_artifacts_ids(batch_size:) def find_unsynced_job_artifacts_ids(batch_size:)
job_artifacts_finder.find_unsynced_job_artifacts(batch_size: batch_size, except_registry_ids: scheduled_file_ids(:job_artifact)) job_artifacts_finder.find_unsynced_job_artifacts(batch_size: batch_size, except_file_ids: scheduled_file_ids(:job_artifact))
.pluck(:id) .pluck(:id)
.map { |id| [id, :job_artifact] } .map { |id| [id, :job_artifact] }
end end
......
---
title: 'Geo: Ignore remote stored objects when calculating counts'
merge_request: 4864
author:
type: fixed
...@@ -20,8 +20,8 @@ describe Geo::JobArtifactRegistryFinder, :geo do ...@@ -20,8 +20,8 @@ describe Geo::JobArtifactRegistryFinder, :geo do
end end
describe '#count_synced_job_artifacts' do describe '#count_synced_job_artifacts' do
it 'delegates to #find_synced_job_artifacts_registries' do it 'delegates to #legacy_find_synced_job_artifacts' do
expect(subject).to receive(:find_synced_job_artifacts_registries).and_call_original expect(subject).to receive(:legacy_find_synced_job_artifacts).and_call_original
subject.count_synced_job_artifacts subject.count_synced_job_artifacts
end end
...@@ -34,6 +34,15 @@ describe Geo::JobArtifactRegistryFinder, :geo do ...@@ -34,6 +34,15 @@ describe Geo::JobArtifactRegistryFinder, :geo do
expect(subject.count_synced_job_artifacts).to eq 2 expect(subject.count_synced_job_artifacts).to eq 2
end end
it 'ignores remote job artifacts' do
create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_2.id)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id)
job_artifact_1.update!(file_store: ObjectStorage::Store::REMOTE)
expect(subject.count_synced_job_artifacts).to eq 2
end
context 'with selective sync' do context 'with selective sync' do
before do before do
secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group]) secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
...@@ -52,12 +61,21 @@ describe Geo::JobArtifactRegistryFinder, :geo do ...@@ -52,12 +61,21 @@ describe Geo::JobArtifactRegistryFinder, :geo do
expect(subject.count_synced_job_artifacts).to eq 1 expect(subject.count_synced_job_artifacts).to eq 1
end end
it 'ignores remote job artifacts' do
create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_2.id)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id)
job_artifact_1.update!(file_store: ObjectStorage::Store::REMOTE)
expect(subject.count_synced_job_artifacts).to eq 1
end
end end
end end
describe '#count_failed_job_artifacts' do describe '#count_failed_job_artifacts' do
it 'delegates to #find_failed_job_artifacts_registries' do it 'delegates to #legacy_find_failed_job_artifacts' do
expect(subject).to receive(:find_failed_job_artifacts_registries).and_call_original expect(subject).to receive(:legacy_find_failed_job_artifacts).and_call_original
subject.count_failed_job_artifacts subject.count_failed_job_artifacts
end end
...@@ -70,6 +88,15 @@ describe Geo::JobArtifactRegistryFinder, :geo do ...@@ -70,6 +88,15 @@ describe Geo::JobArtifactRegistryFinder, :geo do
expect(subject.count_failed_job_artifacts).to eq 2 expect(subject.count_failed_job_artifacts).to eq 2
end end
it 'ignores remote job artifacts' do
create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id, success: false)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_2.id, success: false)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id, success: false)
job_artifact_1.update!(file_store: ObjectStorage::Store::REMOTE)
expect(subject.count_failed_job_artifacts).to eq 2
end
context 'with selective sync' do context 'with selective sync' do
before do before do
secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group]) secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
...@@ -93,19 +120,22 @@ describe Geo::JobArtifactRegistryFinder, :geo do ...@@ -93,19 +120,22 @@ describe Geo::JobArtifactRegistryFinder, :geo do
expect(subject.count_failed_job_artifacts).to eq 0 expect(subject.count_failed_job_artifacts).to eq 0
end end
end
end
# Disable transactions via :delete method because a foreign table it 'ignores remote job artifacts' do
# can't see changes inside a transaction of a different connection. create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id, success: false)
context 'FDW', :delete do create(:geo_file_registry, :job_artifact, file_id: job_artifact_2.id, success: false)
before do create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id, success: false)
skip('FDW is not configured') if Gitlab::Database.postgresql? && !Gitlab::Geo::Fdw.enabled? job_artifact_1.update!(file_store: ObjectStorage::Store::REMOTE)
expect(subject.count_failed_job_artifacts).to eq 1
end
end end
end
shared_examples 'finds all the things' do
describe '#find_unsynced_job_artifacts' do describe '#find_unsynced_job_artifacts' do
it 'delegates to #fdw_find_unsynced_job_artifacts' do it 'delegates to the correct method' do
expect(subject).to receive(:fdw_find_unsynced_job_artifacts).and_call_original expect(subject).to receive("#{method_prefix}_find_unsynced_job_artifacts".to_sym).and_call_original
subject.find_unsynced_job_artifacts(batch_size: 10) subject.find_unsynced_job_artifacts(batch_size: 10)
end end
...@@ -116,49 +146,39 @@ describe Geo::JobArtifactRegistryFinder, :geo do ...@@ -116,49 +146,39 @@ describe Geo::JobArtifactRegistryFinder, :geo do
job_artifacts = subject.find_unsynced_job_artifacts(batch_size: 10) job_artifacts = subject.find_unsynced_job_artifacts(batch_size: 10)
expect(job_artifacts.map(&:id)).to match_array([job_artifact_2.id, job_artifact_4.id]) expect(job_artifacts).to match_ids(job_artifact_2, job_artifact_4)
end end
it 'excludes job artifacts without an entry on the tracking database' do it 'excludes job artifacts without an entry on the tracking database' do
create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id, success: true) create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id, success: true)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id, success: false) create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id, success: false)
job_artifacts = subject.find_unsynced_job_artifacts(batch_size: 10, except_registry_ids: [job_artifact_2.id]) job_artifacts = subject.find_unsynced_job_artifacts(batch_size: 10, except_file_ids: [job_artifact_2.id])
expect(job_artifacts.map(&:id)).to match_array([job_artifact_4.id]) expect(job_artifacts).to match_ids(job_artifact_4)
end end
end end
end end
context 'Legacy' do # Disable transactions via :delete method because a foreign table
# can't see changes inside a transaction of a different connection.
context 'FDW', :delete do
before do before do
allow(Gitlab::Geo::Fdw).to receive(:enabled?).and_return(false) skip('FDW is not configured') if Gitlab::Database.postgresql? && !Gitlab::Geo::Fdw.enabled?
end end
describe '#find_unsynced_job_artifacts' do include_examples 'finds all the things' do
it 'delegates to #legacy_find_unsynced_job_artifacts' do let(:method_prefix) { 'fdw' }
expect(subject).to receive(:legacy_find_unsynced_job_artifacts).and_call_original end
end
subject.find_unsynced_job_artifacts(batch_size: 10)
end
it 'returns job artifacts without an entry on the tracking database' do
create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id, success: true)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id, success: false)
job_artifacts = subject.find_unsynced_job_artifacts(batch_size: 10)
expect(job_artifacts).to match_array([job_artifact_2, job_artifact_4])
end
it 'excludes job artifacts without an entry on the tracking database' do
create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id, success: true)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id, success: false)
job_artifacts = subject.find_unsynced_job_artifacts(batch_size: 10, except_registry_ids: [job_artifact_2.id]) context 'Legacy' do
before do
allow(Gitlab::Geo::Fdw).to receive(:enabled?).and_return(false)
end
expect(job_artifacts).to match_array([job_artifact_4]) include_examples 'finds all the things' do
end let(:method_prefix) { 'legacy' }
end end
end end
end end
...@@ -20,8 +20,8 @@ describe Geo::LfsObjectRegistryFinder, :geo do ...@@ -20,8 +20,8 @@ describe Geo::LfsObjectRegistryFinder, :geo do
end end
describe '#count_synced_lfs_objects' do describe '#count_synced_lfs_objects' do
it 'delegates to #find_synced_lfs_objects_registries' do it 'delegates to #legacy_find_synced_lfs_objects' do
expect(subject).to receive(:find_synced_lfs_objects_registries).and_call_original expect(subject).to receive(:legacy_find_synced_lfs_objects).and_call_original
subject.count_synced_lfs_objects subject.count_synced_lfs_objects
end end
...@@ -34,8 +34,23 @@ describe Geo::LfsObjectRegistryFinder, :geo do ...@@ -34,8 +34,23 @@ describe Geo::LfsObjectRegistryFinder, :geo do
expect(subject.count_synced_lfs_objects).to eq 2 expect(subject.count_synced_lfs_objects).to eq 2
end end
it 'ignores remote LFS objects' do
create(:geo_file_registry, :lfs, file_id: lfs_object_1.id)
create(:geo_file_registry, :lfs, file_id: lfs_object_2.id)
create(:geo_file_registry, :lfs, file_id: lfs_object_3.id)
lfs_object_1.update!(file_store: ObjectStorage::Store::REMOTE)
expect(subject.count_synced_lfs_objects).to eq 2
end
context 'with selective sync' do context 'with selective sync' do
before do before do
allow_any_instance_of(LfsObjectsProject).to receive(:update_project_statistics).and_return(nil)
create(:lfs_objects_project, project: synced_project, lfs_object: lfs_object_1)
create(:lfs_objects_project, project: synced_project, lfs_object: lfs_object_2)
create(:lfs_objects_project, project: unsynced_project, lfs_object: lfs_object_3)
secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group]) secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
end end
...@@ -46,15 +61,18 @@ describe Geo::LfsObjectRegistryFinder, :geo do ...@@ -46,15 +61,18 @@ describe Geo::LfsObjectRegistryFinder, :geo do
end end
it 'counts LFS objects that has been synced' do it 'counts LFS objects that has been synced' do
allow_any_instance_of(LfsObjectsProject).to receive(:update_project_statistics).and_return(nil) create(:geo_file_registry, :lfs, file_id: lfs_object_1.id, success: false)
create(:geo_file_registry, :lfs, file_id: lfs_object_2.id)
create(:geo_file_registry, :lfs, file_id: lfs_object_3.id)
create(:lfs_objects_project, project: synced_project, lfs_object: lfs_object_1) expect(subject.count_synced_lfs_objects).to eq 1
create(:lfs_objects_project, project: synced_project, lfs_object: lfs_object_2) end
create(:lfs_objects_project, project: unsynced_project, lfs_object: lfs_object_3)
create(:geo_file_registry, :lfs, file_id: lfs_object_1.id, success: false) it 'ignores remote LFS objects' do
create(:geo_file_registry, :lfs, file_id: lfs_object_1.id)
create(:geo_file_registry, :lfs, file_id: lfs_object_2.id) create(:geo_file_registry, :lfs, file_id: lfs_object_2.id)
create(:geo_file_registry, :lfs, file_id: lfs_object_3.id) create(:geo_file_registry, :lfs, file_id: lfs_object_3.id)
lfs_object_1.update!(file_store: ObjectStorage::Store::REMOTE)
expect(subject.count_synced_lfs_objects).to eq 1 expect(subject.count_synced_lfs_objects).to eq 1
end end
...@@ -62,8 +80,8 @@ describe Geo::LfsObjectRegistryFinder, :geo do ...@@ -62,8 +80,8 @@ describe Geo::LfsObjectRegistryFinder, :geo do
end end
describe '#count_failed_lfs_objects' do describe '#count_failed_lfs_objects' do
it 'delegates to #find_failed_lfs_objects_registries' do it 'delegates to #legacy_find_failed_lfs_objects' do
expect(subject).to receive(:find_failed_lfs_objects_registries).and_call_original expect(subject).to receive(:legacy_find_failed_lfs_objects).and_call_original
subject.count_failed_lfs_objects subject.count_failed_lfs_objects
end end
...@@ -76,8 +94,23 @@ describe Geo::LfsObjectRegistryFinder, :geo do ...@@ -76,8 +94,23 @@ describe Geo::LfsObjectRegistryFinder, :geo do
expect(subject.count_failed_lfs_objects).to eq 2 expect(subject.count_failed_lfs_objects).to eq 2
end end
it 'ignores remote LFS objects' do
create(:geo_file_registry, :lfs, file_id: lfs_object_1.id, success: false)
create(:geo_file_registry, :lfs, file_id: lfs_object_2.id, success: false)
create(:geo_file_registry, :lfs, file_id: lfs_object_3.id, success: false)
lfs_object_1.update!(file_store: ObjectStorage::Store::REMOTE)
expect(subject.count_failed_lfs_objects).to eq 2
end
context 'with selective sync' do context 'with selective sync' do
before do before do
allow_any_instance_of(LfsObjectsProject).to receive(:update_project_statistics).and_return(nil)
create(:lfs_objects_project, project: synced_project, lfs_object: lfs_object_1)
create(:lfs_objects_project, project: synced_project, lfs_object: lfs_object_2)
create(:lfs_objects_project, project: unsynced_project, lfs_object: lfs_object_3)
secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group]) secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
end end
...@@ -88,31 +121,28 @@ describe Geo::LfsObjectRegistryFinder, :geo do ...@@ -88,31 +121,28 @@ describe Geo::LfsObjectRegistryFinder, :geo do
end end
it 'counts LFS objects that sync has failed' do it 'counts LFS objects that sync has failed' do
allow_any_instance_of(LfsObjectsProject).to receive(:update_project_statistics).and_return(nil) create(:geo_file_registry, :lfs, file_id: lfs_object_1.id, success: false)
create(:geo_file_registry, :lfs, file_id: lfs_object_2.id)
create(:geo_file_registry, :lfs, file_id: lfs_object_3.id, success: false)
create(:lfs_objects_project, project: synced_project, lfs_object: lfs_object_1) expect(subject.count_failed_lfs_objects).to eq 1
create(:lfs_objects_project, project: synced_project, lfs_object: lfs_object_2) end
create(:lfs_objects_project, project: unsynced_project, lfs_object: lfs_object_3)
it 'ignores remote LFS objects' do
create(:geo_file_registry, :lfs, file_id: lfs_object_1.id, success: false) create(:geo_file_registry, :lfs, file_id: lfs_object_1.id, success: false)
create(:geo_file_registry, :lfs, file_id: lfs_object_2.id) create(:geo_file_registry, :lfs, file_id: lfs_object_2.id, success: false)
create(:geo_file_registry, :lfs, file_id: lfs_object_3.id, success: false) create(:geo_file_registry, :lfs, file_id: lfs_object_3.id, success: false)
lfs_object_1.update!(file_store: ObjectStorage::Store::REMOTE)
expect(subject.count_failed_lfs_objects).to eq 1 expect(subject.count_failed_lfs_objects).to eq 1
end end
end end
end end
# Disable transactions via :delete method because a foreign table shared_examples 'finds all the things' do
# can't see changes inside a transaction of a different connection.
context 'FDW', :delete do
before do
skip('FDW is not configured') if Gitlab::Database.postgresql? && !Gitlab::Geo::Fdw.enabled?
end
describe '#find_unsynced_lfs_objects' do describe '#find_unsynced_lfs_objects' do
it 'delegates to #fdw_find_unsynced_lfs_objects' do it 'delegates to the correct method' do
expect(subject).to receive(:fdw_find_unsynced_lfs_objects).and_call_original expect(subject).to receive("#{method_prefix}_find_unsynced_lfs_objects".to_sym).and_call_original
subject.find_unsynced_lfs_objects(batch_size: 10) subject.find_unsynced_lfs_objects(batch_size: 10)
end end
...@@ -123,49 +153,39 @@ describe Geo::LfsObjectRegistryFinder, :geo do ...@@ -123,49 +153,39 @@ describe Geo::LfsObjectRegistryFinder, :geo do
lfs_objects = subject.find_unsynced_lfs_objects(batch_size: 10) lfs_objects = subject.find_unsynced_lfs_objects(batch_size: 10)
expect(lfs_objects.map(&:id)).to match_array([lfs_object_2.id, lfs_object_4.id]) expect(lfs_objects).to match_ids(lfs_object_2, lfs_object_4)
end end
it 'excludes LFS objects without an entry on the tracking database' do it 'excludes LFS objects without an entry on the tracking database' do
create(:geo_file_registry, :lfs, file_id: lfs_object_1.id, success: true) create(:geo_file_registry, :lfs, file_id: lfs_object_1.id, success: true)
create(:geo_file_registry, :lfs, file_id: lfs_object_3.id, success: false) create(:geo_file_registry, :lfs, file_id: lfs_object_3.id, success: false)
lfs_objects = subject.find_unsynced_lfs_objects(batch_size: 10, except_registry_ids: [lfs_object_2.id]) lfs_objects = subject.find_unsynced_lfs_objects(batch_size: 10, except_file_ids: [lfs_object_2.id])
expect(lfs_objects.map(&:id)).to match_array([lfs_object_4.id]) expect(lfs_objects).to match_ids(lfs_object_4)
end end
end end
end end
context 'Legacy' do # Disable transactions via :delete method because a foreign table
# can't see changes inside a transaction of a different connection.
context 'FDW', :delete do
before do before do
allow(Gitlab::Geo::Fdw).to receive(:enabled?).and_return(false) skip('FDW is not configured') if Gitlab::Database.postgresql? && !Gitlab::Geo::Fdw.enabled?
end end
describe '#find_unsynced_lfs_objects' do include_examples 'finds all the things' do
it 'delegates to #legacy_find_unsynced_lfs_objects' do let(:method_prefix) { 'fdw' }
expect(subject).to receive(:legacy_find_unsynced_lfs_objects).and_call_original end
end
subject.find_unsynced_lfs_objects(batch_size: 10)
end
it 'returns LFS objects without an entry on the tracking database' do
create(:geo_file_registry, :lfs, file_id: lfs_object_1.id, success: true)
create(:geo_file_registry, :lfs, file_id: lfs_object_3.id, success: false)
lfs_objects = subject.find_unsynced_lfs_objects(batch_size: 10)
expect(lfs_objects).to match_array([lfs_object_2, lfs_object_4])
end
it 'excludes LFS objects without an entry on the tracking database' do
create(:geo_file_registry, :lfs, file_id: lfs_object_1.id, success: true)
create(:geo_file_registry, :lfs, file_id: lfs_object_3.id, success: false)
lfs_objects = subject.find_unsynced_lfs_objects(batch_size: 10, except_registry_ids: [lfs_object_2.id]) context 'Legacy' do
before do
allow(Gitlab::Geo::Fdw).to receive(:enabled?).and_return(false)
end
expect(lfs_objects).to match_array([lfs_object_4]) include_examples 'finds all the things' do
end let(:method_prefix) { 'legacy' }
end end
end end
end end
RSpec::Matchers.define :match_ids do |*expected|
match do |actual|
actual_ids = map_ids(actual)
expected_ids = map_ids(expected)
expect(actual_ids).to match_array(expected_ids)
end
description do
'matches elements by ids'
end
def map_ids(elements)
elements = elements.flatten if elements.respond_to?(:flatten)
if elements.respond_to?(:map)
elements.map(&:id)
elsif elements.respond_to?(:id)
[elements.id]
else
raise ArgumentError, "could not map elements to ids: #{elements}"
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment