Commit 06cd10a3 authored by Micaël Bergeron's avatar Micaël Bergeron

refactored the migrate task to its own queue

parent 33a856af
......@@ -121,10 +121,9 @@
- geo:geo_repositories_clean_up
- geo:geo_repository_destroy
- object_storage_upload
- object_storage:object_storage_background_upload
- object_storage:object_storage_migrate_for_artifacts
- object_storage:object_storage_migrate_for_lfs_objects
- object_storage:object_storage_migrate_for_uploads
- object_storage:object_storage_migrate_uploads
- admin_emails
- elastic_batch_project_indexer
......
......@@ -84,4 +84,5 @@
- [elastic_indexer, 1]
- [elastic_commit_indexer, 1]
- [export_csv, 1]
- [object_storage_upload, 1]
- [object_storage, 1]
......@@ -8,7 +8,7 @@ require 'carrierwave/storage/fog'
module ObjectStorage
RemoteStoreError = Class.new(StandardError)
UnknownStoreError = Class.new(StandardError)
ObjectStoreUnavailable = Class.new(StandardError)
ObjectStorageUnavailable = Class.new(StandardError)
module Store
LOCAL = 1
......@@ -21,7 +21,7 @@ module ObjectStorage
extend ActiveSupport::Concern
prepended do |base|
raise ObjectStoreUnavailable, "#{base} must include ObjectStorage::Concern to use extensions." unless base < Concern
raise "#{base} must include ObjectStorage::Concern to use extensions." unless base < Concern
base.include(::RecordsUploads::Concern)
end
......@@ -241,7 +241,7 @@ module ObjectStorage
def verify_license!(_file)
return if file_storage?
raise 'Object Storage feature is missing' unless self.class.licensed?
raise(ObjectStorageUnavailable, 'Object Storage feature is missing') unless self.class.licensed?
end
def exists?
......
# frozen_string_literal: true
# rubocop:disable Metrics/LineLength
# rubocop:disable Style/Documentation
module ObjectStorage
class MigrateUploadsWorker
include ApplicationWorker
include ObjectStorageQueue
SanityCheckError = Class.new(StandardError)
class Upload < ActiveRecord::Base
# Upper limit for foreground checksum processing
CHECKSUM_THRESHOLD = 100.megabytes
belongs_to :model, polymorphic: true # rubocop:disable Cop/PolymorphicAssociations
validates :size, presence: true
validates :path, presence: true
validates :model, presence: true
validates :uploader, presence: true
before_save :calculate_checksum!, if: :foreground_checksummable?
after_commit :schedule_checksum, if: :checksummable?
scope :stored_locally, -> { where(store: [nil, ObjectStorage::Store::LOCAL]) }
scope :stored_remotely, -> { where(store: ObjectStorage::Store::REMOTE) }
def self.hexdigest(path)
Digest::SHA256.file(path).hexdigest
end
def absolute_path
raise ObjectStorage::RemoteStoreError, "Remote object has no absolute path." unless local?
return path unless relative_path?
uploader_class.absolute_path(self)
end
def calculate_checksum!
self.checksum = nil
return unless checksummable?
self.checksum = self.class.hexdigest(absolute_path)
end
def build_uploader(mounted_as = nil)
uploader_class.new(model, mounted_as).tap do |uploader|
uploader.upload = self
uploader.retrieve_from_store!(identifier)
end
end
def exist?
File.exist?(absolute_path)
end
def local?
return true if store.nil?
store == ObjectStorage::Store::LOCAL
end
private
def checksummable?
checksum.nil? && local? && exist?
end
def foreground_checksummable?
checksummable? && size <= CHECKSUM_THRESHOLD
end
def schedule_checksum
UploadChecksumWorker.perform_async(id)
end
def relative_path?
!path.start_with?('/')
end
def identifier
File.basename(path)
end
def uploader_class
Object.const_get(uploader)
end
end
class MigrationResult
attr_reader :upload
attr_accessor :error
def initialize(upload, error = nil)
@upload, @error = upload, error
end
def success?
error.nil?
end
def to_s
success? ? "Migration sucessful." : "Error while migrating #{upload.id}: #{error.message}"
end
end
module Report
class MigrationFailures < StandardError
attr_reader :errors
def initialize(errors)
@errors = errors
end
def message
errors.map(&:message).join("\n")
end
end
def report!(results)
success, failures = results.partition(&:success?)
Rails.logger.info header(success, failures)
Rails.logger.warn failures(failures)
raise MigrationFailures.new(failures.map(&:error))
end
def header(success, failures)
color = failures.count == 0 ? :green : :red
"Migrated #{success.count}/#{success.count + failures.count} files.".color(color)
end
def failures(failures)
failures.map { |f| "\t#{f}".color(:red) }.join('\n')
end
end
include Report
def self.enqueue!(uploads, mounted_as, to_store)
sanity_check!(uploads, mounted_as)
perform_async(uploads.ids, mounted_as, to_store)
end
# We need to be sure all the uploads are for the same uploader and model type
# and that the mount point exists if provided.
#
def self.sanity_check!(uploads, mounted_as)
upload = uploads.first
uploader_class = upload.uploader.constantize
model_class = uploads.first.model_type.constantize
uploader_types = uploads.map(&:uploader).uniq
model_types = uploads.map(&:model_type).uniq
model_has_mount = mounted_as.nil? || model_class.uploaders[mounted_as] == uploader_class
raise(SanityCheckError, "Multiple uploaders found: #{uploader_types}") unless uploader_types.count == 1
raise(SanityCheckError, "Multiple model types found: #{model_types}") unless model_types.count == 1
raise(SanityCheckError, "Mount point #{mounted_as} not found in #{model_class}.") unless model_has_mount
end
def perform(ids, mounted_as, to_store)
@mounted_as = mounted_as&.to_sym
@to_store = to_store
uploads = Upload.preload(:model).where(id: ids)
sanity_check!(uploads)
results = migrate(uploads)
report!(results)
rescue SanityCheckError => e
# do not retry: the job is insane
Rails.logger.warn "UploadsToObjectStorage sanity check error: #{e.message}"
end
def sanity_check!(uploads)
self.class.sanity_check!(uploads, @mounted_as)
end
def build_uploaders(uploads)
uploads.map { |upload| upload.build_uploader(@mounted_as) }
end
def migrate(uploads)
build_uploaders(uploads).map(&method(:process_uploader))
end
def process_uploader(uploader)
result = MigrationResult.new(uploader.upload)
begin
uploader.migrate!(@to_store)
result
rescue => e
result.error = e
result
end
end
end
end
# @Deprecated - remove once the `object_storage_upload` queue is empty
# The queue has been renamed `object_storage:object_storage_background_upload`
#
class ObjectStorageUploadWorker
include ApplicationWorker
sidekiq_options retry: 5
def perform(uploader_class_name, subject_class_name, file_field, subject_id)
uploader_class = uploader_class_name.constantize
subject_class = subject_class_name.constantize
return unless uploader_class < ObjectStorage::Concern
return unless uploader_class.object_store_enabled?
return unless uploader_class.licensed?
return unless uploader_class.background_upload_enabled?
subject = subject_class.find(subject_id)
uploader = subject.public_send(file_field) # rubocop:disable GitlabSecurity/PublicSend
uploader.migrate!(ObjectStorage::Store::REMOTE)
rescue RecordNotFound
# does not retry when the record do not exists
Rails.logger.warn("Cannot find subject #{subject_class} with id=#{subject_id}.")
end
end
......@@ -2,7 +2,7 @@ namespace :gitlab do
namespace :uploads do
desc 'GitLab | Uploads | Migrate the uploaded files to object storage'
task :migrate, [:uploader_class, :model_class, :mounted_as] => :environment do |task, args|
batch_size = ENV.fetch('BATCH', 200)
batch_size = ENV.fetch('BATCH', 200).to_i
@to_store = ObjectStorage::Store::REMOTE
@mounted_as = args.mounted_as&.gsub(':', '')&.to_sym
uploader_class = args.uploader_class.constantize
......@@ -11,16 +11,16 @@ namespace :gitlab do
Upload
.where.not(store: @to_store)
.where(uploader: uploader_class.to_s,
model_type: model_class.to_s)
model_type: model_class.base_class.sti_name)
.in_batches(of: batch_size, &method(:process)) # rubocop: disable Cop/InBatches
end
def process(batch)
job = Gitlab::BackgroundMigration::MigrateUploadsToObjectStorage.enqueue!(batch,
job = ObjectStorage::MigrateUploadsWorker.enqueue!(batch,
@mounted_as,
@to_store)
puts "Enqueued job: #{job}"
rescue Gitlab::BackgroundMigration::MigrateUploadsToObjectStorage::SanityCheckError => e
rescue ObjectStorage::MigrateUploadsWorker::SanityCheckError => e
# continue for the next batch
puts "Could not enqueue batch (#{batch.ids}) #{e.message}".color(:red)
end
......
# frozen_string_literal: true
# rubocop:disable Metrics/LineLength
# rubocop:disable Style/Documentation
module Gitlab
module BackgroundMigration
class MigrateUploadsToObjectStorage
SanityCheckError = Class.new(StandardError)
class Upload < ActiveRecord::Base
# Upper limit for foreground checksum processing
CHECKSUM_THRESHOLD = 100.megabytes
belongs_to :model, polymorphic: true # rubocop:disable Cop/PolymorphicAssociations
validates :size, presence: true
validates :path, presence: true
validates :model, presence: true
validates :uploader, presence: true
before_save :calculate_checksum!, if: :foreground_checksummable?
after_commit :schedule_checksum, if: :checksummable?
scope :stored_locally, -> { where(store: [nil, ObjectStorage::Store::LOCAL]) }
scope :stored_remotely, -> { where(store: ObjectStorage::Store::REMOTE) }
def self.hexdigest(path)
Digest::SHA256.file(path).hexdigest
end
def absolute_path
raise ObjectStorage::RemoteStoreError, "Remote object has no absolute path." unless local?
return path unless relative_path?
uploader_class.absolute_path(self)
end
def calculate_checksum!
self.checksum = nil
return unless checksummable?
self.checksum = self.class.hexdigest(absolute_path)
end
def build_uploader(mounted_as = nil)
uploader_class.new(model, mounted_as).tap do |uploader|
uploader.upload = self
uploader.retrieve_from_store!(identifier)
end
end
def exist?
File.exist?(absolute_path)
end
def local?
return true if store.nil?
store == ObjectStorage::Store::LOCAL
end
private
def checksummable?
checksum.nil? && local? && exist?
end
def foreground_checksummable?
checksummable? && size <= CHECKSUM_THRESHOLD
end
def schedule_checksum
UploadChecksumWorker.perform_async(id)
end
def relative_path?
!path.start_with?('/')
end
def identifier
File.basename(path)
end
def uploader_class
Object.const_get(uploader)
end
end
class MigrationResult
attr_reader :upload
attr_accessor :error
def initialize(upload, error = nil)
@upload, @error = upload, error
end
def success?
error.nil?
end
def to_s
success? ? "Migration sucessful." : "Error while migrating #{upload.id}: #{error.message}"
end
end
module Report
def report(results)
success, failures = results.partition(&:success?)
Rails.logger.info header(success, failures)
Rails.logger.warn failures(failures)
end
def header(success, failures)
color = failures.count == 0 ? :green : :red
"Migrated #{success.count}/#{success.count + failures.count} files.".color(color)
end
def failures(failures)
failures.map { |f| "\t#{f}".color(:red) }.join('\n')
end
end
include Report
def self.enqueue!(uploads, mounted_as, to_store)
sanity_check!(uploads, mounted_as)
BackgroundMigrationWorker.perform_async('MigrateUploadsToObjectStorage', [uploads.ids, mounted_as, to_store])
end
# We need to be sure all the uploads are for the same uploader and model type
# and that the mount point exists if provided.
#
def self.sanity_check!(uploads, mounted_as)
upload = uploads.first
uploader_class = upload.uploader.constantize
model_class = uploads.first.model_type.constantize
uploader_types = uploads.map(&:uploader).uniq
model_types = uploads.map(&:model_type).uniq
model_has_mount = mounted_as.nil? || model_class.uploaders[mounted_as] == uploader_class
raise(SanityCheckError, "Multiple uploaders found: #{uploader_types}") unless uploader_types.count == 1
raise(SanityCheckError, "Multiple model types found: #{model_types}") unless model_types.count == 1
raise(SanityCheckError, "Mount point #{mounted_as} not found in #{model_class}.") unless model_has_mount
end
def perform(ids, mounted_as, to_store)
@mounted_as = mounted_as.to_sym
@to_store = to_store
uploads = Upload.preload(:model).where(id: ids)
sanity_check!(uploads)
results = migrate(uploads)
report(results)
rescue SanityCheckError => e
# do not retry if the job is insane
Rails.logger.warn "UploadsToObjectStorage sanity check error: #{e.message}"
end
def sanity_check!(uploads)
self.class.sanity_check!(uploads, @mounted_as)
end
def build_uploaders(uploads)
uploads.map { |upload| upload.build_uploader(@mounted_as) }
end
def migrate(uploads)
build_uploaders(uploads).map(&method(:process_uploader))
end
def process_uploader(uploader)
result = MigrationResult.new(uploader.upload)
begin
uploader.migrate!(@to_store)
result
rescue => e
result.error = e
result
end
end
end
end
end
......@@ -12,7 +12,7 @@ describe 'gitlab:uploads:migrate rake tasks' do
stub_uploads_object_storage(uploader_class)
Rake.application.rake_require 'tasks/gitlab/uploads/migrate'
allow(BackgroundMigrationWorker).to receive(:perform_async)
allow(ObjectStorage::MigrateUploadsWorker).to receive(:perform_async)
end
def run
......@@ -21,7 +21,7 @@ describe 'gitlab:uploads:migrate rake tasks' do
end
it 'enqueue jobs in batch' do
expect(BackgroundMigrationWorker).to receive(:perform_async).exactly(4).times
expect(ObjectStorage::MigrateUploadsWorker).to receive(:enqueue!).exactly(4).times
run
end
......
require 'spec_helper'
describe Gitlab::BackgroundMigration::MigrateUploadsToObjectStorage, :sidekiq do
describe ObjectStorage::MigrateUploadsWorker, :sidekiq do
shared_context 'sanity_check! fails' do
before do
expect(described_class).to receive(:sanity_check!).and_raise(described_class::SanityCheckError)
......@@ -22,7 +22,7 @@ describe Gitlab::BackgroundMigration::MigrateUploadsToObjectStorage, :sidekiq do
end
it 'is guarded by .sanity_check!' do
expect(BackgroundMigrationWorker).to receive(:perform_async)
expect(described_class).to receive(:perform_async)
expect(described_class).to receive(:sanity_check!)
enqueue!
......@@ -32,7 +32,7 @@ describe Gitlab::BackgroundMigration::MigrateUploadsToObjectStorage, :sidekiq do
include_context 'sanity_check! fails'
it 'does not enqueue a job' do
expect(BackgroundMigrationWorker).not_to receive(:perform_async)
expect(described_class).not_to receive(:perform_async)
expect { enqueue! }.to raise_error(described_class::SanityCheckError)
end
......@@ -104,6 +104,7 @@ describe Gitlab::BackgroundMigration::MigrateUploadsToObjectStorage, :sidekiq do
context 'migration is unsuccessful' do
before do
expect { described_class.perform }.to raise_error(described_class::Report::MigrationFailures)
allow_any_instance_of(ObjectStorage::Concern).to receive(:migrate!).and_raise(CarrierWave::UploadError, "I am a teapot.")
end
......
require 'spec_helper'
describe ObjectStorageUploadWorker do
let(:local) { ObjectStorage::Store::LOCAL }
let(:remote) { ObjectStorage::Store::REMOTE }
def perform
described_class.perform_async(uploader_class.name, subject_class, file_field, subject_id)
end
context 'for LFS' do
let!(:lfs_object) { create(:lfs_object, :with_file, file_store: local) }
let(:uploader_class) { LfsObjectUploader }
let(:subject_class) { LfsObject }
let(:file_field) { :file }
let(:subject_id) { lfs_object.id }
context 'when object storage is enabled' do
before do
stub_lfs_object_storage(background_upload: true)
end
it 'uploads object to storage' do
expect { perform }.to change { lfs_object.reload.file_store }.from(local).to(remote)
end
context 'when background upload is disabled' do
before do
allow(Gitlab.config.lfs.object_store).to receive(:background_upload) { false }
end
it 'is skipped' do
expect { perform }.not_to change { lfs_object.reload.file_store }
end
end
end
context 'when object storage is disabled' do
before do
stub_lfs_object_storage(enabled: false)
end
it "doesn't migrate files" do
perform
expect(lfs_object.reload.file_store).to eq(local)
end
end
end
context 'for legacy artifacts' do
let(:build) { create(:ci_build, :legacy_artifacts) }
let(:uploader_class) { LegacyArtifactUploader }
let(:subject_class) { Ci::Build }
let(:file_field) { :artifacts_file }
let(:subject_id) { build.id }
context 'when local storage is used' do
let(:store) { local }
context 'and remote storage is defined' do
before do
stub_artifacts_object_storage(background_upload: true)
end
it "migrates file to remote storage" do
perform
expect(build.reload.artifacts_file_store).to eq(remote)
end
context 'for artifacts_metadata' do
let(:file_field) { :artifacts_metadata }
it 'migrates metadata to remote storage' do
perform
expect(build.reload.artifacts_metadata_store).to eq(remote)
end
end
end
end
end
context 'for job artifacts' do
let(:artifact) { create(:ci_job_artifact, :archive) }
let(:uploader_class) { JobArtifactUploader }
let(:subject_class) { Ci::JobArtifact }
let(:file_field) { :file }
let(:subject_id) { artifact.id }
context 'when local storage is used' do
let(:store) { local }
context 'and remote storage is defined' do
before do
stub_artifacts_object_storage(background_upload: true)
end
it "migrates file to remote storage" do
perform
expect(artifact.reload.file_store).to eq(remote)
end
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment