Commit e205d28f authored by Ash McKenzie

Merge branch '206946-registry-is-ssot-for-job-artifacts' into 'master'

Geo - Make registry table SSOT for Job Artifacts

See merge request gitlab-org/gitlab!25378
parents 9e3b997b 45f2456f
......@@ -31,65 +31,120 @@ module Geo
Ci::JobArtifact.not_expired
end
# Returns untracked IDs as well as tracked IDs that are unused.
#
# Untracked IDs are model IDs that are supposed to be synced but don't yet
# have a registry entry.
#
# Unused tracked IDs are model IDs that are not supposed to be synced but
# already have a registry entry. For example:
#
# - orphaned registries
# - records that became excluded from selective sync
# - records that are in object storage, and `sync_object_storage` became
# disabled
#
# We compute both sets in this method to reduce the number of DB queries
# performed.
#
# @return [Array] the first element is an Array of untracked IDs, and the second element is an Array of tracked IDs that are unused
def find_registry_differences(range)
source_ids = job_artifacts(fdw: false).where(id: range).pluck(::Ci::JobArtifact.arel_table[:id]) # rubocop:disable CodeReuse/ActiveRecord
tracked_ids = Geo::JobArtifactRegistry.pluck_model_ids_in_range(range)
untracked_ids = source_ids - tracked_ids
unused_tracked_ids = tracked_ids - source_ids
[untracked_ids, unused_tracked_ids]
end
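For context, here is a minimal sketch of how a caller might act on the two ID sets returned above. It is illustrative only: `finder` stands for a `Geo::JobArtifactRegistryFinder` instance, and the registry writes shown are not the exact RegistryConsistencyService implementation.

  untracked_ids, unused_tracked_ids = finder.find_registry_differences(1..1000)

  # Start tracking replicables that should be synced but have no registry row yet
  # (illustrative; the real service handles attributes and batching itself).
  untracked_ids.each do |artifact_id|
    Geo::JobArtifactRegistry.create!(artifact_id: artifact_id, success: false)
  end

  # Drop registry rows whose replicables are no longer in scope.
  Geo::JobArtifactRegistry.model_id_in(unused_tracked_ids).delete_all if unused_tracked_ids.any?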
# Returns Geo::JobArtifactRegistry records that have never been synced.
#
# Does not care about selective sync, because it considers the Registry
# table to be the single source of truth. The contract is that other
# processes need to ensure that the table only contains records that should
# be synced.
#
# Any registries that have ever been synced that currently need to be
# resynced will be handled by other find methods (like
# #find_retryable_failed_registries)
#
# You can pass a list with `except_ids:` so you can exclude items you
# already scheduled but haven't finished and aren't persisted to the database yet
#
# @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_ids ids that will be ignored from the query
# rubocop:disable CodeReuse/ActiveRecord
def find_never_synced_registries(batch_size:, except_ids: [])
Geo::JobArtifactRegistry
.never
.model_id_not_in(except_ids)
.limit(batch_size)
end
# rubocop:enable CodeReuse/ActiveRecord
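As an illustration of the intended call pattern (for example from a download dispatch worker), a usage sketch follows; `already_scheduled_ids` and `schedule_download` are hypothetical placeholders, while the finder constructor matches the one used by the job finder later in this MR.

  finder = Geo::JobArtifactRegistryFinder.new(current_node_id: Gitlab::Geo.current_node.id)

  finder
    .find_never_synced_registries(batch_size: 1000, except_ids: already_scheduled_ids)
    .each { |registry| schedule_download(:job_artifact, registry.artifact_id) } # schedule_download is hypothetical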
# Deprecated in favor of the process using
# #find_registry_differences and #find_never_synced_registries
#
# Find limited amount of non replicated job artifacts.
#
# You can pass a list with `except_artifact_ids:` so you can exclude items you
# You can pass a list with `except_ids:` so you can exclude items you
# already scheduled but haven't finished and aren't persisted to the database yet
#
# TODO: Alternative here is to use some sort of window function with a cursor instead
# of simply limiting the query and passing a list of items we don't want
#
# @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_artifact_ids ids that will be ignored from the query
# @param [Array<Integer>] except_ids ids that will be ignored from the query
# rubocop: disable CodeReuse/ActiveRecord
def find_unsynced(batch_size:, except_artifact_ids: [])
def find_unsynced(batch_size:, except_ids: [])
job_artifacts
.not_expired
.missing_job_artifact_registry
.id_not_in(except_artifact_ids)
.id_not_in(except_ids)
.limit(batch_size)
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def find_migrated_local(batch_size:, except_artifact_ids: [])
def find_migrated_local(batch_size:, except_ids: [])
all_job_artifacts
.inner_join_job_artifact_registry
.with_files_stored_remotely
.id_not_in(except_artifact_ids)
.id_not_in(except_ids)
.limit(batch_size)
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def find_retryable_failed_registries(batch_size:, except_artifact_ids: [])
def find_retryable_failed_registries(batch_size:, except_ids: [])
Geo::JobArtifactRegistry
.failed
.retry_due
.artifact_id_not_in(except_artifact_ids)
.model_id_not_in(except_ids)
.limit(batch_size)
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def find_retryable_synced_missing_on_primary_registries(batch_size:, except_artifact_ids: [])
def find_retryable_synced_missing_on_primary_registries(batch_size:, except_ids: [])
Geo::JobArtifactRegistry
.synced
.missing_on_primary
.retry_due
.artifact_id_not_in(except_artifact_ids)
.model_id_not_in(except_ids)
.limit(batch_size)
end
# rubocop: enable CodeReuse/ActiveRecord
private
def job_artifacts
local_storage_only? ? all_job_artifacts.with_files_stored_locally : all_job_artifacts
def job_artifacts(fdw: true)
local_storage_only?(fdw: fdw) ? all_job_artifacts(fdw: fdw).with_files_stored_locally : all_job_artifacts(fdw: fdw)
end
def all_job_artifacts
current_node.job_artifacts
def all_job_artifacts(fdw: true)
current_node(fdw: fdw).job_artifacts
end
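A hedged illustration of what the new `fdw:` toggle buys: with the default, the finder works through the FDW-wrapped models so relations can participate in joins with tracking-database tables, while `fdw: false` yields a plain `::Ci::JobArtifact` relation against the replicated database, which is what the plain ID pluck in #find_registry_differences needs. The comments below describe intent only and are not additional code in this MR.

  job_artifacts(fdw: true)   # FDW-backed relation (e.g. Geo::Fdw::Ci::JobArtifact), usable in FDW joins
  job_artifacts(fdw: false)  # ::Ci::JobArtifact relation on the replicated database, used for the ID pluck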
def registries_for_job_artifacts
......
......@@ -83,7 +83,7 @@ module Geo
# rubocop:enable CodeReuse/ActiveRecord
# Deprecated in favor of the process using
# #find_missing_registry_ids and #find_never_synced_registries
# #find_registry_differences and #find_never_synced_registries
#
# Find limited amount of non replicated lfs objects.
#
......
......@@ -20,7 +20,7 @@ module EE
DAST_REPORT_TYPES = %w[dast].freeze
scope :not_expired, -> { where('expire_at IS NULL OR expire_at > ?', Time.current) }
scope :project_id_in, ->(ids) { joins(:project).merge(::Project.id_in(ids)) }
scope :project_id_in, ->(ids) { where(project_id: ids) }
scope :with_files_stored_remotely, -> { where(file_store: ::JobArtifactUploader::Store::REMOTE) }
scope :security_reports, -> do
......
......@@ -3,15 +3,31 @@
class Geo::JobArtifactRegistry < Geo::BaseRegistry
include Geo::Syncable
def self.artifact_id_in(ids)
where(artifact_id: ids)
MODEL_CLASS = ::Ci::JobArtifact
MODEL_FOREIGN_KEY = :artifact_id
scope :never, -> { where(success: false, retry_count: nil) }
def self.failed
if registry_consistency_worker_enabled?
where(success: false).where.not(retry_count: nil)
else
# Would do `super` except it doesn't work with an included scope
where(success: false)
end
end
def self.registry_consistency_worker_enabled?
Feature.enabled?(:geo_job_artifact_registry_ssot_sync)
end
def self.artifact_id_not_in(ids)
where.not(artifact_id: ids)
def self.finder_class
::Geo::JobArtifactRegistryFinder
end
def self.pluck_artifact_key
where(nil).pluck(:artifact_id)
# When false, RegistryConsistencyService will frequently check the end of the
# table to quickly handle new replicables.
def self.has_create_events?
false
end
end
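To make the new scopes concrete: with the SSOT flag enabled, registry rows fall into three buckets based on `success` and `retry_count`. The summary below is illustrative and relies on the `synced` scope that `Geo::Syncable` already provides.

  Geo::JobArtifactRegistry.never   # success: false, retry_count: nil -> created by the consistency worker, no sync attempt yet
  Geo::JobArtifactRegistry.failed  # success: false, retry_count set  -> at least one failed attempt, retried via retry_due
  Geo::JobArtifactRegistry.synced  # success: true                    -> nothing left to do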
......@@ -221,7 +221,20 @@ class GeoNode < ApplicationRecord
def job_artifacts
return Ci::JobArtifact.all unless selective_sync?
Ci::JobArtifact.project_id_in(projects)
query = Ci::JobArtifact.project_id_in(projects).select(:id)
cte = Gitlab::SQL::CTE.new(:restricted_job_artifacts, query)
job_artifact_table = Ci::JobArtifact.arel_table
inner_join_restricted_job_artifacts =
cte.table
.join(job_artifact_table, Arel::Nodes::InnerJoin)
.on(cte.table[:id].eq(job_artifact_table[:id]))
.join_sources
Ci::JobArtifact
.with(cte.to_arel)
.from(cte.table)
.joins(inner_join_restricted_job_artifacts)
end
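Roughly, the relation built above renders into SQL of the following shape; this is a simplified sketch (quoting and the selective-sync subquery are elided), not the literal output.

  # node.job_artifacts.to_sql, approximately:
  #
  #   WITH restricted_job_artifacts AS (
  #     SELECT id FROM ci_job_artifacts WHERE project_id IN (/* selectively synced projects */)
  #   )
  #   SELECT ci_job_artifacts.*
  #   FROM restricted_job_artifacts
  #   INNER JOIN ci_job_artifacts ON restricted_job_artifacts.id = ci_job_artifacts.id
  node.job_artifacts.to_sql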
def container_repositories
......
......@@ -4,12 +4,22 @@ module Geo
class FileDownloadDispatchWorker # rubocop:disable Scalability/IdempotentWorker
class JobArtifactJobFinder < JobFinder # rubocop:disable Scalability/IdempotentWorker
RESOURCE_ID_KEY = :artifact_id
EXCEPT_RESOURCE_IDS_KEY = :except_artifact_ids
EXCEPT_RESOURCE_IDS_KEY = :except_ids
FILE_SERVICE_OBJECT_TYPE = :job_artifact
def registry_finder
@registry_finder ||= Geo::JobArtifactRegistryFinder.new(current_node_id: Gitlab::Geo.current_node.id)
end
def find_unsynced_jobs(batch_size:)
if Geo::JobArtifactRegistry.registry_consistency_worker_enabled?
convert_registry_relation_to_job_args(
registry_finder.find_never_synced_registries(find_batch_params(batch_size))
)
else
super
end
end
end
end
end
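In effect, the override lets the feature flag choose where "unsynced" work comes from; the branch annotations below are descriptive only.

  if Geo::JobArtifactRegistry.registry_consistency_worker_enabled?
    # flag on: never-synced registry rows drive downloads (registry table is the SSOT)
  else
    # flag off: legacy behaviour, the superclass falls back to the FDW-based
    # #find_unsynced query for Ci::JobArtifact rows missing a registry entry
  end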
......@@ -68,7 +68,7 @@ module Geo
def find_migrated_local_job_artifacts_ids(batch_size:)
return [] unless job_artifacts_object_store_enabled?
job_artifacts_finder.find_migrated_local(batch_size: batch_size, except_artifact_ids: scheduled_file_ids(:job_artifact))
job_artifacts_finder.find_migrated_local(batch_size: batch_size, except_ids: scheduled_file_ids(:job_artifact))
.pluck(Geo::Fdw::Ci::JobArtifact.arel_table[:id])
.map { |id| ['job_artifact', id] }
end
......
......@@ -16,7 +16,7 @@ module Geo
feature_category :geo_replication
# This is probably not the best place to "register" replicables for this functionality
REGISTRY_CLASSES = [Geo::LfsObjectRegistry].freeze
REGISTRY_CLASSES = [Geo::JobArtifactRegistry, Geo::LfsObjectRegistry].freeze
BATCH_SIZE = 1000
# @return [Boolean] true if at least 1 registry was created, else false
......@@ -40,13 +40,15 @@ module Geo
def backfill
log_info("Backfill registries", registry_classes: registry_classes.map(&:to_s), batch_size: BATCH_SIZE)
registry_classes.any? do |registry_class|
Geo::RegistryConsistencyService.new(registry_class, batch_size: BATCH_SIZE).execute
registry_classes.map { |registry_class| registry_service(registry_class).execute }.any?
end
def registry_service(registry_class)
Geo::RegistryConsistencyService.new(registry_class, batch_size: BATCH_SIZE)
end
def registry_classes
@registry_classes = REGISTRY_CLASSES.select(&:registry_consistency_worker_enabled?)
@registry_classes ||= REGISTRY_CLASSES.select(&:registry_consistency_worker_enabled?)
end
end
end
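Two small behavioural points in this hunk are worth calling out. First, `.map { ... }.any?` replaces `.any? do ... end` because `any?` with a block short-circuits on the first truthy result, which would have skipped backfilling the remaining registry classes. Second, `@registry_classes ||=` memoizes the flag-filtered list rather than recomputing it on every call. A plain-Ruby demonstration of the first point:

  [1, 2, 3].any? { |n| puts n; n > 0 }        # prints only 1, returns true (short-circuits)
  [1, 2, 3].map  { |n| puts n; n > 0 }.any?   # prints 1, 2, 3, then returns true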
......
......@@ -29,7 +29,7 @@ module EE
end
def lost_and_found_geo_registries
@lost_and_found_geo_registries ||= ::Geo::JobArtifactRegistry.artifact_id_in(lost_and_found_ids)
@lost_and_found_geo_registries ||= ::Geo::JobArtifactRegistry.model_id_in(lost_and_found_ids)
end
end
end
......
......@@ -5,6 +5,16 @@ FactoryBot.define do
sequence(:artifact_id)
success { true }
trait :failed do
success { false }
retry_count { 1 }
end
trait :never_synced do
success { false }
retry_count { nil }
end
trait :with_artifact do
transient do
artifact_type { nil } # e.g. :archive, :metadata, :trace ...
......
......@@ -747,22 +747,18 @@ describe GeoNode, :request_store, :geo, type: :model do
describe '#job_artifacts' do
context 'when selective sync is enabled' do
it 'applies project restriction' do
it 'applies a CTE statement' do
node.update!(selective_sync_type: 'namespaces')
expect(Ci::JobArtifact).to receive(:project_id_in).once.and_call_original
node.job_artifacts
expect(node.job_artifacts.to_sql).to match(/WITH .+restricted_job_artifacts/)
end
end
context 'when selective sync is disabled' do
it 'does not apply project restriction' do
it 'does not apply a CTE statement' do
node.update!(selective_sync_type: nil)
expect(Ci::JobArtifact).not_to receive(:project_id_in)
node.job_artifacts
expect(node.job_artifacts.to_sql).not_to match(/WITH .+restricted_job_artifacts/)
end
end
end
......
......@@ -342,6 +342,11 @@ describe GeoNodeStatus, :geo, :geo_fdw do
end
describe '#job_artifacts_failed_count' do
context 'when geo_job_artifact_registry_ssot_sync is disabled' do
before do
stub_feature_flags(geo_job_artifact_registry_ssot_sync: false)
end
it 'counts failed job artifacts' do
# These should be ignored
create(:geo_upload_registry, :failed)
......@@ -355,6 +360,25 @@ describe GeoNodeStatus, :geo, :geo_fdw do
end
end
context 'when geo_job_artifact_registry_ssot_sync is enabled' do
before do
stub_feature_flags(geo_job_artifact_registry_ssot_sync: true)
end
it 'counts failed job artifacts' do
# These should be ignored
create(:geo_upload_registry, :failed)
create(:geo_upload_registry, :avatar, :failed)
create(:geo_upload_registry, :attachment, :failed)
create(:geo_job_artifact_registry, :with_artifact, success: true)
create(:geo_job_artifact_registry, :with_artifact, :failed)
expect(subject.job_artifacts_failed_count).to eq(1)
end
end
end
describe '#job_artifacts_synced_in_percentage' do
context 'when artifacts are available' do
before do
......
......@@ -294,6 +294,120 @@ describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw do
end
context 'with job artifacts' do
context 'with geo_job_artifact_registry_ssot_sync feature enabled' do
before do
stub_feature_flags(geo_job_artifact_registry_ssot_sync: true)
end
it 'performs Geo::FileDownloadWorker for unsynced job artifacts' do
registry = create(:geo_job_artifact_registry, :with_artifact, :never_synced)
expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with('job_artifact', registry.artifact_id).once.and_return(spy)
subject.perform
end
it 'performs Geo::FileDownloadWorker for failed-sync job artifacts' do
registry = create(:geo_job_artifact_registry, :with_artifact, :never_synced)
expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with('job_artifact', registry.artifact_id).once.and_return(spy)
subject.perform
end
it 'does not perform Geo::FileDownloadWorker for synced job artifacts' do
registry = create(:geo_job_artifact_registry, :with_artifact, bytes: 1234, success: true)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
.with('job_artifact', registry.artifact_id)
subject.perform
end
it 'does not perform Geo::FileDownloadWorker for synced job artifacts even with 0 bytes downloaded' do
registry = create(:geo_job_artifact_registry, :with_artifact, bytes: 0, success: true)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
.with('job_artifact', registry.artifact_id)
subject.perform
end
it 'does not retry failed artifacts when retry_at is tomorrow' do
registry = create(:geo_job_artifact_registry, :with_artifact, :failed, retry_at: Date.tomorrow)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
.with('job_artifact', registry.artifact_id)
subject.perform
end
it 'retries failed artifacts when retry_at is in the past' do
registry = create(:geo_job_artifact_registry, :with_artifact, :failed, retry_at: Date.yesterday)
expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with('job_artifact', registry.artifact_id).once.and_return(spy)
subject.perform
end
context 'with files missing on the primary that are marked as synced' do
let!(:artifact_file_missing_on_primary) { create(:ci_job_artifact) }
let!(:artifact_registry) { create(:geo_job_artifact_registry, artifact_id: artifact_file_missing_on_primary.id, bytes: 1234, success: true, missing_on_primary: true) }
it 'retries the files if there is spare capacity' do
registry = create(:geo_job_artifact_registry, :with_artifact, :never_synced)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', registry.artifact_id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', artifact_file_missing_on_primary.id)
subject.perform
end
it 'retries failed files with retry_at in the past' do
artifact_registry.update!(retry_at: Date.yesterday)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', artifact_file_missing_on_primary.id)
subject.perform
end
it 'does not retry files with later retry_at' do
artifact_registry.update!(retry_at: Date.tomorrow)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async).with('job_artifact', artifact_file_missing_on_primary.id)
subject.perform
end
it 'does not retry those files if there is no spare capacity' do
registry = create(:geo_job_artifact_registry, :with_artifact, :never_synced)
expect(subject).to receive(:db_retrieve_batch_size).and_return(1).twice
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', registry.artifact_id)
subject.perform
end
it 'does not retry those files if they are already scheduled' do
registry = create(:geo_job_artifact_registry, :with_artifact, :never_synced)
scheduled_jobs = [{ type: 'job_artifact', id: artifact_file_missing_on_primary.id, job_id: 'foo' }]
expect(subject).to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(1)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', registry.artifact_id)
subject.perform
end
end
end
context 'with geo_job_artifact_registry_ssot_sync feature disabled' do
before do
stub_feature_flags(geo_job_artifact_registry_ssot_sync: false)
end
it 'performs Geo::FileDownloadWorker for unsynced job artifacts' do
artifact = create(:ci_job_artifact)
......@@ -398,6 +512,7 @@ describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw do
end
end
end
end
context 'backoff time' do
let(:cache_key) { "#{described_class.name.underscore}:skip" }
......@@ -424,7 +539,7 @@ describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw do
create_list(:user, 2, avatar: avatar)
create_list(:note, 2, :with_attachment)
create(:upload, :personal_snippet_upload)
create(:ci_job_artifact)
create(:geo_job_artifact_registry, :with_artifact, :never_synced)
create(:appearance, logo: avatar, header_logo: avatar)
expect(Geo::FileDownloadWorker).to receive(:perform_async).exactly(10).times.and_call_original
......@@ -471,6 +586,11 @@ describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw do
end
end
context 'when geo_job_artifact_registry_ssot_sync feature is disabled' do
before do
stub_feature_flags(geo_job_artifact_registry_ssot_sync: false)
end
it 'does not perform Geo::FileDownloadWorker for job artifact that does not belong to selected namespaces to replicate' do
create(:ci_job_artifact, project: unsynced_project)
job_artifact_in_synced_group = create(:ci_job_artifact, project: project_in_synced_group)
......@@ -480,6 +600,7 @@ describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw do
subject.perform
end
end
it 'does not perform Geo::FileDownloadWorker for upload objects that do not belong to selected namespaces to replicate' do
avatar = fixture_file_upload('spec/fixtures/dk.png')
......
......@@ -79,13 +79,20 @@ describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
# Somewhat of an integration test
it 'creates missing registries for each registry class' do
lfs_object = create(:lfs_object)
job_artifact = create(:ci_job_artifact)
expect(Geo::LfsObjectRegistry.where(lfs_object_id: lfs_object.id).count).to eq(0)
expect(Geo::JobArtifactRegistry.where(artifact_id: job_artifact.id).count).to eq(0)
expect do
subject.perform
end.to change { Geo::LfsObjectRegistry.where(lfs_object_id: lfs_object.id).count }.from(0).to(1)
expect(Geo::LfsObjectRegistry.where(lfs_object_id: lfs_object.id).count).to eq(1)
expect(Geo::JobArtifactRegistry.where(artifact_id: job_artifact.id).count).to eq(1)
end
context 'when geo_lfs_registry_ssot_sync is disabled' do
let_it_be(:lfs_object) { create(:lfs_object) }
before do
stub_feature_flags(geo_lfs_registry_ssot_sync: false)
end
......@@ -94,8 +101,30 @@ describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
expect(subject.perform).to be_falsey
end
it 'does not execute RegistryConsistencyService' do
expect(Geo::RegistryConsistencyService).not_to receive(:new)
it 'does not execute RegistryConsistencyService for LFS objects' do
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::JobArtifactRegistry, batch_size: 1000).and_call_original
expect(Geo::RegistryConsistencyService).not_to receive(:new).with(Geo::LfsObjectRegistry, batch_size: 1000)
subject.perform
end
end
context 'when geo_job_artifact_registry_ssot_sync is disabled' do
let_it_be(:job_artifact) { create(:ci_job_artifact) }
before do
stub_feature_flags(geo_job_artifact_registry_ssot_sync: false)
end
it 'returns false' do
expect(subject.perform).to be_falsey
end
it 'does not execute RegistryConsistencyService for Job Artifacts' do
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::LfsObjectRegistry, batch_size: 1000).and_call_original
expect(Geo::RegistryConsistencyService).not_to receive(:new).with(Geo::JobArtifactRegistry, batch_size: 1000)
subject.perform
end
......