Remove worker to clean up uploads migrated to object storage

The find_registry_differences method returns as unused
registries entries for uploads that are in object
storage when the object storage sync is disabled.

The Geo::Secondary::RegistryConsistencyWorker will
perform the cleanup of these unused registries.
parent 90e1de0d
......@@ -501,11 +501,6 @@ production: &base
geo_registry_sync_worker:
cron: "*/1 * * * *"
# GitLab Geo migrated local files clean up worker
# NOTE: This will only take effect if Geo is enabled (secondary nodes only)
geo_migrated_local_files_clean_up_worker:
cron: "15 */6 * * *"
# Export pseudonymized data in CSV format for analysis
pseudonymizer_worker:
cron: "0 * * * *"
......@@ -514,7 +509,7 @@ production: &base
# NOTE: This will only take effect if elasticsearch is enabled.
elastic_index_bulk_cron_worker:
cron: "*/1 * * * *"
# Elasticsearch bulk updater for initial updates.
# NOTE: This will only take effect if elasticsearch is enabled.
elastic_index_initial_bulk_cron_worker:
......
......@@ -522,9 +522,6 @@ Gitlab.ee do
Settings.cron_jobs['geo_metrics_update_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_metrics_update_worker']['cron'] ||= '*/1 * * * *'
Settings.cron_jobs['geo_metrics_update_worker']['job_class'] ||= 'Geo::MetricsUpdateWorker'
Settings.cron_jobs['geo_migrated_local_files_clean_up_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_migrated_local_files_clean_up_worker']['cron'] ||= '15 */6 * * *'
Settings.cron_jobs['geo_migrated_local_files_clean_up_worker']['job_class'] ||= 'Geo::MigratedLocalFilesCleanUpWorker'
Settings.cron_jobs['geo_prune_event_log_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_prune_event_log_worker']['cron'] ||= '*/5 * * * *'
Settings.cron_jobs['geo_prune_event_log_worker']['job_class'] ||= 'Geo::PruneEventLogWorker'
......
......@@ -92,10 +92,6 @@ module Geo
alias_method :find_unsynced, :find_never_synced_registries
# rubocop:enable CodeReuse/ActiveRecord
def find_migrated_local(batch_size:, except_ids: [])
Geo::UploadRegistry.none
end
# rubocop: disable CodeReuse/ActiveRecord
def find_retryable_failed_registries(batch_size:, except_ids: [])
syncable
......
......@@ -54,14 +54,6 @@ module Geo
raise NotImplementedError
end
# @!method find_migrated_local
# Return an ActiveRecord::Relation of tracked resource records, filtered
# by selective sync, with files stored remotely, excluding
# specified IDs, limited to batch_size
def find_migrated_local
raise NotImplementedError
end
# @!method find_retryable_failed_registries
# Return an ActiveRecord::Relation of registry records marked as failed,
# which are ready to be retried, excluding specified IDs, limited to
......
......@@ -75,14 +75,6 @@
:weight: 1
:idempotent:
:tags: []
- :name: cronjob:geo_migrated_local_files_clean_up
:feature_category: :geo_replication
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent:
:tags: []
- :name: cronjob:geo_prune_event_log
:feature_category: :geo_replication
:has_external_dependencies:
......
# frozen_string_literal: true
module Geo
class MigratedLocalFilesCleanUpWorker < ::Geo::Scheduler::Secondary::SchedulerWorker # rubocop:disable Scalability/IdempotentWorker
include ::CronjobQueue
MAX_CAPACITY = 1000
def perform
# No need to run when objects stored in Object Storage should be synced too
return if sync_object_storage_enabled?
# No need to run when nothing is configured to be in Object Storage
return unless object_store_enabled?
super
end
private
def max_capacity
MAX_CAPACITY
end
def schedule_job(object_type, object_db_id)
job_id = ::Geo::FileRegistryRemovalWorker.perform_async(object_type.to_s, object_db_id)
if job_id
retval = { id: object_db_id, type: object_type, job_id: job_id }
log_info('Scheduled Geo::FileRegistryRemovalWorker', retval)
retval
end
end
def load_pending_resources
find_migrated_local_objects(batch_size: db_retrieve_batch_size)
end
def find_migrated_local_objects(batch_size:)
find_migrated_local_attachments_ids(batch_size: batch_size)
end
# rubocop: disable CodeReuse/ActiveRecord
def find_migrated_local_attachments_ids(batch_size:)
return [] unless attachments_object_store_enabled?
attachments_finder.find_migrated_local(batch_size: batch_size, except_ids: scheduled_file_ids(Gitlab::Geo::Replication::USER_UPLOADS_OBJECT_TYPES))
.pluck(Geo::Fdw::Upload.arel_table[:uploader], Geo::Fdw::Upload.arel_table[:id])
.map { |uploader, id| [uploader.sub(/Uploader\z/, '').underscore, id] }
end
# rubocop: enable CodeReuse/ActiveRecord
def scheduled_file_ids(file_types)
file_types = Array(file_types)
file_types = file_types.map(&:to_s)
scheduled_jobs.select { |data| file_types.include?(data[:type].to_s) }.map { |data| data[:id] }
end
def attachments_object_store_enabled?
FileUploader.object_store_enabled?
end
def object_store_enabled?
attachments_object_store_enabled?
end
def sync_object_storage_enabled?
current_node.sync_object_storage
end
def attachments_finder
@attachments_finder ||= AttachmentRegistryFinder.new(current_node_id: current_node.id)
end
end
end
......@@ -16,7 +16,6 @@ module Gitlab
SECONDARY_JOBS = %w[
geo_file_download_dispatch_worker
geo_registry_sync_worker
geo_migrated_local_files_clean_up_worker
geo_repository_sync_worker
geo_container_repository_sync_worker
geo_repository_verification_secondary_scheduler_worker
......
......@@ -14,7 +14,6 @@ RSpec.describe Geo::FileRegistryFinder, :geo, :geo_fdw do
count_synced_missing_on_primary
count_registry
find_unsynced
find_migrated_local
find_retryable_failed_registries
find_retryable_synced_missing_on_primary_registries
].each do |required_method|
......
......@@ -16,7 +16,6 @@ RSpec.describe Gitlab::Geo::CronManager, :geo do
geo_repository_verification_secondary_scheduler_worker
geo_metrics_update_worker
geo_prune_event_log_worker
geo_migrated_local_files_clean_up_worker
].freeze
def job(name)
......@@ -39,8 +38,7 @@ RSpec.describe Gitlab::Geo::CronManager, :geo do
job('geo_registry_sync_worker'),
job('geo_repository_sync_worker'),
job('geo_container_repository_sync_worker'),
job('geo_repository_verification_secondary_scheduler_worker'),
job('geo_migrated_local_files_clean_up_worker')
job('geo_repository_verification_secondary_scheduler_worker')
]
end
......
......@@ -10,7 +10,6 @@ RSpec.shared_examples 'a file registry finder' do
count_synced_missing_on_primary
count_registry
find_unsynced
find_migrated_local
find_retryable_failed_registries
find_retryable_synced_missing_on_primary_registries
}
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Geo::MigratedLocalFilesCleanUpWorker, :geo, :geo_fdw, :use_sql_query_cache_for_tracking_db do
include ::EE::GeoHelpers
include ExclusiveLeaseHelpers
let(:primary) { create(:geo_node, :primary, host: 'primary-geo-node') }
let(:secondary) { create(:geo_node, :local_storage_only) }
let(:synced_group) { create(:group) }
let(:synced_project) { create(:project, group: synced_group) }
let(:unsynced_project) { create(:project) }
let(:project_broken_storage) { create(:project, :broken_storage) }
before do
stub_current_geo_node(secondary)
stub_exclusive_lease(renew: true)
end
it 'does not run when node is disabled' do
secondary.update_column(:enabled, false)
expect(subject).not_to receive(:try_obtain_lease)
subject.perform
end
it 'does not run when sync_object_storage is enabled' do
secondary.update_column(:sync_object_storage, true)
expect(subject).not_to receive(:try_obtain_lease)
subject.perform
end
context 'with attachments' do
let(:avatar_upload) { create(:upload) }
let(:personal_snippet_upload) { create(:upload, :personal_snippet_upload) }
let(:issuable_upload) { create(:upload, :issuable_upload) }
let(:namespace_upload) { create(:upload, :namespace_upload) }
let(:attachment_upload) { create(:upload, :attachment_upload) }
let(:favicon_upload) { create(:upload, :favicon_upload) }
before do
create(:geo_upload_registry, :avatar, file_id: avatar_upload.id)
create(:geo_upload_registry, :personal_file, file_id: personal_snippet_upload.id)
create(:geo_upload_registry, :file, file_id: issuable_upload.id)
create(:geo_upload_registry, :namespace_file, file_id: namespace_upload.id)
create(:geo_upload_registry, :attachment, file_id: attachment_upload.id)
create(:geo_upload_registry, :favicon, file_id: favicon_upload.id)
end
it 'schedules nothing for attachments stored locally' do
expect(subject).not_to receive(:schedule_job).with(anything, avatar_upload.id)
expect(subject).not_to receive(:schedule_job).with(anything, personal_snippet_upload.id)
expect(subject).not_to receive(:schedule_job).with(anything, issuable_upload.id)
expect(subject).not_to receive(:schedule_job).with(anything, namespace_upload.id)
expect(subject).not_to receive(:schedule_job).with(anything, attachment_upload.id)
expect(subject).not_to receive(:schedule_job).with(anything, favicon_upload.id)
subject.perform
end
context 'attachments stored remotely' do
before do
stub_uploads_object_storage(AvatarUploader)
stub_uploads_object_storage(PersonalFileUploader)
stub_uploads_object_storage(FileUploader)
stub_uploads_object_storage(NamespaceFileUploader)
stub_uploads_object_storage(AttachmentUploader)
stub_uploads_object_storage(FaviconUploader)
avatar_upload.update_column(:store, FileUploader::Store::REMOTE)
personal_snippet_upload.update_column(:store, FileUploader::Store::REMOTE)
issuable_upload.update_column(:store, FileUploader::Store::REMOTE)
namespace_upload.update_column(:store, FileUploader::Store::REMOTE)
attachment_upload.update_column(:store, FileUploader::Store::REMOTE)
favicon_upload.update_column(:store, FileUploader::Store::REMOTE)
end
it 'schedules workers for uploads stored remotely and synced locally' do
expect(Geo::FileRegistryRemovalWorker).to receive(:perform_async).with('avatar', avatar_upload.id)
expect(Geo::FileRegistryRemovalWorker).to receive(:perform_async).with('personal_file', personal_snippet_upload.id)
expect(Geo::FileRegistryRemovalWorker).to receive(:perform_async).with('file', issuable_upload.id)
expect(Geo::FileRegistryRemovalWorker).to receive(:perform_async).with('namespace_file', namespace_upload.id)
expect(Geo::FileRegistryRemovalWorker).to receive(:perform_async).with('attachment', attachment_upload.id)
expect(Geo::FileRegistryRemovalWorker).to receive(:perform_async).with('favicon', favicon_upload.id)
subject.perform
end
context 'with selective sync by namespace' do
let(:issuable_upload_synced_group) { create(:upload, :issuable_upload, model: synced_project) }
let(:secondary) { create(:geo_node, :local_storage_only, selective_sync_type: 'namespaces', namespaces: [synced_group]) }
before do
create(:geo_upload_registry, :file, file_id: issuable_upload_synced_group.id)
issuable_upload_synced_group.update_column(:store, FileUploader::Store::REMOTE)
end
it 'schedules workers for uploads stored remotely and synced locally' do
expect(Geo::FileRegistryRemovalWorker).to receive(:perform_async).with('file', issuable_upload_synced_group.id)
expect(Geo::FileRegistryRemovalWorker).to receive(:perform_async).with('favicon', favicon_upload.id)
expect(Geo::FileRegistryRemovalWorker).to receive(:perform_async).with('personal_file', personal_snippet_upload.id)
expect(Geo::FileRegistryRemovalWorker).to receive(:perform_async).with('attachment', attachment_upload.id)
expect(Geo::FileRegistryRemovalWorker).not_to receive(:perform_async).with('avatar', avatar_upload.id)
expect(Geo::FileRegistryRemovalWorker).not_to receive(:perform_async).with('file', issuable_upload.id)
expect(Geo::FileRegistryRemovalWorker).not_to receive(:perform_async).with('namespace_file', namespace_upload.id)
subject.perform
end
end
context 'with selective sync by shard' do
let(:issuable_upload_synced_group) { create(:upload, :issuable_upload, model: project_broken_storage) }
let(:secondary) { create(:geo_node, :local_storage_only, selective_sync_type: 'shards', selective_sync_shards: ['broken']) }
before do
create(:geo_upload_registry, :file, file_id: issuable_upload_synced_group.id)
issuable_upload_synced_group.update_column(:store, FileUploader::Store::REMOTE)
end
it 'schedules workers for uploads stored remotely and synced locally' do
expect(Geo::FileRegistryRemovalWorker).to receive(:perform_async).with('file', issuable_upload_synced_group.id)
expect(Geo::FileRegistryRemovalWorker).to receive(:perform_async).with('favicon', favicon_upload.id)
expect(Geo::FileRegistryRemovalWorker).to receive(:perform_async).with('personal_file', personal_snippet_upload.id)
expect(Geo::FileRegistryRemovalWorker).to receive(:perform_async).with('attachment', attachment_upload.id)
expect(Geo::FileRegistryRemovalWorker).not_to receive(:perform_async).with('avatar', avatar_upload.id)
expect(Geo::FileRegistryRemovalWorker).not_to receive(:perform_async).with('file', issuable_upload.id)
expect(Geo::FileRegistryRemovalWorker).not_to receive(:perform_async).with('namespace_file', namespace_upload.id)
subject.perform
end
end
end
end
context 'backoff time' do
let(:cache_key) { "#{described_class.name.underscore}:skip" }
before do
stub_uploads_object_storage(AvatarUploader)
allow(Rails.cache).to receive(:read).and_call_original
allow(Rails.cache).to receive(:write).and_call_original
end
it 'sets the back off time when there are no pending items' do
expect(Rails.cache).to receive(:write).with(cache_key, true, expires_in: 300.seconds).once
subject.perform
end
it 'does not perform Geo::FileRegistryRemovalWorker when the backoff time is set' do
create(:geo_upload_registry, :avatar)
expect(Rails.cache).to receive(:read).with(cache_key).and_return(true)
expect(Geo::FileRegistryRemovalWorker).not_to receive(:perform_async)
subject.perform
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment