Commit daa1639e authored by Michael Kozono's avatar Michael Kozono

Merge branch...

Merge branch '217477-remove-feature-flags-to-make-registry-table-ssot-for-job-artifacts' into 'master'

Remove feature flag to make registry table SSOT for Job Artifacts

See merge request gitlab-org/gitlab!34590
parents 62f9013b be130b7b
...@@ -2,10 +2,8 @@ ...@@ -2,10 +2,8 @@
module Geo module Geo
class JobArtifactRegistryFinder < FileRegistryFinder class JobArtifactRegistryFinder < FileRegistryFinder
# Counts all existing registries independent
# of any change on filters / selective sync
def count_registry def count_registry
Geo::JobArtifactRegistry.count syncable.count
end end
def count_syncable def count_syncable
...@@ -13,22 +11,19 @@ module Geo ...@@ -13,22 +11,19 @@ module Geo
end end
def count_synced def count_synced
registries_for_job_artifacts.merge(Geo::JobArtifactRegistry.synced).count syncable.synced.count
end end
def count_failed def count_failed
registries_for_job_artifacts.merge(Geo::JobArtifactRegistry.failed).count syncable.failed.count
end end
def count_synced_missing_on_primary def count_synced_missing_on_primary
registries_for_job_artifacts.merge(Geo::JobArtifactRegistry.synced.missing_on_primary).count syncable.synced.missing_on_primary.count
end end
def syncable def syncable
return job_artifacts.not_expired if selective_sync? Geo::JobArtifactRegistry
return Ci::JobArtifact.not_expired.with_files_stored_locally if local_storage_only?
Ci::JobArtifact.not_expired
end end
# Returns untracked IDs as well as tracked IDs that are unused. # Returns untracked IDs as well as tracked IDs that are unused.
...@@ -49,16 +44,8 @@ module Geo ...@@ -49,16 +44,8 @@ module Geo
# #
# @return [Array] the first element is an Array of untracked IDs, and the second element is an Array of tracked IDs that are unused # @return [Array] the first element is an Array of untracked IDs, and the second element is an Array of tracked IDs that are unused
def find_registry_differences(range) def find_registry_differences(range)
# rubocop:disable CodeReuse/ActiveRecord source_ids = job_artifacts.id_in(range).pluck(::Ci::JobArtifact.arel_table[:id]) # rubocop:disable CodeReuse/ActiveRecord
source_ids = tracked_ids = syncable.pluck_model_ids_in_range(range)
job_artifacts(fdw: false)
.id_in(range)
.pluck(::Ci::JobArtifact.arel_table[:id])
# rubocop:enable CodeReuse/ActiveRecord
tracked_ids =
Geo::JobArtifactRegistry
.pluck_model_ids_in_range(range)
untracked_ids = source_ids - tracked_ids untracked_ids = source_ids - tracked_ids
unused_tracked_ids = tracked_ids - source_ids unused_tracked_ids = tracked_ids - source_ids
...@@ -84,49 +71,17 @@ module Geo ...@@ -84,49 +71,17 @@ module Geo
# @param [Array<Integer>] except_ids ids that will be ignored from the query # @param [Array<Integer>] except_ids ids that will be ignored from the query
# rubocop:disable CodeReuse/ActiveRecord # rubocop:disable CodeReuse/ActiveRecord
def find_never_synced_registries(batch_size:, except_ids: []) def find_never_synced_registries(batch_size:, except_ids: [])
Geo::JobArtifactRegistry syncable
.never .never
.model_id_not_in(except_ids) .model_id_not_in(except_ids)
.limit(batch_size) .limit(batch_size)
end end
alias_method :find_unsynced, :find_never_synced_registries
# rubocop:enable CodeReuse/ActiveRecord # rubocop:enable CodeReuse/ActiveRecord
# Deprecated in favor of the process using
# #find_registry_differences and #find_never_synced_registries
#
# Find limited amount of non replicated job artifacts.
#
# You can pass a list with `except_ids:` so you can exclude items you
# already scheduled but haven't finished and aren't persisted to the database yet
#
# TODO: Alternative here is to use some sort of window function with a cursor instead
# of simply limiting the query and passing a list of items we don't want
#
# @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_ids ids that will be ignored from the query
# rubocop: disable CodeReuse/ActiveRecord
def find_unsynced(batch_size:, except_ids: [])
job_artifacts
.not_expired
.missing_job_artifact_registry
.id_not_in(except_ids)
.limit(batch_size)
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def find_migrated_local(batch_size:, except_ids: [])
all_job_artifacts
.inner_join_job_artifact_registry
.with_files_stored_remotely
.id_not_in(except_ids)
.limit(batch_size)
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def find_retryable_failed_registries(batch_size:, except_ids: []) def find_retryable_failed_registries(batch_size:, except_ids: [])
Geo::JobArtifactRegistry syncable
.failed .failed
.retry_due .retry_due
.model_id_not_in(except_ids) .model_id_not_in(except_ids)
...@@ -136,7 +91,7 @@ module Geo ...@@ -136,7 +91,7 @@ module Geo
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def find_retryable_synced_missing_on_primary_registries(batch_size:, except_ids: []) def find_retryable_synced_missing_on_primary_registries(batch_size:, except_ids: [])
Geo::JobArtifactRegistry syncable
.synced .synced
.missing_on_primary .missing_on_primary
.retry_due .retry_due
...@@ -147,18 +102,12 @@ module Geo ...@@ -147,18 +102,12 @@ module Geo
private private
def job_artifacts(fdw: true) def job_artifacts
local_storage_only?(fdw: fdw) ? all_job_artifacts(fdw: fdw).with_files_stored_locally : all_job_artifacts(fdw: fdw) local_storage_only?(fdw: false) ? all_job_artifacts.with_files_stored_locally : all_job_artifacts
end
def all_job_artifacts(fdw: true)
current_node(fdw: fdw).job_artifacts
end end
def registries_for_job_artifacts def all_job_artifacts
job_artifacts current_node(fdw: false).job_artifacts
.inner_join_job_artifact_registry
.not_expired
end end
end end
end end
...@@ -9,16 +9,7 @@ class Geo::JobArtifactRegistry < Geo::BaseRegistry ...@@ -9,16 +9,7 @@ class Geo::JobArtifactRegistry < Geo::BaseRegistry
scope :never, -> { where(success: false, retry_count: nil) } scope :never, -> { where(success: false, retry_count: nil) }
def self.failed def self.failed
if registry_consistency_worker_enabled? where(success: false).where.not(retry_count: nil)
where(success: false).where.not(retry_count: nil)
else
# Would do `super` except it doesn't work with an included scope
where(success: false)
end
end
def self.registry_consistency_worker_enabled?
Feature.enabled?(:geo_job_artifact_registry_ssot_sync, default_enabled: true)
end end
def self.finder_class def self.finder_class
......
...@@ -12,13 +12,9 @@ module Geo ...@@ -12,13 +12,9 @@ module Geo
end end
def find_unsynced_jobs(batch_size:) def find_unsynced_jobs(batch_size:)
if Geo::JobArtifactRegistry.registry_consistency_worker_enabled? convert_registry_relation_to_job_args(
convert_registry_relation_to_job_args( registry_finder.find_never_synced_registries(find_batch_params(batch_size))
registry_finder.find_never_synced_registries(find_batch_params(batch_size)) )
)
else
super
end
end end
end end
end end
......
...@@ -37,10 +37,7 @@ module Geo ...@@ -37,10 +37,7 @@ module Geo
end end
def find_migrated_local_objects(batch_size:) def find_migrated_local_objects(batch_size:)
attachment_ids = find_migrated_local_attachments_ids(batch_size: batch_size) find_migrated_local_attachments_ids(batch_size: batch_size)
job_artifact_ids = find_migrated_local_job_artifacts_ids(batch_size: batch_size)
take_batch(attachment_ids, job_artifact_ids)
end end
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
...@@ -53,16 +50,6 @@ module Geo ...@@ -53,16 +50,6 @@ module Geo
end end
# rubocop: enable CodeReuse/ActiveRecord # rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def find_migrated_local_job_artifacts_ids(batch_size:)
return [] unless job_artifacts_object_store_enabled?
job_artifacts_finder.find_migrated_local(batch_size: batch_size, except_ids: scheduled_file_ids(:job_artifact))
.pluck(Geo::Fdw::Ci::JobArtifact.arel_table[:id])
.map { |id| ['job_artifact', id] }
end
# rubocop: enable CodeReuse/ActiveRecord
def scheduled_file_ids(file_types) def scheduled_file_ids(file_types)
file_types = Array(file_types) file_types = Array(file_types)
file_types = file_types.map(&:to_s) file_types = file_types.map(&:to_s)
...@@ -74,13 +61,8 @@ module Geo ...@@ -74,13 +61,8 @@ module Geo
FileUploader.object_store_enabled? FileUploader.object_store_enabled?
end end
def job_artifacts_object_store_enabled?
JobArtifactUploader.object_store_enabled?
end
def object_store_enabled? def object_store_enabled?
attachments_object_store_enabled? || attachments_object_store_enabled?
job_artifacts_object_store_enabled?
end end
def sync_object_storage_enabled? def sync_object_storage_enabled?
...@@ -90,9 +72,5 @@ module Geo ...@@ -90,9 +72,5 @@ module Geo
def attachments_finder def attachments_finder
@attachments_finder ||= AttachmentRegistryFinder.new(current_node_id: current_node.id) @attachments_finder ||= AttachmentRegistryFinder.new(current_node_id: current_node.id)
end end
def job_artifacts_finder
@job_artifacts_finder ||= JobArtifactRegistryFinder.new(current_node_id: current_node.id)
end
end end
end end
---
title: 'Geo: Make registry table SSOT for job artifacts'
merge_request: 34590
author:
type: performance
...@@ -321,40 +321,16 @@ RSpec.describe GeoNodeStatus, :geo, :geo_fdw do ...@@ -321,40 +321,16 @@ RSpec.describe GeoNodeStatus, :geo, :geo_fdw do
end end
describe '#job_artifacts_failed_count' do describe '#job_artifacts_failed_count' do
context 'when geo_job_artifact_registry_ssot_sync is disabled' do it 'counts failed job artifacts' do
before do # These should be ignored
stub_feature_flags(geo_job_artifact_registry_ssot_sync: false) create(:geo_upload_registry, :failed)
end create(:geo_upload_registry, :avatar, :failed)
create(:geo_upload_registry, :attachment, :failed)
it 'counts failed job artifacts' do create(:geo_job_artifact_registry, :with_artifact, success: true)
# These should be ignored
create(:geo_upload_registry, :failed)
create(:geo_upload_registry, :avatar, :failed)
create(:geo_upload_registry, :attachment, :failed)
create(:geo_job_artifact_registry, :with_artifact, success: true)
create(:geo_job_artifact_registry, :with_artifact, success: false)
expect(subject.job_artifacts_failed_count).to eq(1)
end
end
context 'when geo_job_artifact_registry_ssot_sync is enabled' do
before do
stub_feature_flags(geo_job_artifact_registry_ssot_sync: true)
end
it 'counts failed job artifacts' do
# These should be ignored
create(:geo_upload_registry, :failed)
create(:geo_upload_registry, :avatar, :failed)
create(:geo_upload_registry, :attachment, :failed)
create(:geo_job_artifact_registry, :with_artifact, success: true)
create(:geo_job_artifact_registry, :with_artifact, :failed) create(:geo_job_artifact_registry, :with_artifact, :failed)
expect(subject.job_artifacts_failed_count).to eq(1) expect(subject.job_artifacts_failed_count).to eq(1)
end
end end
end end
......
...@@ -141,61 +141,6 @@ RSpec.describe Geo::MigratedLocalFilesCleanUpWorker, :geo, :geo_fdw, :use_sql_qu ...@@ -141,61 +141,6 @@ RSpec.describe Geo::MigratedLocalFilesCleanUpWorker, :geo, :geo_fdw, :use_sql_qu
end end
end end
context 'with job artifacts' do
let(:job_artifact_local) { create(:ci_job_artifact) }
let(:job_artifact_remote_1) { create(:ci_job_artifact, :remote_store, project: synced_project) }
before do
stub_artifacts_object_storage
create(:geo_job_artifact_registry, artifact_id: job_artifact_local.id)
create(:geo_job_artifact_registry, artifact_id: job_artifact_remote_1.id)
end
it 'schedules worker for artifact stored remotely and synced locally' do
expect(Geo::FileRegistryRemovalWorker).to receive(:perform_async).with('job_artifact', job_artifact_remote_1.id)
expect(Geo::FileRegistryRemovalWorker).not_to receive(:perform_async).with(anything, job_artifact_local.id)
subject.perform
end
context 'with selective sync by namespace' do
let(:job_artifact_remote_2) { create(:ci_job_artifact, :remote_store, project: project_broken_storage) }
let(:secondary) { create(:geo_node, :local_storage_only, selective_sync_type: 'shards', selective_sync_shards: ['broken']) }
before do
create(:geo_job_artifact_registry, artifact_id: job_artifact_remote_2.id)
end
it 'schedules worker for artifact stored remotely and synced locally' do
expect(Geo::FileRegistryRemovalWorker).to receive(:perform_async).with('job_artifact', job_artifact_remote_2.id)
expect(Geo::FileRegistryRemovalWorker).not_to receive(:perform_async).with(anything, job_artifact_remote_1.id)
expect(Geo::FileRegistryRemovalWorker).not_to receive(:perform_async).with(anything, job_artifact_local.id)
subject.perform
end
end
context 'with selective sync by shard' do
let(:job_artifact_remote_2) { create(:ci_job_artifact, :remote_store, project: unsynced_project) }
let(:secondary) { create(:geo_node, :local_storage_only, selective_sync_type: 'namespaces', namespaces: [synced_group]) }
before do
create(:geo_job_artifact_registry, artifact_id: job_artifact_remote_2.id)
end
it 'schedules worker for artifact stored remotely and synced locally' do
expect(Geo::FileRegistryRemovalWorker).to receive(:perform_async).with('job_artifact', job_artifact_remote_1.id)
expect(Geo::FileRegistryRemovalWorker).not_to receive(:perform_async).with(anything, job_artifact_remote_2.id)
expect(Geo::FileRegistryRemovalWorker).not_to receive(:perform_async).with(anything, job_artifact_local.id)
subject.perform
end
end
end
context 'backoff time' do context 'backoff time' do
let(:cache_key) { "#{described_class.name.underscore}:skip" } let(:cache_key) { "#{described_class.name.underscore}:skip" }
......
...@@ -97,27 +97,6 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do ...@@ -97,27 +97,6 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
expect(Geo::PackageFileRegistry.where(package_file_id: package_file.id).count).to eq(1) expect(Geo::PackageFileRegistry.where(package_file_id: package_file.id).count).to eq(1)
end end
context 'when geo_job_artifact_registry_ssot_sync is disabled' do
before do
stub_feature_flags(geo_job_artifact_registry_ssot_sync: false)
end
it 'returns false' do
expect(subject.perform).to be_falsey
end
it 'does not execute RegistryConsistencyService for Job Artifacts' do
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::LfsObjectRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::ProjectRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::UploadRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::PackageFileRegistry, batch_size: 1000).and_call_original
expect(Geo::RegistryConsistencyService).not_to receive(:new).with(Geo::JobArtifactRegistry, batch_size: 1000)
subject.perform
end
end
context 'when geo_file_registry_ssot_sync is disabled' do context 'when geo_file_registry_ssot_sync is disabled' do
before do before do
stub_feature_flags(geo_file_registry_ssot_sync: false) stub_feature_flags(geo_file_registry_ssot_sync: false)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment