Remove FF to make registry table SSOT for job artifacts

The feature flag has been enabled by default
and will be removed and is working as expected.
parent de555710
...@@ -9,16 +9,7 @@ class Geo::JobArtifactRegistry < Geo::BaseRegistry ...@@ -9,16 +9,7 @@ class Geo::JobArtifactRegistry < Geo::BaseRegistry
scope :never, -> { where(success: false, retry_count: nil) } scope :never, -> { where(success: false, retry_count: nil) }
def self.failed def self.failed
if registry_consistency_worker_enabled? where(success: false).where.not(retry_count: nil)
where(success: false).where.not(retry_count: nil)
else
# Would do `super` except it doesn't work with an included scope
where(success: false)
end
end
def self.registry_consistency_worker_enabled?
Feature.enabled?(:geo_job_artifact_registry_ssot_sync, default_enabled: true)
end end
def self.finder_class def self.finder_class
......
...@@ -12,13 +12,9 @@ module Geo ...@@ -12,13 +12,9 @@ module Geo
end end
def find_unsynced_jobs(batch_size:) def find_unsynced_jobs(batch_size:)
if Geo::JobArtifactRegistry.registry_consistency_worker_enabled? convert_registry_relation_to_job_args(
convert_registry_relation_to_job_args( registry_finder.find_never_synced_registries(find_batch_params(batch_size))
registry_finder.find_never_synced_registries(find_batch_params(batch_size)) )
)
else
super
end
end end
end end
end end
......
...@@ -321,40 +321,16 @@ RSpec.describe GeoNodeStatus, :geo, :geo_fdw do ...@@ -321,40 +321,16 @@ RSpec.describe GeoNodeStatus, :geo, :geo_fdw do
end end
describe '#job_artifacts_failed_count' do describe '#job_artifacts_failed_count' do
context 'when geo_job_artifact_registry_ssot_sync is disabled' do it 'counts failed job artifacts' do
before do # These should be ignored
stub_feature_flags(geo_job_artifact_registry_ssot_sync: false) create(:geo_upload_registry, :failed)
end create(:geo_upload_registry, :avatar, :failed)
create(:geo_upload_registry, :attachment, :failed)
it 'counts failed job artifacts' do create(:geo_job_artifact_registry, :with_artifact, success: true)
# These should be ignored
create(:geo_upload_registry, :failed)
create(:geo_upload_registry, :avatar, :failed)
create(:geo_upload_registry, :attachment, :failed)
create(:geo_job_artifact_registry, :with_artifact, success: true)
create(:geo_job_artifact_registry, :with_artifact, success: false)
expect(subject.job_artifacts_failed_count).to eq(1)
end
end
context 'when geo_job_artifact_registry_ssot_sync is enabled' do
before do
stub_feature_flags(geo_job_artifact_registry_ssot_sync: true)
end
it 'counts failed job artifacts' do
# These should be ignored
create(:geo_upload_registry, :failed)
create(:geo_upload_registry, :avatar, :failed)
create(:geo_upload_registry, :attachment, :failed)
create(:geo_job_artifact_registry, :with_artifact, success: true)
create(:geo_job_artifact_registry, :with_artifact, :failed) create(:geo_job_artifact_registry, :with_artifact, :failed)
expect(subject.job_artifacts_failed_count).to eq(1) expect(subject.job_artifacts_failed_count).to eq(1)
end
end end
end end
......
...@@ -361,223 +361,107 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw, :use_sql_query_c ...@@ -361,223 +361,107 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw, :use_sql_query_c
end end
context 'with job artifacts' do context 'with job artifacts' do
context 'with geo_job_artifact_registry_ssot_sync feature enabled' do it 'performs Geo::FileDownloadWorker for unsynced job artifacts' do
before do registry = create(:geo_job_artifact_registry, :with_artifact, :never_synced)
stub_feature_flags(geo_job_artifact_registry_ssot_sync: true)
end
it 'performs Geo::FileDownloadWorker for unsynced job artifacts' do
registry = create(:geo_job_artifact_registry, :with_artifact, :never_synced)
expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with('job_artifact', registry.artifact_id).once.and_return(spy)
subject.perform
end
it 'performs Geo::FileDownloadWorker for failed-sync job artifacts' do
registry = create(:geo_job_artifact_registry, :with_artifact, :failed)
expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with('job_artifact', registry.artifact_id).once.and_return(spy)
subject.perform
end
it 'does not perform Geo::FileDownloadWorker for synced job artifacts' do
registry = create(:geo_job_artifact_registry, :with_artifact, bytes: 1234, success: true)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
.with('job_artifact', registry.artifact_id)
subject.perform
end
it 'does not perform Geo::FileDownloadWorker for synced job artifacts even with 0 bytes downloaded' do
registry = create(:geo_job_artifact_registry, :with_artifact, bytes: 0, success: true)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
.with('job_artifact', registry.artifact_id)
subject.perform
end
it 'does not retry failed artifacts when retry_at is tomorrow' do
registry = create(:geo_job_artifact_registry, :with_artifact, :failed, retry_at: Date.tomorrow)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
.with('job_artifact', registry.artifact_id)
subject.perform
end
it 'retries failed artifacts when retry_at is in the past' do
registry = create(:geo_job_artifact_registry, :with_artifact, :failed, retry_at: Date.yesterday)
expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with('job_artifact', registry.artifact_id).once.and_return(spy)
subject.perform
end
context 'with files missing on the primary that are marked as synced' do expect(Geo::FileDownloadWorker).to receive(:perform_async)
let!(:artifact_file_missing_on_primary) { create(:ci_job_artifact) } .with('job_artifact', registry.artifact_id).once.and_return(spy)
let!(:artifact_registry) { create(:geo_job_artifact_registry, artifact_id: artifact_file_missing_on_primary.id, bytes: 1234, success: true, missing_on_primary: true) }
it 'retries the files if there is spare capacity' do subject.perform
registry = create(:geo_job_artifact_registry, :with_artifact, :never_synced) end
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', registry.artifact_id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', artifact_file_missing_on_primary.id)
subject.perform
end
it 'retries failed files with retry_at in the past' do it 'performs Geo::FileDownloadWorker for failed-sync job artifacts' do
artifact_registry.update!(retry_at: Date.yesterday) registry = create(:geo_job_artifact_registry, :with_artifact, :failed)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', artifact_file_missing_on_primary.id) expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with('job_artifact', registry.artifact_id).once.and_return(spy)
subject.perform subject.perform
end end
it 'does not retry files with later retry_at' do it 'does not perform Geo::FileDownloadWorker for synced job artifacts' do
artifact_registry.update!(retry_at: Date.tomorrow) registry = create(:geo_job_artifact_registry, :with_artifact, bytes: 1234, success: true)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async).with('job_artifact', artifact_file_missing_on_primary.id) expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
.with('job_artifact', registry.artifact_id)
subject.perform subject.perform
end end
it 'does not retry those files if there is no spare capacity' do it 'does not perform Geo::FileDownloadWorker for synced job artifacts even with 0 bytes downloaded' do
registry = create(:geo_job_artifact_registry, :with_artifact, :never_synced) registry = create(:geo_job_artifact_registry, :with_artifact, bytes: 0, success: true)
expect(subject).to receive(:db_retrieve_batch_size).and_return(1).twice expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', registry.artifact_id) .with('job_artifact', registry.artifact_id)
subject.perform subject.perform
end end
it 'does not retry those files if they are already scheduled' do it 'does not retry failed artifacts when retry_at is tomorrow' do
registry = create(:geo_job_artifact_registry, :with_artifact, :never_synced) registry = create(:geo_job_artifact_registry, :with_artifact, :failed, retry_at: Date.tomorrow)
scheduled_jobs = [{ type: 'job_artifact', id: artifact_file_missing_on_primary.id, job_id: 'foo' }] expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
expect(subject).to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(1) .with('job_artifact', registry.artifact_id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', registry.artifact_id)
subject.perform subject.perform
end
end
end end
context 'with geo_job_artifact_registry_ssot_sync feature disabled' do it 'retries failed artifacts when retry_at is in the past' do
before do registry = create(:geo_job_artifact_registry, :with_artifact, :failed, retry_at: Date.yesterday)
stub_feature_flags(geo_job_artifact_registry_ssot_sync: false)
end
it 'performs Geo::FileDownloadWorker for unsynced job artifacts' do expect(Geo::FileDownloadWorker).to receive(:perform_async)
artifact = create(:ci_job_artifact) .with('job_artifact', registry.artifact_id).once.and_return(spy)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', artifact.id) subject.perform
end
subject.perform
end
it 'performs Geo::FileDownloadWorker for failed-sync job artifacts' do context 'with files missing on the primary that are marked as synced' do
artifact = create(:ci_job_artifact) let!(:artifact_file_missing_on_primary) { create(:ci_job_artifact) }
let!(:artifact_registry) { create(:geo_job_artifact_registry, artifact_id: artifact_file_missing_on_primary.id, bytes: 1234, success: true, missing_on_primary: true) }
create(:geo_job_artifact_registry, artifact_id: artifact.id, bytes: 0, success: false) it 'retries the files if there is spare capacity' do
registry = create(:geo_job_artifact_registry, :with_artifact, :never_synced)
expect(Geo::FileDownloadWorker).to receive(:perform_async) expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', registry.artifact_id)
.with('job_artifact', artifact.id).once.and_return(spy) expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', artifact_file_missing_on_primary.id)
subject.perform subject.perform
end end
it 'does not perform Geo::FileDownloadWorker for synced job artifacts' do it 'retries failed files with retry_at in the past' do
artifact = create(:ci_job_artifact) artifact_registry.update!(retry_at: Date.yesterday)
create(:geo_job_artifact_registry, artifact_id: artifact.id, bytes: 1234, success: true)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async) expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', artifact_file_missing_on_primary.id)
subject.perform subject.perform
end end
it 'does not perform Geo::FileDownloadWorker for synced job artifacts even with 0 bytes downloaded' do it 'does not retry files with later retry_at' do
artifact = create(:ci_job_artifact) artifact_registry.update!(retry_at: Date.tomorrow)
create(:geo_job_artifact_registry, artifact_id: artifact.id, bytes: 0, success: true) expect(Geo::FileDownloadWorker).not_to receive(:perform_async).with('job_artifact', artifact_file_missing_on_primary.id)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
subject.perform subject.perform
end end
it 'does not retry failed artifacts when retry_at is tomorrow' do it 'does not retry those files if there is no spare capacity' do
failed_registry = create(:geo_job_artifact_registry, :with_artifact, bytes: 0, success: false, retry_at: Date.tomorrow) registry = create(:geo_job_artifact_registry, :with_artifact, :never_synced)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async).with('job_artifact', failed_registry.artifact_id) expect(subject).to receive(:db_retrieve_batch_size).and_return(1).twice
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', registry.artifact_id)
subject.perform subject.perform
end end
it 'retries failed artifacts when retry_at is in the past' do it 'does not retry those files if they are already scheduled' do
failed_registry = create(:geo_job_artifact_registry, :with_artifact, success: false, retry_at: Date.yesterday) registry = create(:geo_job_artifact_registry, :with_artifact, :never_synced)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', failed_registry.artifact_id) scheduled_jobs = [{ type: 'job_artifact', id: artifact_file_missing_on_primary.id, job_id: 'foo' }]
expect(subject).to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(1)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', registry.artifact_id)
subject.perform subject.perform
end end
context 'with files missing on the primary that are marked as synced' do
let!(:artifact_file_missing_on_primary) { create(:ci_job_artifact) }
let!(:artifact_registry) { create(:geo_job_artifact_registry, artifact_id: artifact_file_missing_on_primary.id, bytes: 1234, success: true, missing_on_primary: true) }
it 'retries the files if there is spare capacity' do
artifact = create(:ci_job_artifact)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', artifact.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', artifact_file_missing_on_primary.id)
subject.perform
end
it 'retries failed files with retry_at in the past' do
artifact_registry.update!(retry_at: Date.yesterday)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', artifact_file_missing_on_primary.id)
subject.perform
end
it 'does not retry files with later retry_at' do
artifact_registry.update!(retry_at: Date.tomorrow)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async).with('job_artifact', artifact_file_missing_on_primary.id)
subject.perform
end
it 'does not retry those files if there is no spare capacity' do
artifact = create(:ci_job_artifact)
expect(subject).to receive(:db_retrieve_batch_size).and_return(1).twice
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', artifact.id)
subject.perform
end
it 'does not retry those files if they are already scheduled' do
artifact = create(:ci_job_artifact)
scheduled_jobs = [{ type: 'job_artifact', id: artifact_file_missing_on_primary.id, job_id: 'foo' }]
expect(subject).to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(1)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', artifact.id)
subject.perform
end
end
end end
end end
...@@ -637,22 +521,6 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw, :use_sql_query_c ...@@ -637,22 +521,6 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw, :use_sql_query_c
allow(::GeoNode).to receive(:current_node).and_return(secondary) allow(::GeoNode).to receive(:current_node).and_return(secondary)
end end
context 'when geo_job_artifact_registry_ssot_sync feature is disabled' do
before do
stub_feature_flags(geo_job_artifact_registry_ssot_sync: false)
end
it 'does not perform Geo::FileDownloadWorker for job artifact that does not belong to selected namespaces to replicate' do
create(:ci_job_artifact, project: unsynced_project)
job_artifact_in_synced_group = create(:ci_job_artifact, project: project_in_synced_group)
expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with('job_artifact', job_artifact_in_synced_group.id).once.and_return(spy)
subject.perform
end
end
context 'with geo_file_registry_ssot_sync feature disabled' do context 'with geo_file_registry_ssot_sync feature disabled' do
before do before do
stub_feature_flags(geo_file_registry_ssot_sync: false) stub_feature_flags(geo_file_registry_ssot_sync: false)
......
...@@ -94,28 +94,6 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do ...@@ -94,28 +94,6 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
expect(Geo::PackageFileRegistry.where(package_file_id: package_file.id).count).to eq(1) expect(Geo::PackageFileRegistry.where(package_file_id: package_file.id).count).to eq(1)
end end
context 'when geo_job_artifact_registry_ssot_sync is disabled' do
let_it_be(:job_artifact) { create(:ci_job_artifact) }
before do
stub_feature_flags(geo_job_artifact_registry_ssot_sync: false)
end
it 'returns false' do
expect(subject.perform).to be_falsey
end
it 'does not execute RegistryConsistencyService for Job Artifacts' do
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::LfsObjectRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::UploadRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::PackageFileRegistry, batch_size: 1000).and_call_original
expect(Geo::RegistryConsistencyService).not_to receive(:new).with(Geo::JobArtifactRegistry, batch_size: 1000)
subject.perform
end
end
context 'when geo_file_registry_ssot_sync is disabled' do context 'when geo_file_registry_ssot_sync is disabled' do
let_it_be(:upload) { create(:upload) } let_it_be(:upload) { create(:upload) }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment