Treat registry as SSOT for Job Artifacts

FileDownloadDispatchWorker looks only at the registry table
for things that have never been synced.

In order for the counts to be correct, we have to distinguish
between "never synced" and "failed" with "retry_count" being
"nil" for the former and "not nil" for the latter.

Feature flag is "geo_job_artifact_registry_ssot_sync".
parent 1a7ca4b0
...@@ -31,53 +31,81 @@ module Geo ...@@ -31,53 +31,81 @@ module Geo
Ci::JobArtifact.not_expired Ci::JobArtifact.not_expired
end end
# Returns Geo::JobArtifactRegistry records that have never been synced.
#
# Does not care about selective sync, because it considers the Registry
# table to be the single source of truth. The contract is that other
# processes need to ensure that the table only contains records that should
# be synced.
#
# Any registries that have ever been synced that currently need to be
# resynced will be handled by other find methods (like
# #find_retryable_failed_registries)
#
# You can pass a list with `except_ids:` so you can exclude items you
# already scheduled but haven't finished and aren't persisted to the database yet
#
# @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_ids ids that will be ignored from the query
# rubocop:disable CodeReuse/ActiveRecord
def find_never_synced_registries(batch_size:, except_ids: [])
Geo::JobArtifactRegistry
.never
.model_id_not_in(except_ids)
.limit(batch_size)
end
# rubocop:enable CodeReuse/ActiveRecord
# Deprecated in favor of the process using
# #find_missing_registry_ids and #find_never_synced_registries
#
# Find limited amount of non replicated job artifacts. # Find limited amount of non replicated job artifacts.
# #
# You can pass a list with `except_artifact_ids:` so you can exclude items you # You can pass a list with `except_ids:` so you can exclude items you
# already scheduled but haven't finished and aren't persisted to the database yet # already scheduled but haven't finished and aren't persisted to the database yet
# #
# TODO: Alternative here is to use some sort of window function with a cursor instead # TODO: Alternative here is to use some sort of window function with a cursor instead
# of simply limiting the query and passing a list of items we don't want # of simply limiting the query and passing a list of items we don't want
# #
# @param [Integer] batch_size used to limit the results returned # @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_artifact_ids ids that will be ignored from the query # @param [Array<Integer>] except_ids ids that will be ignored from the query
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def find_unsynced(batch_size:, except_artifact_ids: []) def find_unsynced(batch_size:, except_ids: [])
job_artifacts job_artifacts
.not_expired .not_expired
.missing_job_artifact_registry .missing_job_artifact_registry
.id_not_in(except_artifact_ids) .id_not_in(except_ids)
.limit(batch_size) .limit(batch_size)
end end
# rubocop: enable CodeReuse/ActiveRecord # rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def find_migrated_local(batch_size:, except_artifact_ids: []) def find_migrated_local(batch_size:, except_ids: [])
all_job_artifacts all_job_artifacts
.inner_join_job_artifact_registry .inner_join_job_artifact_registry
.with_files_stored_remotely .with_files_stored_remotely
.id_not_in(except_artifact_ids) .id_not_in(except_ids)
.limit(batch_size) .limit(batch_size)
end end
# rubocop: enable CodeReuse/ActiveRecord # rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def find_retryable_failed_registries(batch_size:, except_artifact_ids: []) def find_retryable_failed_registries(batch_size:, except_ids: [])
Geo::JobArtifactRegistry Geo::JobArtifactRegistry
.failed .failed
.retry_due .retry_due
.artifact_id_not_in(except_artifact_ids) .model_id_not_in(except_ids)
.limit(batch_size) .limit(batch_size)
end end
# rubocop: enable CodeReuse/ActiveRecord # rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def find_retryable_synced_missing_on_primary_registries(batch_size:, except_artifact_ids: []) def find_retryable_synced_missing_on_primary_registries(batch_size:, except_ids: [])
Geo::JobArtifactRegistry Geo::JobArtifactRegistry
.synced .synced
.missing_on_primary .missing_on_primary
.retry_due .retry_due
.artifact_id_not_in(except_artifact_ids) .model_id_not_in(except_ids)
.limit(batch_size) .limit(batch_size)
end end
# rubocop: enable CodeReuse/ActiveRecord # rubocop: enable CodeReuse/ActiveRecord
......
...@@ -3,15 +3,17 @@ ...@@ -3,15 +3,17 @@
class Geo::JobArtifactRegistry < Geo::BaseRegistry class Geo::JobArtifactRegistry < Geo::BaseRegistry
include Geo::Syncable include Geo::Syncable
def self.artifact_id_in(ids) MODEL_CLASS = ::Ci::JobArtifact
where(artifact_id: ids) MODEL_FOREIGN_KEY = :artifact_id
end
def self.artifact_id_not_in(ids) scope :never, -> { where(success: false, retry_count: nil) }
where.not(artifact_id: ids)
end
def self.pluck_artifact_key def self.failed
where(nil).pluck(:artifact_id) if Feature.enabled?(:geo_job_artifact_registry_ssot_sync)
where(success: false).where.not(retry_count: nil)
else
# Would do `super` except it doesn't work with an included scope
where(success: false)
end
end end
end end
...@@ -4,12 +4,22 @@ module Geo ...@@ -4,12 +4,22 @@ module Geo
class FileDownloadDispatchWorker # rubocop:disable Scalability/IdempotentWorker class FileDownloadDispatchWorker # rubocop:disable Scalability/IdempotentWorker
class JobArtifactJobFinder < JobFinder # rubocop:disable Scalability/IdempotentWorker class JobArtifactJobFinder < JobFinder # rubocop:disable Scalability/IdempotentWorker
RESOURCE_ID_KEY = :artifact_id RESOURCE_ID_KEY = :artifact_id
EXCEPT_RESOURCE_IDS_KEY = :except_artifact_ids EXCEPT_RESOURCE_IDS_KEY = :except_ids
FILE_SERVICE_OBJECT_TYPE = :job_artifact FILE_SERVICE_OBJECT_TYPE = :job_artifact
def registry_finder def registry_finder
@registry_finder ||= Geo::JobArtifactRegistryFinder.new(current_node_id: Gitlab::Geo.current_node.id) @registry_finder ||= Geo::JobArtifactRegistryFinder.new(current_node_id: Gitlab::Geo.current_node.id)
end end
def find_unsynced_jobs(batch_size:)
if Feature.enabled?(:geo_job_artifact_registry_ssot_sync)
convert_registry_relation_to_job_args(
registry_finder.find_never_synced_registries(find_batch_params(batch_size))
)
else
super
end
end
end end
end end
end end
...@@ -68,7 +68,7 @@ module Geo ...@@ -68,7 +68,7 @@ module Geo
def find_migrated_local_job_artifacts_ids(batch_size:) def find_migrated_local_job_artifacts_ids(batch_size:)
return [] unless job_artifacts_object_store_enabled? return [] unless job_artifacts_object_store_enabled?
job_artifacts_finder.find_migrated_local(batch_size: batch_size, except_artifact_ids: scheduled_file_ids(:job_artifact)) job_artifacts_finder.find_migrated_local(batch_size: batch_size, except_ids: scheduled_file_ids(:job_artifact))
.pluck(Geo::Fdw::Ci::JobArtifact.arel_table[:id]) .pluck(Geo::Fdw::Ci::JobArtifact.arel_table[:id])
.map { |id| ['job_artifact', id] } .map { |id| ['job_artifact', id] }
end end
......
...@@ -29,7 +29,7 @@ module EE ...@@ -29,7 +29,7 @@ module EE
end end
def lost_and_found_geo_registries def lost_and_found_geo_registries
@lost_and_found_geo_registries ||= ::Geo::JobArtifactRegistry.artifact_id_in(lost_and_found_ids) @lost_and_found_geo_registries ||= ::Geo::JobArtifactRegistry.model_id_in(lost_and_found_ids)
end end
end end
end end
......
...@@ -5,6 +5,16 @@ FactoryBot.define do ...@@ -5,6 +5,16 @@ FactoryBot.define do
sequence(:artifact_id) sequence(:artifact_id)
success { true } success { true }
trait :failed do
success { false }
retry_count { 1 }
end
trait :never_synced do
success { false }
retry_count { nil }
end
trait :with_artifact do trait :with_artifact do
transient do transient do
artifact_type { nil } # e.g. :archive, :metadata, :trace ... artifact_type { nil } # e.g. :archive, :metadata, :trace ...
......
...@@ -107,13 +107,13 @@ describe Geo::JobArtifactRegistryFinder, :geo_fdw do ...@@ -107,13 +107,13 @@ describe Geo::JobArtifactRegistryFinder, :geo_fdw do
describe '#count_failed' do describe '#count_failed' do
before do before do
create(:geo_job_artifact_registry, artifact_id: job_artifact_synced_project.id, success: false) create(:geo_job_artifact_registry, :failed, artifact_id: job_artifact_synced_project.id)
create(:geo_job_artifact_registry, artifact_id: job_artifact_unsynced_project.id) create(:geo_job_artifact_registry, artifact_id: job_artifact_unsynced_project.id)
create(:geo_job_artifact_registry, artifact_id: job_artifact_broken_storage_1.id, success: false) create(:geo_job_artifact_registry, :failed, artifact_id: job_artifact_broken_storage_1.id)
create(:geo_job_artifact_registry, artifact_id: job_artifact_expired_synced_project.id, success: false) create(:geo_job_artifact_registry, :failed, artifact_id: job_artifact_expired_synced_project.id)
create(:geo_job_artifact_registry, artifact_id: job_artifact_expired_broken_storage.id, success: false) create(:geo_job_artifact_registry, :failed, artifact_id: job_artifact_expired_broken_storage.id)
create(:geo_job_artifact_registry, artifact_id: job_artifact_remote_synced_project.id, success: false) create(:geo_job_artifact_registry, :failed, artifact_id: job_artifact_remote_synced_project.id)
create(:geo_job_artifact_registry, artifact_id: job_artifact_remote_broken_storage.id, success: false) create(:geo_job_artifact_registry, :failed, artifact_id: job_artifact_remote_broken_storage.id)
end end
context 'without selective sync' do context 'without selective sync' do
...@@ -229,6 +229,21 @@ describe Geo::JobArtifactRegistryFinder, :geo_fdw do ...@@ -229,6 +229,21 @@ describe Geo::JobArtifactRegistryFinder, :geo_fdw do
end end
context 'finds all the things' do context 'finds all the things' do
describe '#find_never_synced_registries' do
let!(:registry_job_artifact_1) { create(:geo_job_artifact_registry, :never_synced, artifact_id: job_artifact_synced_project.id) }
let!(:registry_job_artifact_2) { create(:geo_job_artifact_registry, :never_synced, artifact_id: job_artifact_unsynced_project.id) }
let!(:registry_job_artifact_3) { create(:geo_job_artifact_registry, artifact_id: job_artifact_broken_storage_1.id) }
let!(:registry_job_artifact_4) { create(:geo_job_artifact_registry, :failed, artifact_id: job_artifact_broken_storage_2.id) }
let!(:registry_job_artifact_remote_1) { create(:geo_job_artifact_registry, :never_synced, artifact_id: job_artifact_remote_synced_project.id) }
it 'returns registries for LFS objects that have never been synced' do
registries = subject.find_never_synced_registries(batch_size: 10)
expect(registries).to match_ids(registry_job_artifact_1, registry_job_artifact_2, registry_job_artifact_remote_1)
end
end
describe '#find_unsynced' do describe '#find_unsynced' do
before do before do
create(:geo_job_artifact_registry, artifact_id: job_artifact_synced_project.id, success: false) create(:geo_job_artifact_registry, artifact_id: job_artifact_synced_project.id, success: false)
...@@ -238,7 +253,7 @@ describe Geo::JobArtifactRegistryFinder, :geo_fdw do ...@@ -238,7 +253,7 @@ describe Geo::JobArtifactRegistryFinder, :geo_fdw do
context 'without selective sync' do context 'without selective sync' do
it 'returns job artifacts without an entry on the tracking database, ignoring expired ones' do it 'returns job artifacts without an entry on the tracking database, ignoring expired ones' do
job_artifacts = subject.find_unsynced(batch_size: 10, except_artifact_ids: [job_artifact_unsynced_project.id]) job_artifacts = subject.find_unsynced(batch_size: 10, except_ids: [job_artifact_unsynced_project.id])
expect(job_artifacts).to match_ids(job_artifact_remote_synced_project, job_artifact_remote_unsynced_project, expect(job_artifacts).to match_ids(job_artifact_remote_synced_project, job_artifact_remote_unsynced_project,
job_artifact_broken_storage_2) job_artifact_broken_storage_2)
...@@ -285,7 +300,7 @@ describe Geo::JobArtifactRegistryFinder, :geo_fdw do ...@@ -285,7 +300,7 @@ describe Geo::JobArtifactRegistryFinder, :geo_fdw do
end end
it 'returns job artifacts excluding ones from the exception list' do it 'returns job artifacts excluding ones from the exception list' do
job_artifacts = subject.find_migrated_local(batch_size: 10, except_artifact_ids: [job_artifact_remote_synced_project.id]) job_artifacts = subject.find_migrated_local(batch_size: 10, except_ids: [job_artifact_remote_synced_project.id])
expect(job_artifacts).to match_ids(job_artifact_remote_unsynced_project, job_artifact_remote_broken_storage) expect(job_artifacts).to match_ids(job_artifact_remote_unsynced_project, job_artifact_remote_broken_storage)
end end
...@@ -321,7 +336,7 @@ describe Geo::JobArtifactRegistryFinder, :geo_fdw do ...@@ -321,7 +336,7 @@ describe Geo::JobArtifactRegistryFinder, :geo_fdw do
let(:secondary) { create(:geo_node, :local_storage_only) } let(:secondary) { create(:geo_node, :local_storage_only) }
it 'returns job artifacts excluding ones from the exception list' do it 'returns job artifacts excluding ones from the exception list' do
job_artifacts = subject.find_migrated_local(batch_size: 10, except_artifact_ids: [job_artifact_remote_synced_project.id]) job_artifacts = subject.find_migrated_local(batch_size: 10, except_ids: [job_artifact_remote_synced_project.id])
expect(job_artifacts).to match_ids(job_artifact_remote_unsynced_project, job_artifact_remote_broken_storage) expect(job_artifacts).to match_ids(job_artifact_remote_unsynced_project, job_artifact_remote_broken_storage)
end end
......
...@@ -342,6 +342,11 @@ describe GeoNodeStatus, :geo, :geo_fdw do ...@@ -342,6 +342,11 @@ describe GeoNodeStatus, :geo, :geo_fdw do
end end
describe '#job_artifacts_failed_count' do describe '#job_artifacts_failed_count' do
context 'when geo_job_artifact_registry_ssot_sync is disabled' do
before do
stub_feature_flags(geo_job_artifact_registry_ssot_sync: false)
end
it 'counts failed job artifacts' do it 'counts failed job artifacts' do
# These should be ignored # These should be ignored
create(:geo_upload_registry, :failed) create(:geo_upload_registry, :failed)
...@@ -355,6 +360,25 @@ describe GeoNodeStatus, :geo, :geo_fdw do ...@@ -355,6 +360,25 @@ describe GeoNodeStatus, :geo, :geo_fdw do
end end
end end
context 'when geo_job_artifact_registry_ssot_sync is enabled' do
before do
stub_feature_flags(geo_job_artifact_registry_ssot_sync: true)
end
it 'counts failed job artifacts' do
# These should be ignored
create(:geo_upload_registry, :failed)
create(:geo_upload_registry, :avatar, :failed)
create(:geo_upload_registry, :attachment, :failed)
create(:geo_job_artifact_registry, :with_artifact, success: true)
create(:geo_job_artifact_registry, :with_artifact, :failed)
expect(subject.job_artifacts_failed_count).to eq(1)
end
end
end
describe '#job_artifacts_synced_in_percentage' do describe '#job_artifacts_synced_in_percentage' do
context 'when artifacts are available' do context 'when artifacts are available' do
before do before do
......
...@@ -294,6 +294,120 @@ describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw do ...@@ -294,6 +294,120 @@ describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw do
end end
context 'with job artifacts' do context 'with job artifacts' do
context 'with geo_job_artifact_registry_ssot_sync feature enabled' do
before do
stub_feature_flags(geo_job_artifact_registry_ssot_sync: true)
end
it 'performs Geo::FileDownloadWorker for unsynced job artifacts' do
registry = create(:geo_job_artifact_registry, :with_artifact, :never_synced)
expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with('job_artifact', registry.artifact_id).once.and_return(spy)
subject.perform
end
it 'performs Geo::FileDownloadWorker for failed-sync job artifacts' do
registry = create(:geo_job_artifact_registry, :with_artifact, :never_synced)
expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with('job_artifact', registry.artifact_id).once.and_return(spy)
subject.perform
end
it 'does not perform Geo::FileDownloadWorker for synced job artifacts' do
registry = create(:geo_job_artifact_registry, :with_artifact, bytes: 1234, success: true)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
.with('job_artifact', registry.artifact_id)
subject.perform
end
it 'does not perform Geo::FileDownloadWorker for synced job artifacts even with 0 bytes downloaded' do
registry = create(:geo_job_artifact_registry, :with_artifact, bytes: 0, success: true)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
.with('job_artifact', registry.artifact_id)
subject.perform
end
it 'does not retry failed artifacts when retry_at is tomorrow' do
registry = create(:geo_job_artifact_registry, :with_artifact, :failed, retry_at: Date.tomorrow)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
.with('job_artifact', registry.artifact_id)
subject.perform
end
it 'retries failed artifacts when retry_at is in the past' do
registry = create(:geo_job_artifact_registry, :with_artifact, :failed, retry_at: Date.yesterday)
expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with('job_artifact', registry.artifact_id).once.and_return(spy)
subject.perform
end
context 'with files missing on the primary that are marked as synced' do
let!(:artifact_file_missing_on_primary) { create(:ci_job_artifact) }
let!(:artifact_registry) { create(:geo_job_artifact_registry, artifact_id: artifact_file_missing_on_primary.id, bytes: 1234, success: true, missing_on_primary: true) }
it 'retries the files if there is spare capacity' do
registry = create(:geo_job_artifact_registry, :with_artifact, :never_synced)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', registry.artifact_id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', artifact_file_missing_on_primary.id)
subject.perform
end
it 'retries failed files with retry_at in the past' do
artifact_registry.update!(retry_at: Date.yesterday)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', artifact_file_missing_on_primary.id)
subject.perform
end
it 'does not retry files with later retry_at' do
artifact_registry.update!(retry_at: Date.tomorrow)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async).with('job_artifact', artifact_file_missing_on_primary.id)
subject.perform
end
it 'does not retry those files if there is no spare capacity' do
registry = create(:geo_job_artifact_registry, :with_artifact, :never_synced)
expect(subject).to receive(:db_retrieve_batch_size).and_return(1).twice
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', registry.artifact_id)
subject.perform
end
it 'does not retry those files if they are already scheduled' do
registry = create(:geo_job_artifact_registry, :with_artifact, :never_synced)
scheduled_jobs = [{ type: 'job_artifact', id: artifact_file_missing_on_primary.id, job_id: 'foo' }]
expect(subject).to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(1)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', registry.artifact_id)
subject.perform
end
end
end
context 'with geo_job_artifact_registry_ssot_sync feature disabled' do
before do
stub_feature_flags(geo_job_artifact_registry_ssot_sync: false)
end
it 'performs Geo::FileDownloadWorker for unsynced job artifacts' do it 'performs Geo::FileDownloadWorker for unsynced job artifacts' do
artifact = create(:ci_job_artifact) artifact = create(:ci_job_artifact)
...@@ -398,6 +512,7 @@ describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw do ...@@ -398,6 +512,7 @@ describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw do
end end
end end
end end
end
context 'backoff time' do context 'backoff time' do
let(:cache_key) { "#{described_class.name.underscore}:skip" } let(:cache_key) { "#{described_class.name.underscore}:skip" }
...@@ -424,7 +539,7 @@ describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw do ...@@ -424,7 +539,7 @@ describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw do
create_list(:user, 2, avatar: avatar) create_list(:user, 2, avatar: avatar)
create_list(:note, 2, :with_attachment) create_list(:note, 2, :with_attachment)
create(:upload, :personal_snippet_upload) create(:upload, :personal_snippet_upload)
create(:ci_job_artifact) create(:geo_job_artifact_registry, :with_artifact, :never_synced)
create(:appearance, logo: avatar, header_logo: avatar) create(:appearance, logo: avatar, header_logo: avatar)
expect(Geo::FileDownloadWorker).to receive(:perform_async).exactly(10).times.and_call_original expect(Geo::FileDownloadWorker).to receive(:perform_async).exactly(10).times.and_call_original
...@@ -471,6 +586,11 @@ describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw do ...@@ -471,6 +586,11 @@ describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw do
end end
end end
context 'when geo_job_artifact_registry_ssot_sync feature is disabled' do
before do
stub_feature_flags(geo_job_artifact_registry_ssot_sync: false)
end
it 'does not perform Geo::FileDownloadWorker for job artifact that does not belong to selected namespaces to replicate' do it 'does not perform Geo::FileDownloadWorker for job artifact that does not belong to selected namespaces to replicate' do
create(:ci_job_artifact, project: unsynced_project) create(:ci_job_artifact, project: unsynced_project)
job_artifact_in_synced_group = create(:ci_job_artifact, project: project_in_synced_group) job_artifact_in_synced_group = create(:ci_job_artifact, project: project_in_synced_group)
...@@ -480,6 +600,7 @@ describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw do ...@@ -480,6 +600,7 @@ describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw do
subject.perform subject.perform
end end
end
it 'does not perform Geo::FileDownloadWorker for upload objects that do not belong to selected namespaces to replicate' do it 'does not perform Geo::FileDownloadWorker for upload objects that do not belong to selected namespaces to replicate' do
avatar = fixture_file_upload('spec/fixtures/dk.png') avatar = fixture_file_upload('spec/fixtures/dk.png')
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment