Commit e523b67a authored by Michael Kozono's avatar Michael Kozono Committed by Douglas Barbosa Alexandre

Treat registry as SSOT for LFS

FileDownloadDispatchWorker looks only at the registry table
for things that have never been synced.

In order for the counts to be correct, we have to distinguish
between "never synced" and "failed" with "retry_count" being
"nil" for the former and "not nil" for the latter.

Feature flag is "geo_lfs_registry_ssot_sync".
parent 35f3601b
......@@ -16,6 +16,8 @@ class LfsObjectsProject < ApplicationRecord
design: 2 ## EE-specific
}
scope :project_id_in, ->(ids) { where(project_id: ids) }
private
def update_project_statistics
......
......@@ -432,6 +432,11 @@ production: &base
geo_repository_sync_worker:
cron: "*/1 * * * *"
# GitLab Geo registry backfill worker
# NOTE: This will only take effect if Geo is enabled (secondary nodes only)
geo_secondary_registry_consistency_worker:
cron: "* * * * *"
# GitLab Geo file download dispatch worker
# NOTE: This will only take effect if Geo is enabled (secondary nodes only)
geo_file_download_dispatch_worker:
......
......@@ -498,6 +498,9 @@ Gitlab.ee do
Settings.cron_jobs['geo_repository_sync_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_repository_sync_worker']['cron'] ||= '*/1 * * * *'
Settings.cron_jobs['geo_repository_sync_worker']['job_class'] ||= 'Geo::RepositorySyncWorker'
Settings.cron_jobs['geo_secondary_registry_consistency_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_secondary_registry_consistency_worker']['cron'] ||= '* * * * *'
Settings.cron_jobs['geo_secondary_registry_consistency_worker']['job_class'] ||= 'Geo::Secondary::RegistryConsistencyWorker'
Settings.cron_jobs['geo_repository_verification_primary_batch_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_repository_verification_primary_batch_worker']['cron'] ||= '*/1 * * * *'
Settings.cron_jobs['geo_repository_verification_primary_batch_worker']['job_class'] ||= 'Geo::RepositoryVerification::Primary::BatchWorker'
......
......@@ -78,8 +78,8 @@ module Geo
raise NotImplementedError
end
def local_storage_only?
!current_node&.sync_object_storage
def local_storage_only?(fdw: true)
!current_node(fdw: fdw)&.sync_object_storage
end
end
end
......@@ -31,6 +31,60 @@ module Geo
LfsObject
end
# Returns untracked IDs as well as tracked IDs that are unused.
#
# Untracked IDs are model IDs that are supposed to be synced but don't yet
# have a registry entry.
#
# Unused tracked IDs are model IDs that are not supposed to be synced but
# already have a registry entry. For example:
#
# - orphaned registries
# - records that became excluded from selective sync
# - records that are in object storage, and `sync_object_storage` became
# disabled
#
# We compute both sets in this method to reduce the number of DB queries
# performed.
#
# @return [Array] the first element is an Array of untracked IDs, and the second element is an Array of tracked IDs that are unused
def find_registry_differences(range)
source_ids = lfs_objects(fdw: false).where(id: range).pluck_primary_key # rubocop:disable CodeReuse/ActiveRecord
tracked_ids = Geo::LfsObjectRegistry.pluck_model_ids_in_range(range)
untracked_ids = source_ids - tracked_ids
unused_tracked_ids = tracked_ids - source_ids
[untracked_ids, unused_tracked_ids]
end
# Returns LfsObjectRegistry records that have never been synced.
#
# Does not care about selective sync, because it considers the Registry
# table to be the single source of truth. The contract is that other
# processes need to ensure that the table only contains records that should
# be synced.
#
# Any registries that have ever been synced that currently need to be
# resynced will be handled by other find methods (like
# #find_retryable_failed_registries)
#
# You can pass a list with `except_ids:` so you can exclude items you
# already scheduled but haven't finished and aren't persisted to the database yet
#
# @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_ids ids that will be ignored from the query
# rubocop:disable CodeReuse/ActiveRecord
def find_never_synced_registries(batch_size:, except_ids: [])
Geo::LfsObjectRegistry.never
.model_id_not_in(except_ids)
.limit(batch_size)
end
# rubocop:enable CodeReuse/ActiveRecord
# Deprecated in favor of the process using
# #find_missing_registry_ids and #find_never_synced_registries
#
# Find limited amount of non replicated lfs objects.
#
# You can pass a list with `except_ids:` so you can exclude items you
......@@ -62,7 +116,7 @@ module Geo
registries_for_lfs_objects
.merge(Geo::LfsObjectRegistry.failed)
.merge(Geo::LfsObjectRegistry.retry_due)
.lfs_object_id_not_in(except_ids)
.model_id_not_in(except_ids)
.limit(batch_size)
end
# rubocop:enable CodeReuse/ActiveRecord
......@@ -73,19 +127,19 @@ module Geo
.synced
.missing_on_primary
.retry_due
.lfs_object_id_not_in(except_ids)
.model_id_not_in(except_ids)
.limit(batch_size)
end
# rubocop:enable CodeReuse/ActiveRecord
private
def lfs_objects
local_storage_only? ? all_lfs_objects.with_files_stored_locally : all_lfs_objects
def lfs_objects(fdw: true)
local_storage_only?(fdw: fdw) ? all_lfs_objects(fdw: fdw).with_files_stored_locally : all_lfs_objects(fdw: fdw)
end
def all_lfs_objects
current_node.lfs_objects
def all_lfs_objects(fdw: true)
current_node(fdw: fdw).lfs_objects
end
def registries_for_lfs_objects
......
......@@ -14,10 +14,20 @@ module Geo
private
def current_node
strong_memoize(:current_node) do
def current_node(fdw: true)
fdw ? current_node_fdw : current_node_non_fdw
end
def current_node_fdw
strong_memoize(:current_node_fdw) do
Geo::Fdw::GeoNode.find(current_node_id) if current_node_id
end
end
def current_node_non_fdw
strong_memoize(:current_node_non_fdw) do
GeoNode.find(current_node_id) if current_node_id
end
end
end
end
......@@ -2,4 +2,25 @@
class Geo::BaseRegistry < Geo::TrackingBase
self.abstract_class = true
def self.pluck_model_ids_in_range(range)
where(self::MODEL_FOREIGN_KEY => range).pluck(self::MODEL_FOREIGN_KEY)
end
def self.model_id_in(ids)
where(self::MODEL_FOREIGN_KEY => ids)
end
def self.model_id_not_in(ids)
where.not(self::MODEL_FOREIGN_KEY => ids)
end
# TODO: Investigate replacing this with bulk insert (there was an obstacle).
# https://gitlab.com/gitlab-org/gitlab/issues/197310
def self.insert_for_model_ids(ids)
ids.map do |id|
registry = create(self::MODEL_FOREIGN_KEY => id)
registry.id
end.compact
end
end
......@@ -4,15 +4,35 @@ class Geo::LfsObjectRegistry < Geo::BaseRegistry
include ::ShaAttribute
include ::Geo::Syncable
MODEL_CLASS = ::LfsObject
MODEL_FOREIGN_KEY = :lfs_object_id
sha_attribute :sha256
belongs_to :lfs_object, class_name: 'LfsObject'
def self.lfs_object_id_in(ids)
where(lfs_object_id: ids)
scope :never, -> { where(success: false, retry_count: nil) }
def self.failed
if registry_consistency_worker_enabled?
where(success: false).where.not(retry_count: nil)
else
# Would do `super` except it doesn't work with an included scope
where(success: false)
end
end
def self.registry_consistency_worker_enabled?
Feature.enabled?(:geo_lfs_registry_ssot_sync)
end
def self.finder_class
::Geo::LfsObjectRegistryFinder
end
def self.lfs_object_id_not_in(ids)
where.not(lfs_object_id: ids)
# If false, RegistryConsistencyService will frequently check the end of the
# table to quickly handle new replicables.
def self.has_create_events?
false
end
end
......@@ -233,7 +233,20 @@ class GeoNode < ApplicationRecord
def lfs_objects
return LfsObject.all unless selective_sync?
LfsObject.project_id_in(projects)
query = LfsObjectsProject.project_id_in(projects).select(:lfs_object_id)
cte = Gitlab::SQL::CTE.new(:restricted_lfs_objects, query)
lfs_object_table = LfsObject.arel_table
inner_join_restricted_lfs_objects =
cte.table
.join(lfs_object_table, Arel::Nodes::InnerJoin)
.on(cte.table[:lfs_object_id].eq(lfs_object_table[:id]))
.join_sources
LfsObject
.with(cte.to_arel)
.from(cte.table)
.joins(inner_join_restricted_lfs_objects)
end
def projects
......
# frozen_string_literal: true
module Geo
# Accepts a registry class, queries the next batch of replicable records, and
# creates any missing registries.
class RegistryConsistencyService
include ::Gitlab::Geo::LogHelpers
attr_reader :registry_class, :model_class, :batch_size
def initialize(registry_class, batch_size:)
@registry_class = registry_class
@model_class = registry_class::MODEL_CLASS
@batch_size = batch_size
end
def execute
range = next_range!
return unless range
created_in_range = create_missing_in_range(range)
created_above = create_missing_above(end_of_batch: range.last)
created_in_range.any? ||
created_above.any?
rescue => e
log_error("Error while backfilling #{registry_class}", e)
raise
end
private
# @return [Range] the next range of a batch of records
def next_range!
Gitlab::LoopingBatcher.new(model_class, key: batcher_key, batch_size: batch_size).next_range!
end
def batcher_key
"registry_consistency:#{registry_class.name.parameterize}"
end
# @return [Array] the list of IDs of created records
def create_missing_in_range(range)
untracked, _ = find_registry_differences(range)
return [] if untracked.empty?
created = registry_class.insert_for_model_ids(untracked)
log_created(range, untracked, created)
created
end
def find_registry_differences(range)
finder.find_registry_differences(range)
end
def finder
@finder ||= registry_class.finder_class.new(current_node_id: Gitlab::Geo.current_node.id)
end
# This hack is used to sync new files soon after they are created.
#
# This is not needed for replicables that have already implemented
# create events.
#
# @param [Integer] the last ID of the batch processed in create_missing_in_range
# @return [Array] the list of IDs of created records
def create_missing_above(end_of_batch:)
return [] if registry_class.has_create_events?
last_id = model_class.last.id
# When the LoopingBatcher will soon approach the end of the table, it
# finds the records at the end of the table anyway, so conserve resources.
return [] if batch_close_to_the_end?(end_of_batch, last_id)
# Try to call this service often enough that batch_size is greater than
# the number of recently created records since last call.
start = last_id - batch_size + 1
finish = last_id
create_missing_in_range(start..finish)
end
# Returns true when LoopingBatcher will soon return ranges near the end of
# the table.
#
# @return [Boolean] true if the end_of_batch ID is near the end of the table
def batch_close_to_the_end?(end_of_batch, last_id)
last_id < end_of_batch + 5 * batch_size
end
def log_created(range, untracked, created)
log_info(
"Created registry entries",
{
registry_class: registry_class.name,
start: range.first,
finish: range.last,
created: created.size,
failed_to_create: untracked.size - created.size
}
)
end
end
end
......@@ -93,6 +93,12 @@
:latency_sensitive:
:resource_boundary: :unknown
:weight: 1
- :name: cronjob:geo_secondary_registry_consistency
:feature_category: :geo_replication
:has_external_dependencies:
:latency_sensitive:
:resource_boundary: :unknown
:weight: 1
- :name: cronjob:geo_sidekiq_cron_config
:feature_category: :geo_replication
:has_external_dependencies:
......
......@@ -10,6 +10,16 @@ module Geo
def registry_finder
@registry_finder ||= Geo::LfsObjectRegistryFinder.new(current_node_id: Gitlab::Geo.current_node.id)
end
def find_unsynced_jobs(batch_size:)
if Feature.enabled?(:geo_lfs_registry_ssot_sync)
convert_registry_relation_to_job_args(
registry_finder.find_never_synced_registries(find_batch_params(batch_size))
)
else
super
end
end
end
end
end
# frozen_string_literal: true
module Geo
module Secondary
# Iterates over syncable records and creates the corresponding registry
# records which are missing. Then, the workers that actually schedule the
# sync work only have to query the registry table for never-synced records.
class RegistryConsistencyWorker
include ApplicationWorker
prepend Reenqueuer
include ::Gitlab::Geo::LogHelpers
# There is no relevant user/project/namespace/caller context for this worker
include CronjobQueue # rubocop:disable Scalability/CronWorkerContext
feature_category :geo_replication
# This is probably not the best place to "register" replicables for this functionality
REGISTRY_CLASSES = [Geo::LfsObjectRegistry].freeze
BATCH_SIZE = 1000
# @return [Boolean] true if at least 1 registry was created, else false
def perform
return false unless registry_classes.any? # May as well remove this check after one registry no longer feature flags this
return false unless Gitlab::Geo.secondary?
backfill
rescue => e
log_error("Error while backfilling all", e)
raise
end
def lease_timeout
[registry_classes.size, 1].max * 1.minute
end
private
def backfill
log_info("Backfill registries", registry_classes: registry_classes.map(&:to_s), batch_size: BATCH_SIZE)
registry_classes.any? do |registry_class|
Geo::RegistryConsistencyService.new(registry_class, batch_size: BATCH_SIZE).execute
end
end
def registry_classes
@registry_classes = REGISTRY_CLASSES.select(&:registry_consistency_worker_enabled?)
end
end
end
end
......@@ -19,6 +19,7 @@ module Gitlab
geo_repository_sync_worker
geo_container_repository_sync_worker
geo_repository_verification_secondary_scheduler_worker
geo_secondary_registry_consistency_worker
].freeze
GEO_JOBS = (COMMON_JOBS + PRIMARY_JOBS + SECONDARY_JOBS).freeze
......
......@@ -17,7 +17,7 @@ module Gitlab
def for_lfs_objects(ids)
query
.joins(fdw_inner_join_lfs_objects)
.lfs_object_id_in(ids)
.model_id_in(ids)
end
# rubocop:enable CodeReuse/ActiveRecord
......
......@@ -10,6 +10,11 @@ FactoryBot.define do
retry_count { 1 }
end
trait :never_synced do
success { false }
retry_count { nil }
end
trait :with_lfs_object do
after(:build, :stub) do |registry, _|
lfs_object = create(:lfs_object)
......
......@@ -263,6 +263,188 @@ describe Geo::LfsObjectRegistryFinder, :geo_fdw do
end
context 'finds all the things' do
describe '#find_registry_differences' do
context 'untracked IDs' do
before do
create(:geo_lfs_object_registry, lfs_object_id: lfs_object_1.id)
create(:geo_lfs_object_registry, :failed, lfs_object_id: lfs_object_3.id)
create(:geo_lfs_object_registry, lfs_object_id: lfs_object_4.id)
allow_any_instance_of(LfsObjectsProject).to receive(:update_project_statistics).and_return(nil)
create(:lfs_objects_project, project: synced_project, lfs_object: lfs_object_1)
create(:lfs_objects_project, project: synced_project_in_nested_group, lfs_object: lfs_object_2)
create(:lfs_objects_project, project: synced_project_in_nested_group, lfs_object: lfs_object_3)
create(:lfs_objects_project, project: unsynced_project, lfs_object: lfs_object_4)
create(:lfs_objects_project, project: project_broken_storage, lfs_object: lfs_object_5)
end
it 'includes LFS object IDs without an entry on the tracking database' do
untracked_ids, _ = subject.find_registry_differences(LfsObject.first.id..LfsObject.last.id)
expect(untracked_ids).to match_array(
[lfs_object_2.id, lfs_object_5.id, lfs_object_remote_1.id,
lfs_object_remote_2.id, lfs_object_remote_3.id])
end
it 'excludes LFS objects outside the ID range' do
untracked_ids, _ = subject.find_registry_differences(lfs_object_3.id..lfs_object_remote_2.id)
expect(untracked_ids).to match_array(
[lfs_object_5.id, lfs_object_remote_1.id,
lfs_object_remote_2.id])
end
context 'with selective sync by namespace' do
let(:secondary) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) }
it 'excludes LFS object IDs that are not in selectively synced projects' do
untracked_ids, _ = subject.find_registry_differences(LfsObject.first.id..LfsObject.last.id)
expect(untracked_ids).to match_array([lfs_object_2.id])
end
end
context 'with selective sync by shard' do
let(:secondary) { create(:geo_node, selective_sync_type: 'shards', selective_sync_shards: ['broken']) }
it 'excludes LFS object IDs that are not in selectively synced projects' do
untracked_ids, _ = subject.find_registry_differences(LfsObject.first.id..LfsObject.last.id)
expect(untracked_ids).to match_array([lfs_object_5.id])
end
end
context 'with object storage sync disabled' do
let(:secondary) { create(:geo_node, :local_storage_only) }
it 'excludes LFS objects in object storage' do
untracked_ids, _ = subject.find_registry_differences(LfsObject.first.id..LfsObject.last.id)
expect(untracked_ids).to match_array([lfs_object_2.id, lfs_object_5.id])
end
end
end
context 'unused tracked IDs' do
context 'with an orphaned registry' do
let!(:orphaned) { create(:geo_lfs_object_registry, lfs_object_id: 1234567) }
it 'includes tracked IDs that do not exist in the model table' do
range = 1234567..1234567
_, unused_tracked_ids = subject.find_registry_differences(range)
expect(unused_tracked_ids).to match_array([1234567])
end
it 'excludes IDs outside the ID range' do
range = 1..1000
_, unused_tracked_ids = subject.find_registry_differences(range)
expect(unused_tracked_ids).to be_empty
end
end
context 'with selective sync by namespace' do
let(:secondary) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) }
context 'with a tracked LFS object' do
let!(:registry_entry) { create(:geo_lfs_object_registry, lfs_object_id: lfs_object_1.id) }
let(:range) { lfs_object_1.id..lfs_object_1.id }
context 'excluded from selective sync' do
it 'includes tracked LFS object IDs that exist but are not in a selectively synced project' do
_, unused_tracked_ids = subject.find_registry_differences(range)
expect(unused_tracked_ids).to match_array([lfs_object_1.id])
end
end
context 'included in selective sync' do
let!(:join_record) { create(:lfs_objects_project, project: synced_project, lfs_object: lfs_object_1) }
it 'excludes tracked LFS object IDs that are in selectively synced projects' do
_, unused_tracked_ids = subject.find_registry_differences(range)
expect(unused_tracked_ids).to be_empty
end
end
end
end
context 'with selective sync by shard' do
let(:secondary) { create(:geo_node, selective_sync_type: 'shards', selective_sync_shards: ['broken']) }
context 'with a tracked LFS object' do
let!(:registry_entry) { create(:geo_lfs_object_registry, lfs_object_id: lfs_object_1.id) }
let(:range) { lfs_object_1.id..lfs_object_1.id }
context 'excluded from selective sync' do
it 'includes tracked LFS object IDs that exist but are not in a selectively synced project' do
_, unused_tracked_ids = subject.find_registry_differences(range)
expect(unused_tracked_ids).to match_array([lfs_object_1.id])
end
end
context 'included in selective sync' do
let!(:join_record) { create(:lfs_objects_project, project: project_broken_storage, lfs_object: lfs_object_1) }
it 'excludes tracked LFS object IDs that are in selectively synced projects' do
_, unused_tracked_ids = subject.find_registry_differences(range)
expect(unused_tracked_ids).to be_empty
end
end
end
end
context 'with object storage sync disabled' do
let(:secondary) { create(:geo_node, :local_storage_only) }
context 'with a tracked LFS object' do
context 'in object storage' do
it 'includes tracked LFS object IDs that are in object storage' do
create(:geo_lfs_object_registry, lfs_object_id: lfs_object_remote_1.id)
range = lfs_object_remote_1.id..lfs_object_remote_1.id
_, unused_tracked_ids = subject.find_registry_differences(range)
expect(unused_tracked_ids).to match_array([lfs_object_remote_1.id])
end
end
context 'not in object storage' do
it 'excludes tracked LFS object IDs that are not in object storage' do
create(:geo_lfs_object_registry, lfs_object_id: lfs_object_1.id)
range = lfs_object_1.id..lfs_object_1.id
_, unused_tracked_ids = subject.find_registry_differences(range)
expect(unused_tracked_ids).to be_empty
end
end
end
end
end
end
describe '#find_never_synced_registries' do
let!(:registry_lfs_object_1) { create(:geo_lfs_object_registry, :never_synced, lfs_object_id: lfs_object_1.id) }
let!(:registry_lfs_object_2) { create(:geo_lfs_object_registry, :never_synced, lfs_object_id: lfs_object_2.id) }
let!(:registry_lfs_object_3) { create(:geo_lfs_object_registry, lfs_object_id: lfs_object_3.id) }
let!(:registry_lfs_object_4) { create(:geo_lfs_object_registry, :failed, lfs_object_id: lfs_object_4.id) }
let!(:registry_lfs_object_remote_1) { create(:geo_lfs_object_registry, :never_synced, lfs_object_id: lfs_object_remote_1.id) }
it 'returns registries for LFS objects that have never been synced' do
registries = subject.find_never_synced_registries(batch_size: 10)
expect(registries).to match_ids(registry_lfs_object_1, registry_lfs_object_2, registry_lfs_object_remote_1)
end
end
describe '#find_unsynced' do
before do
create(:geo_lfs_object_registry, lfs_object_id: lfs_object_1.id)
......
# frozen_string_literal: true
require 'spec_helper'
describe Geo::RegistryConsistencyService, :geo_fdw, :use_clean_rails_memory_store_caching do
include EE::GeoHelpers
let(:secondary) { create(:geo_node) }
subject { described_class.new(registry_class, batch_size: batch_size) }
before do
stub_current_geo_node(secondary)
end
::Geo::Secondary::RegistryConsistencyWorker::REGISTRY_CLASSES.each do |klass|
let(:registry_class) { klass }
let(:model_class) { registry_class::MODEL_CLASS }
let(:batch_size) { 2 }
describe 'registry_class interface' do
it 'defines a MODEL_CLASS constant' do
expect(registry_class::MODEL_CLASS).not_to be_nil
end
it 'responds to .name' do
expect(registry_class).to respond_to(:name)
end
it 'responds to .insert_for_model_ids' do
expect(registry_class).to respond_to(:insert_for_model_ids)
end
it 'responds to .finder_class' do
expect(registry_class).to respond_to(:finder_class)
end
it 'responds to .has_create_events?' do
expect(registry_class).to respond_to(:has_create_events?)
end
end
describe '#execute' do
context 'when there are replicable records missing registries' do
let!(:expected_batch) { create_list(model_class.underscore.to_sym, batch_size) }
it 'creates missing registries' do
expect do
subject.execute
end.to change { registry_class.model_id_in(expected_batch).count }.by(batch_size)
end
it 'returns truthy' do
expect(subject.execute).to be_truthy
end
it 'does not exceed batch size' do
not_expected = create(model_class.underscore.to_sym)
subject.execute
expect(registry_class.model_id_in(not_expected)).to be_none
end
# Temporarily, until we implement create events for these replicables
context 'when the number of records is greater than 6 batches' do
let!(:five_batches_worth) { create_list(model_class.underscore.to_sym, 5 * batch_size) }
context 'when the previous batch is greater than 5 batches from the end of the table' do
context 'when create events are implemented for this replicable' do
before do
expect(registry_class).to receive(:has_create_events?).and_return(true)
end
it 'does not create missing registries in a batch at the end of the table' do
expected = expected_batch
expect do
subject.execute
end.to change { registry_class.count }.by(batch_size)
expect(registry_class.model_id_in(expected).count).to eq(2)
end
it 'calls #create_missing_in_range only once' do
expect(subject).to receive(:create_missing_in_range).once.and_call_original
subject.execute
end
end
context 'when create events are not yet implemented for this replicable' do
before do
expect(registry_class).to receive(:has_create_events?).and_return(false)
end
it 'creates missing registries in a batch at the end of the table' do
expected = expected_batch + five_batches_worth.last(batch_size)
expect do
subject.execute
end.to change { registry_class.count }.by(batch_size * 2)
expect(registry_class.model_id_in(expected).count).to eq(4)
end
it 'calls #create_missing_in_range twice' do
expect(subject).to receive(:create_missing_in_range).twice.and_call_original
subject.execute
end
end
end
context 'when the previous batch is less than 5 batches from the end of the table' do
before do
# Do one batch
subject.execute
end
it 'does not create registries in a batch at the end of the table' do
expect do
subject.execute
end.to change { registry_class.count }.by(batch_size)
end
it 'calls #create_missing_in_range once' do
expect(subject).to receive(:create_missing_in_range).once.and_call_original
subject.execute
end
end
end
context 'when the number of records is less than 6 batches' do
it 'calls #create_missing_in_range once' do
expect(subject).to receive(:create_missing_in_range).once.and_call_original
subject.execute
end
end
end
context 'when all replicable records have registries' do
it 'does nothing' do
create_list(model_class.underscore.to_sym, batch_size)
subject.execute # create the missing registries
expect do
subject.execute
end.not_to change { registry_class.count }
end
it 'returns falsey' do
create_list(model_class.underscore.to_sym, batch_size)
subject.execute # create the missing registries
expect(subject.execute).to be_falsey
end
end
context 'when there are no replicable records' do
it 'does nothing' do
expect do
subject.execute
end.not_to change { registry_class.count }
end
it 'returns falsey' do
expect(subject.execute).to be_falsey
end
end
end
end
end
......@@ -16,6 +16,7 @@ module EE
def stub_primary_node
allow(::Gitlab::Geo).to receive(:primary?).and_return(true)
allow(::Gitlab::Geo).to receive(:secondary?).and_return(false)
end
def stub_secondary_node
......
......@@ -45,6 +45,8 @@ describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw do
it 'does not schedule duplicated jobs' do
lfs_object_1 = create(:lfs_object, :with_file)
lfs_object_2 = create(:lfs_object, :with_file)
create(:geo_lfs_object_registry, :never_synced, lfs_object: lfs_object_1)
create(:geo_lfs_object_registry, :failed, lfs_object: lfs_object_2)
stub_const('Geo::Scheduler::SchedulerWorker::DB_RETRIEVE_BATCH_SIZE', 5)
secondary.update!(files_max_capacity: 2)
......@@ -170,37 +172,123 @@ describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw do
stub_lfs_object_storage
end
context 'with files missing on the primary that are marked as synced' do
let!(:lfs_object_file_missing_on_primary) { create(:lfs_object, :with_file) }
context 'with geo_lfs_registry_ssot_sync feature enabled' do
context 'with files missing on the primary' do
let!(:lfs_object_file_missing_on_primary) { create(:lfs_object, :with_file) }
context 'with lfs_object_registry entries' do
before do
create(:geo_lfs_object_registry, :never_synced, lfs_object: lfs_object_local_store)
create(:geo_lfs_object_registry, :failed, lfs_object: lfs_object_remote_store)
Geo::LfsObjectRegistry.create!(lfs_object_id: lfs_object_file_missing_on_primary.id, bytes: 1234, success: true, missing_on_primary: true)
end
it 'enqueues file downloads if there is spare capacity' do
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_local_store.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_file_missing_on_primary.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_remote_store.id)
subject.perform
end
it 'does not retry those files if there is no spare capacity' do
expect(subject).to receive(:db_retrieve_batch_size).and_return(1).twice
expect(Geo::FileDownloadWorker).to receive(:perform_async).once
subject.perform
end
it 'does not retry those files if they are already scheduled' do
scheduled_jobs = [{ type: 'lfs', id: lfs_object_file_missing_on_primary.id, job_id: 'foo' }]
expect(subject).to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(1)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_local_store.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_remote_store.id)
subject.perform
end
end
context 'with no lfs_object_registry entries' do
it 'does not enqueue file downloads' do
expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
subject.perform
end
end
end
end
context 'with geo_lfs_registry_ssot_sync feature disabled' do
before do
Geo::LfsObjectRegistry.create!(lfs_object_id: lfs_object_file_missing_on_primary.id, bytes: 1234, success: true, missing_on_primary: true)
stub_feature_flags(geo_lfs_registry_ssot_sync: false)
end
it 'retries the files if there is spare capacity' do
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_local_store.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_file_missing_on_primary.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_remote_store.id)
context 'with files missing on the primary' do
let!(:lfs_object_file_missing_on_primary) { create(:lfs_object, :with_file) }
subject.perform
end
context 'with lfs_object_registry entries' do
before do
create(:geo_lfs_object_registry, :never_synced, lfs_object: lfs_object_local_store)
create(:geo_lfs_object_registry, :failed, lfs_object: lfs_object_remote_store)
Geo::LfsObjectRegistry.create!(lfs_object_id: lfs_object_file_missing_on_primary.id, bytes: 1234, success: true, missing_on_primary: true)
end
it 'does not retry those files if there is no spare capacity' do
expect(subject).to receive(:db_retrieve_batch_size).and_return(1).twice
it 'enqueues file downloads if there is spare capacity' do
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_local_store.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_file_missing_on_primary.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_remote_store.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).once
subject.perform
end
subject.perform
end
it 'does not enqueue file downloads if there is no spare capacity' do
expect(subject).to receive(:db_retrieve_batch_size).and_return(1).twice
it 'does not retry those files if they are already scheduled' do
scheduled_jobs = [{ type: 'lfs', id: lfs_object_file_missing_on_primary.id, job_id: 'foo' }]
expect(subject).to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(1)
expect(Geo::FileDownloadWorker).to receive(:perform_async).once
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_local_store.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_remote_store.id)
subject.perform
end
subject.perform
it 'does not enqueue file downloads if they are already scheduled' do
scheduled_jobs = [{ type: 'lfs', id: lfs_object_file_missing_on_primary.id, job_id: 'foo' }]
expect(subject).to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(1)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_local_store.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_remote_store.id)
subject.perform
end
end
context 'with no lfs_object_registry entries' do
it 'enqueues file downloads if there is spare capacity' do
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_local_store.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_file_missing_on_primary.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_remote_store.id)
subject.perform
end
it 'does not enqueue file downloads if there is no spare capacity' do
expect(subject).to receive(:db_retrieve_batch_size).and_return(1).twice
expect(Geo::FileDownloadWorker).to receive(:perform_async).once
subject.perform
end
it 'does not enqueue file downloads if they are already scheduled' do
scheduled_jobs = [{ type: 'lfs', id: lfs_object_file_missing_on_primary.id, job_id: 'foo' }]
expect(subject).to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(1)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_local_store.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_remote_store.id)
subject.perform
end
end
end
end
end
......@@ -332,7 +420,7 @@ describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw do
allow_any_instance_of(::Gitlab::Geo::Replication::BaseTransfer).to receive(:download_from_primary).and_return(result_object)
avatar = fixture_file_upload('spec/fixtures/dk.png')
create_list(:lfs_object, 2, :with_file)
create_list(:geo_lfs_object_registry, 2, :with_lfs_object, :never_synced)
create_list(:user, 2, avatar: avatar)
create_list(:note, 2, :with_attachment)
create(:upload, :personal_snippet_upload)
......@@ -367,14 +455,20 @@ describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw do
allow(::GeoNode).to receive(:current_node).and_return(secondary)
end
it 'does not perform Geo::FileDownloadWorker for LFS object that does not belong to selected namespaces to replicate' do
lfs_object_in_synced_group = create(:lfs_objects_project, project: project_in_synced_group)
create(:lfs_objects_project, project: unsynced_project)
context 'when geo_lfs_registry_ssot_sync feature is disabled' do
before do
stub_feature_flags(geo_lfs_registry_ssot_sync: false)
end
it 'does not perform Geo::FileDownloadWorker for LFS object that does not belong to selected namespaces to replicate' do
lfs_object_in_synced_group = create(:lfs_objects_project, project: project_in_synced_group)
create(:lfs_objects_project, project: unsynced_project)
expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with('lfs', lfs_object_in_synced_group.lfs_object_id).once.and_return(spy)
expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with('lfs', lfs_object_in_synced_group.lfs_object_id).once.and_return(spy)
subject.perform
subject.perform
end
end
it 'does not perform Geo::FileDownloadWorker for job artifact that does not belong to selected namespaces to replicate' do
......
# frozen_string_literal: true
require 'spec_helper'
describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
include EE::GeoHelpers
include ExclusiveLeaseHelpers
let(:primary) { create(:geo_node, :primary) }
let(:secondary) { create(:geo_node) }
before do
stub_current_geo_node(secondary)
end
let(:worker_class) { described_class }
it_behaves_like 'reenqueuer'
it 'uses a cronjob queue' do
expect(subject.sidekiq_options_hash).to include(
'queue' => 'cronjob:geo_secondary_registry_consistency',
'queue_namespace' => :cronjob
)
end
describe '#perform' do
subject { described_class.new }
before do
allow(subject).to receive(:sleep) # faster tests
end
it_behaves_like 'it is rate limited to 1 call per', 5.seconds do
let(:rate_limited_method) { subject.perform }
end
context 'when RegistryConsistencyService#execute returns true at least once' do
before do
described_class::REGISTRY_CLASSES.each_with_index do |registry_class, index|
first_one = index == 0
service = double
expect(Geo::RegistryConsistencyService).to receive(:new).with(registry_class, batch_size: 1000).and_return(service)
expect(service).to receive(:execute).and_return(first_one)
end
end
it 'returns true' do
expect(subject.perform).to be_truthy
end
it 'RegistryConsistencyWorker gets reenqueued' do
expect(Geo::Secondary::RegistryConsistencyWorker).to receive(:perform_async)
subject.perform
end
end
context 'when RegistryConsistencyService#execute returns false for all registry classes' do
before do
described_class::REGISTRY_CLASSES.each do |registry_class|
service = double
expect(Geo::RegistryConsistencyService).to receive(:new).with(registry_class, batch_size: 1000).and_return(service)
expect(service).to receive(:execute).and_return(false)
end
end
it 'returns false' do
expect(subject.perform).to be_falsey
end
it 'RegistryConsistencyWorker does not get reenqueued (we will wait until next cronjob)' do
expect(Geo::Secondary::RegistryConsistencyWorker).not_to receive(:perform_async)
subject.perform
end
end
# Somewhat of an integration test
it 'creates missing registries for each registry class' do
lfs_object = create(:lfs_object)
expect do
subject.perform
end.to change { Geo::LfsObjectRegistry.where(lfs_object_id: lfs_object.id).count }.from(0).to(1)
end
context 'when geo_lfs_registry_ssot_sync is disabled' do
before do
stub_feature_flags(geo_lfs_registry_ssot_sync: false)
end
it 'returns false' do
expect(subject.perform).to be_falsey
end
it 'does not execute RegistryConsistencyService' do
expect(Geo::RegistryConsistencyService).not_to receive(:new)
subject.perform
end
end
context 'when the current Geo node is disabled or primary' do
before do
stub_primary_node
end
it 'returns false' do
expect(subject.perform).to be_falsey
end
it 'does not execute RegistryConsistencyService' do
expect(Geo::RegistryConsistencyService).not_to receive(:new)
subject.perform
end
end
end
end
# frozen_string_literal: true
module Gitlab
# Returns an ID range within a table so it can be iterated over. Repeats from
# the beginning after it reaches the end.
#
# Used by Geo in particular to iterate over a replicable and its registry
# table.
#
# Tracks a cursor for each table, by "key". If the table is smaller than
# batch_size, then a range for the whole table is returned on every call.
class LoopingBatcher
# @param [Class] model_class the class of the table to iterate on
# @param [String] key to identify the cursor. Note, cursor is already unique
# per table.
# @param [Integer] batch_size to limit the number of records in a batch
def initialize(model_class, key:, batch_size: 1000)
@model_class = model_class
@key = key
@batch_size = batch_size
end
# @return [Range] a range of IDs. `nil` if 0 records at or after the cursor.
def next_range!
return unless @model_class.any?
batch_first_id = cursor_id
batch_last_id = get_batch_last_id(batch_first_id)
return unless batch_last_id
batch_first_id..batch_last_id
end
private
# @private
#
# Get the last ID of the batch. Increment the cursor or reset it if at end.
#
# @param [Integer] batch_first_id the first ID of the batch
# @return [Integer] batch_last_id the last ID of the batch (not the table)
def get_batch_last_id(batch_first_id)
batch_last_id, more_rows = run_query(@model_class.table_name, @model_class.primary_key, batch_first_id, @batch_size)
if more_rows
increment_batch(batch_last_id)
else
reset if batch_first_id > 1
end
batch_last_id
end
def run_query(table, primary_key, batch_first_id, batch_size)
sql = <<~SQL
SELECT MAX(batch.id) AS batch_last_id,
EXISTS (
SELECT #{primary_key}
FROM #{table}
WHERE #{primary_key} > MAX(batch.id)
) AS more_rows
FROM (
SELECT #{primary_key}
FROM #{table}
WHERE #{primary_key} >= #{batch_first_id}
ORDER BY #{primary_key}
LIMIT #{batch_size}) AS batch;
SQL
result = ActiveRecord::Base.connection.exec_query(sql).first
[result["batch_last_id"], result["more_rows"]]
end
def reset
set_cursor_id(1)
end
def increment_batch(batch_last_id)
set_cursor_id(batch_last_id + 1)
end
# @private
#
# @return [Integer] the cursor ID, or 1 if it is not set
def cursor_id
Rails.cache.fetch("#{cache_key}:cursor_id") || 1
end
def set_cursor_id(id)
Rails.cache.write("#{cache_key}:cursor_id", id)
end
def cache_key
@cache_key ||= "#{self.class.name.parameterize}:#{@model_class.name.parameterize}:#{@key}:cursor_id"
end
end
end
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::LoopingBatcher, :use_clean_rails_memory_store_caching do
describe '#next_range!' do
let(:model_class) { LfsObject }
let(:key) { 'looping_batcher_spec' }
let(:batch_size) { 2 }
subject { described_class.new(model_class, key: key, batch_size: batch_size).next_range! }
context 'when there are no records' do
it { is_expected.to be_nil }
end
context 'when there are records' do
let!(:records) { create_list(model_class.underscore, 3) }
context 'when it has never been called before' do
it { is_expected.to be_a Range }
it 'starts from the beginning' do
expect(subject.first).to eq(1)
end
it 'ends at a full batch' do
expect(subject.last).to eq(records.second.id)
end
context 'when the batch size is greater than the number of records' do
let(:batch_size) { 5 }
it 'ends at the last ID' do
expect(subject.last).to eq(records.last.id)
end
end
end
context 'when it was called before' do
context 'when the previous batch included the end of the table' do
before do
described_class.new(model_class, key: key, batch_size: model_class.count).next_range!
end
it 'starts from the beginning' do
expect(subject).to eq(1..records.second.id)
end
end
context 'when the previous batch did not include the end of the table' do
before do
described_class.new(model_class, key: key, batch_size: model_class.count - 1).next_range!
end
it 'starts after the previous batch' do
expect(subject).to eq(records.last.id..records.last.id)
end
end
context 'if cache is cleared' do
it 'starts from the beginning' do
Rails.cache.clear
expect(subject).to eq(1..records.second.id)
end
end
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment