Commit fb89302c authored by Mayra Cabrera's avatar Mayra Cabrera

Merge branch '206946-registry-is-ssot-for-uploads' into 'master'

Geo - Make registry table SSOT for Uploads

See merge request gitlab-org/gitlab!25482
parents fe8d8ccb c93ba76a
......@@ -34,64 +34,119 @@ module Geo
Upload
end
# Returns untracked IDs as well as tracked IDs that are unused.
#
# Untracked IDs are model IDs that are supposed to be synced but don't yet
# have a registry entry.
#
# Unused tracked IDs are model IDs that are not supposed to be synced but
# already have a registry entry. For example:
#
# - orphaned registries
# - records that became excluded from selective sync
# - records that are in object storage, and `sync_object_storage` became
# disabled
#
# We compute both sets in this method to reduce the number of DB queries
# performed.
#
# @return [Array] the first element is an Array of untracked IDs, and the second element is an Array of tracked IDs that are unused
def find_registry_differences(range)
source = attachments(fdw: false).where(id: range).pluck(::Upload.arel_table[:id], ::Upload.arel_table[:uploader]) # rubocop:disable CodeReuse/ActiveRecord
tracked = Geo::UploadRegistry.where(file_id: range).pluck(:file_id, :file_type) # rubocop:disable CodeReuse/ActiveRecord
untracked = source - tracked
unused_tracked = tracked - source
[untracked, unused_tracked]
end
# Returns Geo::UploadRegistry records that have never been synced.
#
# Does not care about selective sync, because it considers the Registry
# table to be the single source of truth. The contract is that other
# processes need to ensure that the table only contains records that should
# be synced.
#
# Any registries that have ever been synced that currently need to be
# resynced will be handled by other find methods (like
# #find_retryable_failed_registries)
#
# You can pass a list with `except_ids:` so you can exclude items you
# already scheduled but haven't finished and aren't persisted to the database yet
#
# @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_ids ids that will be ignored from the query
# rubocop:disable CodeReuse/ActiveRecord
def find_never_synced_registries(batch_size:, except_ids: [])
Geo::UploadRegistry
.never
.model_id_not_in(except_ids)
.limit(batch_size)
end
# rubocop:enable CodeReuse/ActiveRecord
# Deprecated in favor of the process using
# #find_registry_differences and #find_never_synced_registries
#
# Find limited amount of non replicated attachments.
#
# You can pass a list with `except_file_ids:` so you can exclude items you
# You can pass a list with `except_ids:` so you can exclude items you
# already scheduled but haven't finished and aren't persisted to the database yet
#
# TODO: Alternative here is to use some sort of window function with a cursor instead
# of simply limiting the query and passing a list of items we don't want
#
# @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_file_ids ids that will be ignored from the query
# @param [Array<Integer>] except_ids ids that will be ignored from the query
# rubocop: disable CodeReuse/ActiveRecord
def find_unsynced(batch_size:, except_file_ids: [])
def find_unsynced(batch_size:, except_ids: [])
attachments
.missing_registry
.id_not_in(except_file_ids)
.id_not_in(except_ids)
.limit(batch_size)
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def find_migrated_local(batch_size:, except_file_ids: [])
def find_migrated_local(batch_size:, except_ids: [])
all_attachments
.inner_join_registry
.with_files_stored_remotely
.id_not_in(except_file_ids)
.id_not_in(except_ids)
.limit(batch_size)
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def find_retryable_failed_registries(batch_size:, except_file_ids: [])
def find_retryable_failed_registries(batch_size:, except_ids: [])
Geo::UploadRegistry
.failed
.retry_due
.file_id_not_in(except_file_ids)
.model_id_not_in(except_ids)
.limit(batch_size)
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def find_retryable_synced_missing_on_primary_registries(batch_size:, except_file_ids: [])
def find_retryable_synced_missing_on_primary_registries(batch_size:, except_ids: [])
Geo::UploadRegistry
.synced
.missing_on_primary
.retry_due
.file_id_not_in(except_file_ids)
.model_id_not_in(except_ids)
.limit(batch_size)
end
# rubocop: enable CodeReuse/ActiveRecord
private
def attachments
local_storage_only? ? all_attachments.with_files_stored_locally : all_attachments
def attachments(fdw: true)
local_storage_only?(fdw: fdw) ? all_attachments(fdw: fdw).with_files_stored_locally : all_attachments(fdw: fdw)
end
def all_attachments
current_node.attachments
def all_attachments(fdw: true)
current_node(fdw: fdw).attachments
end
def registries_for_attachments
......
......@@ -3,6 +3,9 @@
class Geo::UploadRegistry < Geo::BaseRegistry
include Geo::Syncable
MODEL_CLASS = ::Upload
MODEL_FOREIGN_KEY = :file_id
self.table_name = 'file_registry'
belongs_to :upload, foreign_key: :file_id
......@@ -11,16 +14,27 @@ class Geo::UploadRegistry < Geo::BaseRegistry
scope :fresh, -> { order(created_at: :desc) }
scope :never, -> { where(success: false, retry_count: nil) }
def self.file_id_in(ids)
where(file_id: ids)
def self.registry_consistency_worker_enabled?
Feature.enabled?(:geo_file_registry_ssot_sync)
end
def self.finder_class
::Geo::AttachmentRegistryFinder
end
def self.file_id_not_in(ids)
where.not(file_id: ids)
# If false, RegistryConsistencyService will frequently check the end of the
# table to quickly handle new replicables.
def self.has_create_events?
false
end
def self.pluck_file_key
where(nil).pluck(:file_id)
# TODO: Investigate replacing this with bulk insert (there was an obstacle).
# https://gitlab.com/gitlab-org/gitlab/issues/197310
def self.insert_for_model_ids(attrs)
attrs.map do |file_id, file_type|
registry = create(file_id: file_id, file_type: file_type)
registry.id
end.compact
end
def self.with_search(query)
......
......@@ -3,12 +3,22 @@
module Geo
class FileDownloadDispatchWorker # rubocop:disable Scalability/IdempotentWorker
class AttachmentJobFinder < JobFinder # rubocop:disable Scalability/IdempotentWorker
EXCEPT_RESOURCE_IDS_KEY = :except_file_ids
EXCEPT_RESOURCE_IDS_KEY = :except_ids
def registry_finder
@registry_finder ||= Geo::AttachmentRegistryFinder.new(current_node_id: Gitlab::Geo.current_node.id)
end
def find_unsynced_jobs(batch_size:)
if Geo::UploadRegistry.registry_consistency_worker_enabled?
convert_registry_relation_to_job_args(
registry_finder.find_never_synced_registries(find_batch_params(batch_size))
)
else
super
end
end
private
# Why do we need a different `file_type` for each Uploader? Why not just use 'upload'?
......
......@@ -12,7 +12,7 @@ module Geo
end
def find_unsynced_jobs(batch_size:)
if Feature.enabled?(:geo_lfs_registry_ssot_sync)
if Geo::LfsObjectRegistry.registry_consistency_worker_enabled?
convert_registry_relation_to_job_args(
registry_finder.find_never_synced_registries(find_batch_params(batch_size))
)
......
......@@ -58,7 +58,7 @@ module Geo
def find_migrated_local_attachments_ids(batch_size:)
return [] unless attachments_object_store_enabled?
attachments_finder.find_migrated_local(batch_size: batch_size, except_file_ids: scheduled_file_ids(Gitlab::Geo::Replication::USER_UPLOADS_OBJECT_TYPES))
attachments_finder.find_migrated_local(batch_size: batch_size, except_ids: scheduled_file_ids(Gitlab::Geo::Replication::USER_UPLOADS_OBJECT_TYPES))
.pluck(:uploader, :id)
.map { |uploader, id| [uploader.sub(/Uploader\z/, '').underscore, id] }
end
......
......@@ -16,7 +16,7 @@ module Geo
feature_category :geo_replication
# This is probably not the best place to "register" replicables for this functionality
REGISTRY_CLASSES = [Geo::JobArtifactRegistry, Geo::LfsObjectRegistry].freeze
REGISTRY_CLASSES = [Geo::JobArtifactRegistry, Geo::LfsObjectRegistry, Geo::UploadRegistry].freeze
BATCH_SIZE = 1000
# @return [Boolean] true if at least 1 registry was created, else false
......
......@@ -19,6 +19,11 @@ FactoryBot.define do
retry_count { 1 }
end
trait :never_synced do
success { false }
retry_count { nil }
end
trait :with_file do
after(:build, :stub) do |registry, _|
file =
......
......@@ -53,7 +53,7 @@ describe Geo::AttachmentRegistryFinder, :geo, :geo_fdw do
end
it 'returns attachments without an entry on the tracking database, excluding from exception list' do
attachments = subject.find_unsynced(batch_size: 10, except_file_ids: [upload_issuable_synced_nested_project.id])
attachments = subject.find_unsynced(batch_size: 10, except_ids: [upload_issuable_synced_nested_project.id])
expect(attachments).to match_ids(upload_unsynced_project, upload_synced_project, upload_personal_snippet,
upload_remote_unsynced_project, upload_remote_synced_group)
......@@ -64,7 +64,7 @@ describe Geo::AttachmentRegistryFinder, :geo, :geo_fdw do
let(:secondary) { create(:geo_node, :local_storage_only) }
it 'returns local attachments only' do
attachments = subject.find_unsynced(batch_size: 10, except_file_ids: [upload_synced_project.id])
attachments = subject.find_unsynced(batch_size: 10, except_ids: [upload_synced_project.id])
expect(attachments).to match_ids(upload_issuable_synced_nested_project, upload_unsynced_project,
upload_personal_snippet)
......@@ -75,7 +75,7 @@ describe Geo::AttachmentRegistryFinder, :geo, :geo_fdw do
let(:secondary) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) }
it 'returns attachments without an entry on the tracking database, excluding from exception list' do
attachments = subject.find_unsynced(batch_size: 10, except_file_ids: [upload_synced_project.id])
attachments = subject.find_unsynced(batch_size: 10, except_ids: [upload_synced_project.id])
expect(attachments).to match_ids(upload_issuable_synced_nested_project, upload_personal_snippet,
upload_remote_synced_group)
......@@ -101,7 +101,7 @@ describe Geo::AttachmentRegistryFinder, :geo, :geo_fdw do
end
it 'returns attachments stored remotely and successfully synced locally' do
attachments = subject.find_migrated_local(batch_size: 100, except_file_ids: [upload_remote_unsynced_project.id])
attachments = subject.find_migrated_local(batch_size: 100, except_ids: [upload_remote_unsynced_project.id])
expect(attachments).to match_ids(upload_remote_synced_project)
end
......
......@@ -25,8 +25,6 @@ describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
end
describe '#perform' do
subject { described_class.new }
before do
allow(subject).to receive(:sleep) # faster tests
end
......@@ -80,14 +78,17 @@ describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
it 'creates missing registries for each registry class' do
lfs_object = create(:lfs_object)
job_artifact = create(:ci_job_artifact)
upload = create(:upload)
expect(Geo::LfsObjectRegistry.where(lfs_object_id: lfs_object.id).count).to eq(0)
expect(Geo::JobArtifactRegistry.where(artifact_id: job_artifact.id).count).to eq(0)
expect(Geo::UploadRegistry.where(file_id: upload.id).count).to eq(0)
subject.perform
expect(Geo::LfsObjectRegistry.where(lfs_object_id: lfs_object.id).count).to eq(1)
expect(Geo::JobArtifactRegistry.where(artifact_id: job_artifact.id).count).to eq(1)
expect(Geo::UploadRegistry.where(file_id: upload.id).count).to eq(1)
end
context 'when geo_lfs_registry_ssot_sync is disabled' do
......@@ -103,6 +104,7 @@ describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
it 'does not execute RegistryConsistencyService for LFS objects' do
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::JobArtifactRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::UploadRegistry, batch_size: 1000).and_call_original
expect(Geo::RegistryConsistencyService).not_to receive(:new).with(Geo::LfsObjectRegistry, batch_size: 1000)
......@@ -123,6 +125,7 @@ describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
it 'does not execute RegistryConsistencyService for Job Artifacts' do
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::LfsObjectRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::UploadRegistry, batch_size: 1000).and_call_original
expect(Geo::RegistryConsistencyService).not_to receive(:new).with(Geo::JobArtifactRegistry, batch_size: 1000)
......@@ -130,6 +133,27 @@ describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
end
end
context 'when geo_file_registry_ssot_sync is disabled' do
let_it_be(:upload) { create(:upload) }
before do
stub_feature_flags(geo_file_registry_ssot_sync: false)
end
it 'returns false' do
expect(subject.perform).to be_falsey
end
it 'does not execute RegistryConsistencyService for Uploads' do
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::JobArtifactRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::LfsObjectRegistry, batch_size: 1000).and_call_original
expect(Geo::RegistryConsistencyService).not_to receive(:new).with(Geo::UploadRegistry, batch_size: 1000)
subject.perform
end
end
context 'when the current Geo node is disabled or primary' do
before do
stub_primary_node
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment