Commit b7444374 authored by Michael Kozono's avatar Michael Kozono Committed by Nick Thomas

Geo: Refactor FileDownloadDispatchWorker JobFinders and FileRegistryFinders

parent 79f13b9d
module Geo
class AttachmentRegistryFinder < FileRegistryFinder
def attachments
if selective_sync?
Upload.where(group_uploads.or(project_uploads).or(other_uploads))
else
Upload.all
end
def syncable
all.geo_syncable
end
def syncable_attachments
attachments.geo_syncable
def count_syncable
syncable.count
end
def count_syncable_attachments
syncable_attachments.count
end
def count_synced_attachments
def count_synced
if aggregate_pushdown_supported?
find_synced_attachments.count
find_synced.count
else
legacy_find_synced_attachments.count
legacy_find_synced.count
end
end
def count_failed_attachments
def count_failed
if aggregate_pushdown_supported?
find_failed_attachments.count
find_failed.count
else
legacy_find_failed_attachments.count
legacy_find_failed.count
end
end
def count_synced_missing_on_primary_attachments
def count_synced_missing_on_primary
if aggregate_pushdown_supported? && !use_legacy_queries?
fdw_find_synced_missing_on_primary_attachments.count
fdw_find_synced_missing_on_primary.count
else
legacy_find_synced_missing_on_primary_attachments.count
legacy_find_synced_missing_on_primary.count
end
end
def count_registry_attachments
def count_registry
Geo::FileRegistry.attachments.count
end
def find_synced_attachments
if use_legacy_queries?
legacy_find_synced_attachments
else
fdw_find_synced_attachments
end
end
def find_failed_attachments
if use_legacy_queries?
legacy_find_failed_attachments
else
fdw_find_failed_attachments
end
end
# Find limited amount of non replicated attachments.
#
# You can pass a list with `except_file_ids:` so you can exclude items you
......@@ -70,51 +46,51 @@ module Geo
#
# @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_file_ids ids that will be ignored from the query
def find_unsynced_attachments(batch_size:, except_file_ids: [])
def find_unsynced(batch_size:, except_file_ids: [])
relation =
if use_legacy_queries?
legacy_find_unsynced_attachments(except_file_ids: except_file_ids)
legacy_find_unsynced(except_file_ids: except_file_ids)
else
fdw_find_unsynced_attachments(except_file_ids: except_file_ids)
fdw_find_unsynced(except_file_ids: except_file_ids)
end
relation.limit(batch_size)
end
def find_migrated_local_attachments(batch_size:, except_file_ids: [])
def find_migrated_local(batch_size:, except_file_ids: [])
relation =
if use_legacy_queries?
legacy_find_migrated_local_attachments(except_file_ids: except_file_ids)
legacy_find_migrated_local(except_file_ids: except_file_ids)
else
fdw_find_migrated_local_attachments(except_file_ids: except_file_ids)
fdw_find_migrated_local(except_file_ids: except_file_ids)
end
relation.limit(batch_size)
end
def find_retryable_failed_attachments_registries(batch_size:, except_file_ids: [])
find_failed_attachments_registries
def find_retryable_failed_registries(batch_size:, except_file_ids: [])
find_failed_registries
.retry_due
.where.not(file_id: except_file_ids)
.limit(batch_size)
end
def find_retryable_synced_missing_on_primary_attachments_registries(batch_size:, except_file_ids: [])
find_synced_missing_on_primary_attachments_registries
def find_retryable_synced_missing_on_primary_registries(batch_size:, except_file_ids: [])
find_synced_missing_on_primary_registries
.retry_due
.where.not(file_id: except_file_ids)
.limit(batch_size)
end
def find_failed_attachments_registries
Geo::FileRegistry.attachments.failed
end
private
def find_synced_missing_on_primary_attachments_registries
Geo::FileRegistry.attachments.synced.missing_on_primary
def all
if selective_sync?
Upload.where(group_uploads.or(project_uploads).or(other_uploads))
else
Upload.all
end
end
private
def group_uploads
namespace_ids =
......@@ -147,40 +123,68 @@ module Geo
Upload.arel_table
end
def find_synced
if use_legacy_queries?
legacy_find_synced
else
fdw_find_synced
end
end
def find_failed
if use_legacy_queries?
legacy_find_failed
else
fdw_find_failed
end
end
def find_synced_registries
Geo::FileRegistry.attachments.synced
end
def find_failed_registries
Geo::FileRegistry.attachments.failed
end
def find_synced_missing_on_primary_registries
find_synced_registries.missing_on_primary
end
#
# FDW accessors
#
def fdw_find_synced_attachments
fdw_find_syncable_attachments.merge(Geo::FileRegistry.synced)
def fdw_find_synced
fdw_find_syncable.merge(Geo::FileRegistry.synced)
end
def fdw_find_failed_attachments
fdw_find_syncable_attachments.merge(Geo::FileRegistry.failed)
def fdw_find_failed
fdw_find_syncable.merge(Geo::FileRegistry.failed)
end
def fdw_find_syncable_attachments
fdw_attachments.joins("INNER JOIN file_registry ON file_registry.file_id = #{fdw_attachments_table}.id")
def fdw_find_syncable
fdw_all.joins("INNER JOIN file_registry ON file_registry.file_id = #{fdw_table}.id")
.geo_syncable
.merge(Geo::FileRegistry.attachments)
end
def fdw_find_unsynced_attachments(except_file_ids:)
def fdw_find_unsynced(except_file_ids:)
upload_types = Geo::FileService::DEFAULT_OBJECT_TYPES.map { |val| "'#{val}'" }.join(',')
fdw_attachments.joins("LEFT OUTER JOIN file_registry
ON file_registry.file_id = #{fdw_attachments_table}.id
fdw_all.joins("LEFT OUTER JOIN file_registry
ON file_registry.file_id = #{fdw_table}.id
AND file_registry.file_type IN (#{upload_types})")
.geo_syncable
.where(file_registry: { id: nil })
.where.not(id: except_file_ids)
end
def fdw_find_synced_missing_on_primary_attachments
fdw_find_synced_attachments.merge(Geo::FileRegistry.missing_on_primary)
def fdw_find_synced_missing_on_primary
fdw_find_synced.merge(Geo::FileRegistry.missing_on_primary)
end
def fdw_attachments
def fdw_all
if selective_sync?
Geo::Fdw::Upload.where(group_uploads.or(project_uploads).or(other_uploads))
else
......@@ -188,12 +192,12 @@ module Geo
end
end
def fdw_attachments_table
def fdw_table
Geo::Fdw::Upload.table_name
end
def fdw_find_migrated_local_attachments(except_file_ids:)
fdw_attachments.joins("INNER JOIN file_registry ON file_registry.file_id = #{fdw_attachments_table}.id")
def fdw_find_migrated_local(except_file_ids:)
fdw_all.joins("INNER JOIN file_registry ON file_registry.file_id = #{fdw_table}.id")
.with_files_stored_remotely
.merge(Geo::FileRegistry.attachments)
.where.not(id: except_file_ids)
......@@ -203,46 +207,46 @@ module Geo
# Legacy accessors (non FDW)
#
def legacy_find_synced_attachments
def legacy_find_synced
legacy_inner_join_registry_ids(
syncable_attachments,
Geo::FileRegistry.attachments.synced.pluck(:file_id),
syncable,
find_synced_registries.pluck(:file_id),
Upload
)
end
def legacy_find_failed_attachments
def legacy_find_failed
legacy_inner_join_registry_ids(
syncable_attachments,
find_failed_attachments_registries.pluck(:file_id),
syncable,
find_failed_registries.pluck(:file_id),
Upload
)
end
def legacy_find_unsynced_attachments(except_file_ids:)
registry_file_ids = legacy_pluck_registry_file_ids(file_types: Geo::FileService::DEFAULT_OBJECT_TYPES) | except_file_ids
def legacy_find_unsynced(except_file_ids:)
registry_file_ids = Geo::FileRegistry.attachments.pluck(:file_id) | except_file_ids
legacy_left_outer_join_registry_ids(
syncable_attachments,
syncable,
registry_file_ids,
Upload
)
end
def legacy_find_migrated_local_attachments(except_file_ids:)
def legacy_find_migrated_local(except_file_ids:)
registry_file_ids = Geo::FileRegistry.attachments.pluck(:file_id) - except_file_ids
legacy_inner_join_registry_ids(
attachments.with_files_stored_remotely,
all.with_files_stored_remotely,
registry_file_ids,
Upload
)
end
def legacy_find_synced_missing_on_primary_attachments
def legacy_find_synced_missing_on_primary
legacy_inner_join_registry_ids(
syncable_attachments,
Geo::FileRegistry.attachments.synced.missing_on_primary.pluck(:file_id),
syncable,
find_synced_missing_on_primary_registries.pluck(:file_id),
Upload
)
end
......
module Geo
class FileRegistryFinder < RegistryFinder
protected
# @abstract Subclass is expected to implement the declared methods
def legacy_pluck_registry_file_ids(file_types:)
Geo::FileRegistry.where(file_type: file_types).pluck(:file_id)
# @!method syncable
# Return an ActiveRecord::Relation of tracked resource records, filtered
# by selective sync, with files stored locally
def syncable
raise NotImplementedError
end
# @!method count_syncable
# Return a count of tracked resource records, filtered by selective
# sync, with files stored locally
def count_syncable
raise NotImplementedError
end
# @!method count_synced
# Return a count of tracked resource records, filtered by selective
# sync, with files stored locally, and are synced
def count_synced
raise NotImplementedError
end
# @!method count_failed
# Return a count of tracked resource records, filtered by selective
# sync, with files stored locally, and are failed
def count_failed
raise NotImplementedError
end
# @!method count_synced_missing_on_primary
# Return a count of tracked resource records, filtered by selective
# sync, with files stored locally, and are synced and missing on the
# primary
def count_synced_missing_on_primary
raise NotImplementedError
end
# @!method count_registry
# Return a count of the registry records for the tracked file_type(s)
def count_registry
raise NotImplementedError
end
# @!method find_unsynced
# Return an ActiveRecord::Relation of not-yet-tracked resource records,
# filtered by selective sync, with files stored locally, excluding
# specified IDs, limited to batch_size
def find_unsynced
raise NotImplementedError
end
# @!method find_migrated_local
# Return an ActiveRecord::Relation of tracked resource records, filtered
# by selective sync, with files stored remotely, excluding
# specified IDs, limited to batch_size
def find_migrated_local
raise NotImplementedError
end
# @!method find_retryable_failed_registries
# Return an ActiveRecord::Relation of registry records marked as failed,
# which are ready to be retried, excluding specified IDs, limited to
# batch_size
def find_retryable_failed_registries
raise NotImplementedError
end
# @!method find_retryable_synced_missing_on_primary_registries
# Return an ActiveRecord::Relation of registry records marked as synced
# and missing on the primary, which are ready to be retried, excluding
# specified IDs, limited to batch_size
def find_retryable_synced_missing_on_primary_registries
raise NotImplementedError
end
end
end
module Geo
class JobArtifactRegistryFinder < RegistryFinder
def count_syncable_job_artifacts
syncable_job_artifacts.count
def count_syncable
syncable.count
end
def count_synced_job_artifacts
def count_synced
if aggregate_pushdown_supported?
find_synced_job_artifacts.count
find_synced.count
else
legacy_find_synced_job_artifacts.count
legacy_find_synced.count
end
end
def count_failed_job_artifacts
def count_failed
if aggregate_pushdown_supported?
find_failed_job_artifacts.count
find_failed.count
else
legacy_find_failed_job_artifacts.count
legacy_find_failed.count
end
end
def count_synced_missing_on_primary_job_artifacts
def count_synced_missing_on_primary
if aggregate_pushdown_supported?
find_synced_missing_on_primary_job_artifacts.count
find_synced_missing_on_primary.count
else
legacy_find_synced_missing_on_primary_job_artifacts.count
legacy_find_synced_missing_on_primary.count
end
end
def count_registry_job_artifacts
def count_registry
Geo::JobArtifactRegistry.count
end
......@@ -42,117 +42,117 @@ module Geo
#
# @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_artifact_ids ids that will be ignored from the query
def find_unsynced_job_artifacts(batch_size:, except_artifact_ids: [])
def find_unsynced(batch_size:, except_artifact_ids: [])
relation =
if use_legacy_queries?
legacy_find_unsynced_job_artifacts(except_artifact_ids: except_artifact_ids)
legacy_find_unsynced(except_artifact_ids: except_artifact_ids)
else
fdw_find_unsynced_job_artifacts(except_artifact_ids: except_artifact_ids)
fdw_find_unsynced(except_artifact_ids: except_artifact_ids)
end
relation.limit(batch_size)
end
def find_migrated_local_job_artifacts(batch_size:, except_artifact_ids: [])
def find_migrated_local(batch_size:, except_artifact_ids: [])
relation =
if use_legacy_queries?
legacy_find_migrated_local_job_artifacts(except_artifact_ids: except_artifact_ids)
legacy_find_migrated_local(except_artifact_ids: except_artifact_ids)
else
fdw_find_migrated_local_job_artifacts(except_artifact_ids: except_artifact_ids)
fdw_find_migrated_local(except_artifact_ids: except_artifact_ids)
end
relation.limit(batch_size)
end
def job_artifacts
if selective_sync?
Ci::JobArtifact.joins(:project).where(projects: { id: current_node.projects })
else
Ci::JobArtifact.all
end
end
def syncable_job_artifacts
job_artifacts.geo_syncable
def syncable
all.geo_syncable
end
def find_retryable_failed_job_artifacts_registries(batch_size:, except_artifact_ids: [])
find_failed_job_artifacts_registries
def find_retryable_failed_registries(batch_size:, except_artifact_ids: [])
find_failed_registries
.retry_due
.where.not(artifact_id: except_artifact_ids)
.limit(batch_size)
end
def find_retryable_synced_missing_on_primary_job_artifacts_registries(batch_size:, except_artifact_ids: [])
find_synced_missing_on_primary_job_artifacts_registries
def find_retryable_synced_missing_on_primary_registries(batch_size:, except_artifact_ids: [])
find_synced_missing_on_primary_registries
.retry_due
.where.not(artifact_id: except_artifact_ids)
.limit(batch_size)
end
def find_synced_job_artifacts_registries
Geo::JobArtifactRegistry.synced
end
private
def find_synced_missing_on_primary_job_artifacts_registries
Geo::JobArtifactRegistry.synced.missing_on_primary
def all
if selective_sync?
Ci::JobArtifact.joins(:project).where(projects: { id: current_node.projects })
else
Ci::JobArtifact.all
end
def find_failed_job_artifacts_registries
Geo::JobArtifactRegistry.failed
end
private
def find_synced_job_artifacts
def find_synced
if use_legacy_queries?
legacy_find_synced_job_artifacts
legacy_find_synced
else
fdw_find_job_artifacts.merge(find_synced_job_artifacts_registries)
fdw_find.merge(find_synced_registries)
end
end
def find_synced_missing_on_primary_job_artifacts
def find_synced_missing_on_primary
if use_legacy_queries?
legacy_find_synced_missing_on_primary_job_artifacts
legacy_find_synced_missing_on_primary
else
fdw_find_job_artifacts.merge(find_synced_missing_on_primary_job_artifacts_registries)
fdw_find.merge(find_synced_missing_on_primary_registries)
end
end
def find_failed_job_artifacts
def find_failed
if use_legacy_queries?
legacy_find_failed_job_artifacts
legacy_find_failed
else
fdw_find_job_artifacts.merge(find_failed_job_artifacts_registries)
fdw_find.merge(find_failed_registries)
end
end
def find_synced_registries
Geo::JobArtifactRegistry.synced
end
def find_synced_missing_on_primary_registries
find_synced_registries.missing_on_primary
end
def find_failed_registries
Geo::JobArtifactRegistry.failed
end
#
# FDW accessors
#
def fdw_find_job_artifacts
fdw_job_artifacts.joins("INNER JOIN job_artifact_registry ON job_artifact_registry.artifact_id = #{fdw_job_artifacts_table}.id")
def fdw_find
fdw_all.joins("INNER JOIN job_artifact_registry ON job_artifact_registry.artifact_id = #{fdw_table}.id")
.geo_syncable
end
def fdw_find_unsynced_job_artifacts(except_artifact_ids:)
fdw_job_artifacts.joins("LEFT OUTER JOIN job_artifact_registry
ON job_artifact_registry.artifact_id = #{fdw_job_artifacts_table}.id")
def fdw_find_unsynced(except_artifact_ids:)
fdw_all.joins("LEFT OUTER JOIN job_artifact_registry
ON job_artifact_registry.artifact_id = #{fdw_table}.id")
.geo_syncable
.where(job_artifact_registry: { artifact_id: nil })
.where.not(id: except_artifact_ids)
end
def fdw_find_migrated_local_job_artifacts(except_artifact_ids:)
fdw_job_artifacts.joins("INNER JOIN job_artifact_registry ON job_artifact_registry.artifact_id = #{fdw_job_artifacts_table}.id")
def fdw_find_migrated_local(except_artifact_ids:)
fdw_all.joins("INNER JOIN job_artifact_registry ON job_artifact_registry.artifact_id = #{fdw_table}.id")
.with_files_stored_remotely
.where.not(id: except_artifact_ids)
.merge(Geo::JobArtifactRegistry.all)
end
def fdw_job_artifacts
def fdw_all
if selective_sync?
Geo::Fdw::Ci::JobArtifact.joins(:project).where(projects: { id: current_node.projects })
else
......@@ -160,7 +160,7 @@ module Geo
end
end
def fdw_job_artifacts_table
def fdw_table
Geo::Fdw::Ci::JobArtifact.table_name
end
......@@ -168,51 +168,46 @@ module Geo
# Legacy accessors (non FDW)
#
def legacy_find_synced_job_artifacts
def legacy_find_synced
legacy_inner_join_registry_ids(
syncable_job_artifacts,
find_synced_job_artifacts_registries.pluck(:artifact_id),
syncable,
find_synced_registries.pluck(:artifact_id),
Ci::JobArtifact
)
end
def legacy_find_failed_job_artifacts
def legacy_find_failed
legacy_inner_join_registry_ids(
syncable_job_artifacts,
find_failed_job_artifacts_registries.pluck(:artifact_id),
syncable,
find_failed_registries.pluck(:artifact_id),
Ci::JobArtifact
)
end
def legacy_find_unsynced_job_artifacts(except_artifact_ids:)
registry_artifact_ids = legacy_pluck_artifact_ids(include_registry_ids: except_artifact_ids)
def legacy_find_unsynced(except_artifact_ids:)
registry_artifact_ids = Geo::JobArtifactRegistry.pluck(:artifact_id) | except_artifact_ids
legacy_left_outer_join_registry_ids(
syncable_job_artifacts,
syncable,
registry_artifact_ids,
Ci::JobArtifact
)
end
def legacy_pluck_artifact_ids(include_registry_ids:)
ids = Geo::JobArtifactRegistry.pluck(:artifact_id)
(ids + include_registry_ids).uniq
end
def legacy_find_migrated_local_job_artifacts(except_artifact_ids:)
registry_file_ids = Geo::JobArtifactRegistry.pluck(:artifact_id) - except_artifact_ids
def legacy_find_migrated_local(except_artifact_ids:)
registry_artifact_ids = Geo::JobArtifactRegistry.pluck(:artifact_id) - except_artifact_ids
legacy_inner_join_registry_ids(
job_artifacts.with_files_stored_remotely,
registry_file_ids,
all.with_files_stored_remotely,
registry_artifact_ids,
Ci::JobArtifact
)
end
def legacy_find_synced_missing_on_primary_job_artifacts
def legacy_find_synced_missing_on_primary
legacy_inner_join_registry_ids(
syncable_job_artifacts,
find_synced_missing_on_primary_job_artifacts_registries.pluck(:artifact_id),
syncable,
find_synced_missing_on_primary_registries.pluck(:artifact_id),
Ci::JobArtifact
)
end
......
module Geo
class LfsObjectRegistryFinder < FileRegistryFinder
def count_syncable_lfs_objects
syncable_lfs_objects.count
def count_syncable
syncable.count
end
def count_synced_lfs_objects
def count_synced
if aggregate_pushdown_supported?
find_synced_lfs_objects.count
find_synced.count
else
legacy_find_synced_lfs_objects.count
legacy_find_synced.count
end
end
def count_failed_lfs_objects
def count_failed
if aggregate_pushdown_supported?
find_failed_lfs_objects.count
find_failed.count
else
legacy_find_failed_lfs_objects.count
legacy_find_failed.count
end
end
def count_synced_missing_on_primary_lfs_objects
def count_synced_missing_on_primary
if aggregate_pushdown_supported? && !use_legacy_queries?
fdw_find_synced_missing_on_primary_lfs_objects.count
fdw_find_synced_missing_on_primary.count
else
legacy_find_synced_missing_on_primary_lfs_objects.count
legacy_find_synced_missing_on_primary.count
end
end
def count_registry_lfs_objects
def count_registry
Geo::FileRegistry.lfs_objects.count
end
......@@ -42,119 +42,123 @@ module Geo
#
# @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_file_ids ids that will be ignored from the query
def find_unsynced_lfs_objects(batch_size:, except_file_ids: [])
def find_unsynced(batch_size:, except_file_ids: [])
relation =
if use_legacy_queries?
legacy_find_unsynced_lfs_objects(except_file_ids: except_file_ids)
legacy_find_unsynced(except_file_ids: except_file_ids)
else
fdw_find_unsynced_lfs_objects(except_file_ids: except_file_ids)
fdw_find_unsynced(except_file_ids: except_file_ids)
end
relation.limit(batch_size)
end
def find_migrated_local_lfs_objects(batch_size:, except_file_ids: [])
def find_migrated_local(batch_size:, except_file_ids: [])
relation =
if use_legacy_queries?
legacy_find_migrated_local_lfs_objects(except_file_ids: except_file_ids)
legacy_find_migrated_local(except_file_ids: except_file_ids)
else
fdw_find_migrated_local_lfs_objects(except_file_ids: except_file_ids)
fdw_find_migrated_local(except_file_ids: except_file_ids)
end
relation.limit(batch_size)
end
def lfs_objects
if selective_sync?
LfsObject.joins(:projects).where(projects: { id: current_node.projects })
else
LfsObject.all
end
end
def syncable_lfs_objects
lfs_objects.geo_syncable
def syncable
all.geo_syncable
end
def find_retryable_failed_lfs_objects_registries(batch_size:, except_file_ids: [])
find_failed_lfs_objects_registries
def find_retryable_failed_registries(batch_size:, except_file_ids: [])
find_failed_registries
.retry_due
.where.not(file_id: except_file_ids)
.limit(batch_size)
end
def find_retryable_synced_missing_on_primary_lfs_objects_registries(batch_size:, except_file_ids: [])
find_synced_missing_on_primary_lfs_objects_registries
def find_retryable_synced_missing_on_primary_registries(batch_size:, except_file_ids: [])
find_synced_missing_on_primary_registries
.retry_due
.where.not(file_id: except_file_ids)
.limit(batch_size)
end
def find_failed_lfs_objects_registries
Geo::FileRegistry.lfs_objects.failed
end
private
def find_synced_missing_on_primary_lfs_objects_registries
Geo::FileRegistry.lfs_objects.synced.missing_on_primary
def all
if selective_sync?
LfsObject.joins(:projects).where(projects: { id: current_node.projects })
else
LfsObject.all
end
end
private
def find_synced_lfs_objects
def find_synced
if use_legacy_queries?
legacy_find_synced_lfs_objects
legacy_find_synced
else
fdw_find_synced_lfs_objects
fdw_find_synced
end
end
def find_failed_lfs_objects
def find_failed
if use_legacy_queries?
legacy_find_failed_lfs_objects
legacy_find_failed
else
fdw_find_failed_lfs_objects
fdw_find_failed
end
end
def find_synced_registries
Geo::FileRegistry.lfs_objects.synced
end
def find_failed_registries
Geo::FileRegistry.lfs_objects.failed
end
def find_synced_missing_on_primary_registries
find_synced_registries.missing_on_primary
end
#
# FDW accessors
#
def fdw_find_lfs_objects
fdw_lfs_objects.joins("INNER JOIN file_registry ON file_registry.file_id = #{fdw_lfs_objects_table}.id")
def fdw_find
fdw_all.joins("INNER JOIN file_registry ON file_registry.file_id = #{fdw_table}.id")
.geo_syncable
.merge(Geo::FileRegistry.lfs_objects)
end
def fdw_find_unsynced_lfs_objects(except_file_ids:)
fdw_lfs_objects.joins("LEFT OUTER JOIN file_registry
ON file_registry.file_id = #{fdw_lfs_objects_table}.id
def fdw_find_unsynced(except_file_ids:)
fdw_all.joins("LEFT OUTER JOIN file_registry
ON file_registry.file_id = #{fdw_table}.id
AND file_registry.file_type = 'lfs'")
.geo_syncable
.where(file_registry: { id: nil })
.where.not(id: except_file_ids)
end
def fdw_find_migrated_local_lfs_objects(except_file_ids:)
fdw_lfs_objects.joins("INNER JOIN file_registry ON file_registry.file_id = #{fdw_lfs_objects_table}.id")
def fdw_find_migrated_local(except_file_ids:)
fdw_all.joins("INNER JOIN file_registry ON file_registry.file_id = #{fdw_table}.id")
.with_files_stored_remotely
.where.not(id: except_file_ids)
.merge(Geo::FileRegistry.lfs_objects)
end
def fdw_find_synced_lfs_objects
fdw_find_lfs_objects.merge(Geo::FileRegistry.synced)
def fdw_find_synced
fdw_find.merge(Geo::FileRegistry.synced)
end
def fdw_find_synced_missing_on_primary_lfs_objects
fdw_find_lfs_objects.merge(Geo::FileRegistry.synced.missing_on_primary)
def fdw_find_synced_missing_on_primary
fdw_find.merge(Geo::FileRegistry.synced.missing_on_primary)
end
def fdw_find_failed_lfs_objects
fdw_find_lfs_objects.merge(Geo::FileRegistry.failed)
def fdw_find_failed
fdw_find.merge(Geo::FileRegistry.failed)
end
def fdw_lfs_objects
def fdw_all
if selective_sync?
Geo::Fdw::LfsObject.joins(:project).where(projects: { id: current_node.projects })
else
......@@ -162,7 +166,7 @@ module Geo
end
end
def fdw_lfs_objects_table
def fdw_table
Geo::Fdw::LfsObject.table_name
end
......@@ -170,46 +174,46 @@ module Geo
# Legacy accessors (non FDW)
#
def legacy_find_synced_lfs_objects
def legacy_find_synced
legacy_inner_join_registry_ids(
syncable_lfs_objects,
Geo::FileRegistry.lfs_objects.synced.pluck(:file_id),
syncable,
find_synced_registries.pluck(:file_id),
LfsObject
)
end
def legacy_find_failed_lfs_objects
def legacy_find_failed
legacy_inner_join_registry_ids(
syncable_lfs_objects,
find_failed_lfs_objects_registries.pluck(:file_id),
syncable,
find_failed_registries.pluck(:file_id),
LfsObject
)
end
def legacy_find_unsynced_lfs_objects(except_file_ids:)
registry_file_ids = legacy_pluck_registry_file_ids(file_types: :lfs) | except_file_ids
def legacy_find_unsynced(except_file_ids:)
registry_file_ids = Geo::FileRegistry.lfs_objects.pluck(:file_id) | except_file_ids
legacy_left_outer_join_registry_ids(
syncable_lfs_objects,
syncable,
registry_file_ids,
LfsObject
)
end
def legacy_find_migrated_local_lfs_objects(except_file_ids:)
def legacy_find_migrated_local(except_file_ids:)
registry_file_ids = Geo::FileRegistry.lfs_objects.pluck(:file_id) - except_file_ids
legacy_inner_join_registry_ids(
lfs_objects.with_files_stored_remotely,
all.with_files_stored_remotely,
registry_file_ids,
LfsObject
)
end
def legacy_find_synced_missing_on_primary_lfs_objects
def legacy_find_synced_missing_on_primary
legacy_inner_join_registry_ids(
syncable_lfs_objects,
Geo::FileRegistry.lfs_objects.synced.missing_on_primary.pluck(:file_id),
syncable,
find_synced_missing_on_primary_registries.pluck(:file_id),
LfsObject
)
end
......
......@@ -150,9 +150,9 @@ class GeoNodeStatus < ActiveRecord::Base
self.last_event_date = latest_event&.created_at
self.repositories_count = projects_finder.count_repositories
self.wikis_count = projects_finder.count_wikis
self.lfs_objects_count = lfs_objects_finder.count_syncable_lfs_objects
self.job_artifacts_count = job_artifacts_finder.count_syncable_job_artifacts
self.attachments_count = attachments_finder.count_syncable_attachments
self.lfs_objects_count = lfs_objects_finder.count_syncable
self.job_artifacts_count = job_artifacts_finder.count_syncable
self.attachments_count = attachments_finder.count_syncable
self.last_successful_status_check_at = Time.now
self.storage_shards = StorageShard.all
......@@ -202,18 +202,18 @@ class GeoNodeStatus < ActiveRecord::Base
self.repositories_failed_count = projects_finder.count_failed_repositories
self.wikis_synced_count = projects_finder.count_synced_wikis
self.wikis_failed_count = projects_finder.count_failed_wikis
self.lfs_objects_synced_count = lfs_objects_finder.count_synced_lfs_objects
self.lfs_objects_failed_count = lfs_objects_finder.count_failed_lfs_objects
self.lfs_objects_registry_count = lfs_objects_finder.count_registry_lfs_objects
self.lfs_objects_synced_missing_on_primary_count = lfs_objects_finder.count_synced_missing_on_primary_lfs_objects
self.job_artifacts_synced_count = job_artifacts_finder.count_synced_job_artifacts
self.job_artifacts_failed_count = job_artifacts_finder.count_failed_job_artifacts
self.job_artifacts_registry_count = job_artifacts_finder.count_registry_job_artifacts
self.job_artifacts_synced_missing_on_primary_count = job_artifacts_finder.count_synced_missing_on_primary_job_artifacts
self.attachments_synced_count = attachments_finder.count_synced_attachments
self.attachments_failed_count = attachments_finder.count_failed_attachments
self.attachments_registry_count = attachments_finder.count_registry_attachments
self.attachments_synced_missing_on_primary_count = attachments_finder.count_synced_missing_on_primary_attachments
self.lfs_objects_synced_count = lfs_objects_finder.count_synced
self.lfs_objects_failed_count = lfs_objects_finder.count_failed
self.lfs_objects_registry_count = lfs_objects_finder.count_registry
self.lfs_objects_synced_missing_on_primary_count = lfs_objects_finder.count_synced_missing_on_primary
self.job_artifacts_synced_count = job_artifacts_finder.count_synced
self.job_artifacts_failed_count = job_artifacts_finder.count_failed
self.job_artifacts_registry_count = job_artifacts_finder.count_registry
self.job_artifacts_synced_missing_on_primary_count = job_artifacts_finder.count_synced_missing_on_primary
self.attachments_synced_count = attachments_finder.count_synced
self.attachments_failed_count = attachments_finder.count_failed
self.attachments_registry_count = attachments_finder.count_registry
self.attachments_synced_missing_on_primary_count = attachments_finder.count_synced_missing_on_primary
load_verification_data
end
......
......@@ -28,24 +28,25 @@ module Geo
end
end
# Get a batch of unsynced resources, taking equal parts from each resource.
#
# @return [Array] job arguments of unsynced resources
def find_unsynced_jobs(batch_size:)
find_jobs(sync_statuses: [:unsynced], batch_size: batch_size)
jobs = job_finders.reduce([]) do |jobs, job_finder|
jobs << job_finder.find_unsynced_jobs(batch_size: batch_size)
end
# @return [Array] job arguments of low priority resources
def find_low_priority_jobs(batch_size:)
find_jobs(sync_statuses: [:failed, :synced_missing_on_primary], batch_size: batch_size)
take_batch(*jobs, batch_size: batch_size)
end
# Get a batch of resources taking equal parts from each resource.
# Get a batch of failed and synced-but-missing-on-primary resources, taking
# equal parts from each resource.
#
# @return [Array] job arguments of a batch of resources
def find_jobs(sync_statuses:, batch_size:)
# @return [Array] job arguments of low priority resources
def find_low_priority_jobs(batch_size:)
jobs = job_finders.reduce([]) do |jobs, job_finder|
sync_statuses.reduce(jobs) do |jobs, sync_status|
jobs << job_finder.find_jobs(sync_status: sync_status, batch_size: batch_size)
end
jobs << job_finder.find_failed_jobs(batch_size: batch_size)
jobs << job_finder.find_synced_missing_on_primary_jobs(batch_size: batch_size)
end
take_batch(*jobs, batch_size: batch_size)
......
module Geo
class FileDownloadDispatchWorker
class AttachmentJobFinder < JobFinder
def resource_type
:attachment
end
EXCEPT_RESOURCE_IDS_KEY = :except_file_ids
def except_resource_ids_key
:except_file_ids
def registry_finder
@registry_finder ||= Geo::AttachmentRegistryFinder.new(current_node: Gitlab::Geo.current_node)
end
private
# Why do we need a different `file_type` for each Uploader? Why not just use 'upload'?
def find_unsynced_jobs(batch_size:)
registry_finder.find_unsynced_attachments(batch_size: batch_size, except_file_ids: scheduled_file_ids)
.pluck(:id, :uploader)
def convert_resource_relation_to_job_args(relation)
relation.pluck(:id, :uploader)
.map { |id, uploader| [uploader.sub(/Uploader\z/, '').underscore, id] }
end
def find_failed_jobs(batch_size:)
find_failed_registries(batch_size: batch_size).pluck(:file_type, :file_id)
end
def find_synced_missing_on_primary_jobs(batch_size:)
find_synced_missing_on_primary_registries(batch_size: batch_size).pluck(:file_type, :file_id)
def convert_registry_relation_to_job_args(relation)
relation.pluck(:file_type, :file_id)
end
end
end
......
module Geo
class FileDownloadDispatchWorker
class JobArtifactJobFinder < JobFinder
def resource_type
:job_artifact
end
def resource_id_prefix
:artifact
end
def find_unsynced_jobs(batch_size:)
registry_finder.find_unsynced_job_artifacts(batch_size: batch_size, except_artifact_ids: scheduled_file_ids)
.pluck(:id)
.map { |id| ['job_artifact', id] }
end
def find_failed_jobs(batch_size:)
find_failed_registries(batch_size: batch_size).pluck(:artifact_id).map { |id| ['job_artifact', id] }
end
RESOURCE_ID_KEY = :artifact_id
EXCEPT_RESOURCE_IDS_KEY = :except_artifact_ids
FILE_SERVICE_OBJECT_TYPE = :job_artifact
def find_synced_missing_on_primary_jobs(batch_size:)
find_synced_missing_on_primary_registries(batch_size: batch_size).pluck(:artifact_id).map { |id| ['job_artifact', id] }
def registry_finder
@registry_finder ||= Geo::JobArtifactRegistryFinder.new(current_node: Gitlab::Geo.current_node)
end
end
end
......
module Geo
class FileDownloadDispatchWorker
# This class is meant to be inherited, and is responsible for generating
# batches of job arguments for FileDownloadWorker.
#
# The subclass should define
#
# * registry_finder
# * EXCEPT_RESOURCE_IDS_KEY
# * RESOURCE_ID_KEY
# * FILE_SERVICE_OBJECT_TYPE
#
class JobFinder
include Gitlab::Utils::StrongMemoize
attr_reader :registry_finder, :scheduled_file_ids
attr_reader :scheduled_file_ids
def initialize(scheduled_file_ids)
current_node = Gitlab::Geo.current_node
@registry_finder = registry_finder_class.new(current_node: current_node)
@scheduled_file_ids = scheduled_file_ids
end
def registry_finder_class
"Geo::#{resource_type.to_s.classify}RegistryFinder".constantize
def find_unsynced_jobs(batch_size:)
convert_resource_relation_to_job_args(
registry_finder.find_unsynced(find_batch_params(batch_size))
)
end
def except_resource_ids_key
:"except_#{resource_id_prefix}_ids"
def find_failed_jobs(batch_size:)
convert_registry_relation_to_job_args(
registry_finder.find_retryable_failed_registries(find_batch_params(batch_size))
)
end
def find_jobs(sync_status:, batch_size:)
self.public_send(:"find_#{sync_status}_jobs", batch_size: batch_size) # rubocop:disable GitlabSecurity/PublicSend
def find_synced_missing_on_primary_jobs(batch_size:)
convert_registry_relation_to_job_args(
registry_finder.find_retryable_synced_missing_on_primary_registries(find_batch_params(batch_size))
)
end
def find_failed_registries(batch_size:)
registry_finder.public_send(:"find_retryable_failed_#{resource_type}s_registries", batch_size: batch_size, except_resource_ids_key => scheduled_file_ids) # rubocop:disable GitlabSecurity/PublicSend
private
def find_batch_params(batch_size)
{
:batch_size => batch_size,
self.class::EXCEPT_RESOURCE_IDS_KEY => scheduled_file_ids
}
end
def convert_resource_relation_to_job_args(relation)
relation.pluck(:id).map { |id| [self.class::FILE_SERVICE_OBJECT_TYPE.to_s, id] }
end
def find_synced_missing_on_primary_registries(batch_size:)
registry_finder.public_send(:"find_retryable_synced_missing_on_primary_#{resource_type}s_registries", batch_size: batch_size, except_resource_ids_key => scheduled_file_ids) # rubocop:disable GitlabSecurity/PublicSend
def convert_registry_relation_to_job_args(relation)
relation.pluck(self.class::RESOURCE_ID_KEY).map { |id| [self.class::FILE_SERVICE_OBJECT_TYPE.to_s, id] }
end
end
end
......
module Geo
class FileDownloadDispatchWorker
class LfsObjectJobFinder < JobFinder
def resource_type
:lfs_object
end
def except_resource_ids_key
:except_file_ids
end
def find_unsynced_jobs(batch_size:)
registry_finder.find_unsynced_lfs_objects(batch_size: batch_size, except_file_ids: scheduled_file_ids)
.pluck(:id)
.map { |id| ['lfs', id] }
end
def find_failed_jobs(batch_size:)
find_failed_registries(batch_size: batch_size).pluck(:file_id).map { |id| ['lfs', id] }
end
RESOURCE_ID_KEY = :file_id
EXCEPT_RESOURCE_IDS_KEY = :except_file_ids
FILE_SERVICE_OBJECT_TYPE = :lfs
def find_synced_missing_on_primary_jobs(batch_size:)
find_synced_missing_on_primary_registries(batch_size: batch_size).pluck(:file_id).map { |id| ['lfs', id] }
def registry_finder
@registry_finder ||= Geo::LfsObjectRegistryFinder.new(current_node: Gitlab::Geo.current_node)
end
end
end
......
......@@ -45,7 +45,7 @@ module Geo
def find_migrated_local_lfs_objects_ids(batch_size:)
return [] unless lfs_objects_object_store_enabled?
lfs_objects_finder.find_migrated_local_lfs_objects(batch_size: batch_size, except_file_ids: scheduled_file_ids(:lfs))
lfs_objects_finder.find_migrated_local(batch_size: batch_size, except_file_ids: scheduled_file_ids(:lfs))
.pluck(:id)
.map { |id| ['lfs', id] }
end
......@@ -53,7 +53,7 @@ module Geo
def find_migrated_local_attachments_ids(batch_size:)
return [] unless attachments_object_store_enabled?
attachments_finder.find_migrated_local_attachments(batch_size: batch_size, except_file_ids: scheduled_file_ids(Geo::FileService::DEFAULT_OBJECT_TYPES))
attachments_finder.find_migrated_local(batch_size: batch_size, except_file_ids: scheduled_file_ids(Geo::FileService::DEFAULT_OBJECT_TYPES))
.pluck(:uploader, :id)
.map { |uploader, id| [uploader.sub(/Uploader\z/, '').underscore, id] }
end
......@@ -61,7 +61,7 @@ module Geo
def find_migrated_local_job_artifacts_ids(batch_size:)
return [] unless job_artifacts_object_store_enabled?
job_artifacts_finder.find_migrated_local_job_artifacts(batch_size: batch_size, except_artifact_ids: scheduled_file_ids(:job_artifact))
job_artifacts_finder.find_migrated_local(batch_size: batch_size, except_artifact_ids: scheduled_file_ids(:job_artifact))
.pluck(:id)
.map { |id| ['job_artifact', id] }
end
......
......@@ -3,6 +3,10 @@ require 'spec_helper'
describe Geo::ProjectRegistryFinder, :geo do
include ::EE::GeoHelpers
# Using let() instead of set() because set() does not work properly
# when using the :delete DatabaseCleaner strategy, which is required for FDW
# tests because a foreign table can't see changes inside a transaction of a
# different connection.
let(:secondary) { create(:geo_node) }
let(:synced_group) { create(:group) }
let!(:project_not_synced) { create(:project) }
......
shared_examples_for 'a file registry finder' do
it 'responds to file registry finder methods' do
file_registry_finder_methods = %i{
syncable
count_syncable
count_synced
count_failed
count_synced_missing_on_primary
count_registry
find_unsynced
find_migrated_local
find_retryable_failed_registries
find_retryable_synced_missing_on_primary_registries
}
file_registry_finder_methods.each do |method|
expect(subject).to respond_to(method)
end
end
# Disable transactions via :delete method because a foreign table
# can't see changes inside a transaction of a different connection.
context 'FDW', :delete do
before do
skip('FDW is not configured') if Gitlab::Database.postgresql? && !Gitlab::Geo::Fdw.enabled?
end
include_examples 'counts all the things'
include_examples 'finds all the things' do
let(:method_prefix) { 'fdw' }
end
end
context 'Legacy' do
before do
allow(Gitlab::Geo::Fdw).to receive(:enabled?).and_return(false)
end
include_examples 'counts all the things'
include_examples 'finds all the things' do
let(:method_prefix) { 'legacy' }
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment