Commit 89012de6 authored by Micaël Bergeron

refactor the task as a BackgroundMigration

parent 1b5c238a
module Gitlab
  module BackgroundMigration
    class MigrateUploadsToObjectStorage
      SanityCheckError = Class.new(StandardError)

      class Upload < ActiveRecord::Base
        # Upper limit for foreground checksum processing
        CHECKSUM_THRESHOLD = 100.megabytes

        belongs_to :model, polymorphic: true # rubocop:disable Cop/PolymorphicAssociations

        validates :size, presence: true
        validates :path, presence: true
        validates :model, presence: true
        validates :uploader, presence: true

        before_save :calculate_checksum!, if: :foreground_checksummable?
        after_commit :schedule_checksum, if: :checksummable?

        scope :stored_locally, -> { where(store: [nil, ObjectStorage::Store::LOCAL]) }
        scope :stored_remotely, -> { where(store: ObjectStorage::Store::REMOTE) }

        def self.hexdigest(path)
          Digest::SHA256.file(path).hexdigest
        end

        def absolute_path
          raise ObjectStorage::RemoteStoreError, "Remote object has no absolute path." unless local?
          return path unless relative_path?

          uploader_class.absolute_path(self)
        end

        def calculate_checksum!
          self.checksum = nil
          return unless checksummable?

          self.checksum = self.class.hexdigest(absolute_path)
        end

        def build_uploader(mounted_as = nil)
          uploader_class.new(model, mounted_as).tap do |uploader|
            uploader.upload = self
            uploader.retrieve_from_store!(identifier)
          end
        end

        def exist?
          File.exist?(absolute_path)
        end

        def local?
          return true if store.nil?

          store == ObjectStorage::Store::LOCAL
        end

        private

        def checksummable?
          checksum.nil? && local? && exist?
        end

        def foreground_checksummable?
          checksummable? && size <= CHECKSUM_THRESHOLD
        end

        def schedule_checksum
          UploadChecksumWorker.perform_async(id)
        end

        def relative_path?
          !path.start_with?('/')
        end

        def identifier
          File.basename(path)
        end

        def uploader_class
          Object.const_get(uploader)
        end
      end
      class MigrationResult
        attr_reader :upload
        attr_accessor :error

        def initialize(upload, error = nil)
          @upload, @error = upload, error
        end

        def success?
          error.nil?
        end

        def to_s
          success? ? "Migration successful." : "Error while migrating #{upload.id}: #{error.message}"
        end
      end
      module Report
        def report(results)
          success, failures = Gitlab::Utils::BisectEnumerable.bisect(results, &:success?)

          puts header(success, failures)
          puts failures(failures)
        end

        def header(success, failures)
          color = failures.count == 0 ? :green : :red

          "Migrated #{success.count}/#{success.count + failures.count} files.".color(color)
        end

        def failures(failures)
          failures.map { |f| "\t#{f}".color(:red) }.join("\n")
        end
      end

      include Report
      def self.enqueue!(uploads, mounted_as, to_store)
        sanity_check!(uploads, mounted_as)

        BackgroundMigrationWorker.perform_async('MigrateUploadsToObjectStorage', [uploads.ids, mounted_as, to_store])
        # BackgroundMigration.perform('MigrateUploadsToObjectStorage', [uploads.map(&:id), mounted_as, to_store])
      end

      # We need to be sure all the uploads are for the same uploader and model type
      # and that the mount point exists if provided.
      #
      def self.sanity_check!(uploads, mounted_as)
        upload = uploads.first

        uploader_class = upload.uploader.constantize
        model_class = uploads.first.model_type.constantize

        uploader_types = uploads.map(&:uploader).uniq
        model_types = uploads.map(&:model_type).uniq
        model_has_mount = mounted_as.nil? || model_class.uploaders[mounted_as] == uploader_class

        raise(SanityCheckError, "Multiple uploaders found: #{uploader_types}") unless uploader_types.count == 1
        raise(SanityCheckError, "Multiple model types found: #{model_types}") unless model_types.count == 1
        raise(SanityCheckError, "Mount point #{mounted_as} not found in #{model_class}.") unless model_has_mount
      end
      def perform(ids, mounted_as, to_store)
        @mounted_as = mounted_as
        @to_store = to_store

        uploads = Upload.preload(:model).where(id: ids)

        sanity_check!(uploads)
        results = migrate(uploads)

        report(results)
      rescue SanityCheckError => e
        # do not retry if the job is insane
        Rails.logger.warn "UploadsToObjectStorage sanity check error: #{e.message}"
      end

      def sanity_check!(uploads)
        self.class.sanity_check!(uploads, @mounted_as)
      end

      def build_uploaders(uploads)
        uploads.map { |upload| upload.build_uploader(@mounted_as) }
      end
      def migrate(uploads)
        build_uploaders(uploads).map(&method(:process_uploader))
      end

      def process_uploader(uploader)
        result = MigrationResult.new(uploader.upload)

        begin
          uploader.migrate!(@to_store)
          result
        rescue => e
          result.error = e
          result
        end
      end
    end
  end
end
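
For orientation, a minimal sketch of how the new background migration could be exercised from a Rails console is shown below. The AvatarUploader scope and the :avatar mount point are illustrative assumptions; enqueue!, the stored_locally scope and ObjectStorage::Store::REMOTE come from the class above.

# Console sketch: enqueue a background migration for locally stored uploads.
uploads = Gitlab::BackgroundMigration::MigrateUploadsToObjectStorage::Upload
            .stored_locally
            .where(uploader: 'AvatarUploader') # assumed uploader, for illustration only

Gitlab::BackgroundMigration::MigrateUploadsToObjectStorage
  .enqueue!(uploads, :avatar, ObjectStorage::Store::REMOTE)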
@@ -2,117 +2,23 @@ require_relative 'helpers'
include UploadTaskHelpers

module UploadTask
  module Migrate
    class MigrationResult
      attr_reader :upload
      attr_accessor :error

      def initialize(upload, error = nil)
        @upload, @error = upload, error
      end

      def success?
        error.nil?
      end

      def to_s
        success? ? "Migration successful." : "Error while migrating #{upload.id}: #{error.message}"
      end
    end
    class Reporter
      def initialize(results = [])
        @success, @failures = Gitlab::Utils::BisectEnumerable.bisect(results, &:success?)
      end

      def report
        puts header
        puts failures
      end

      def header
        color = @failures.count == 0 ? :green : :red

        "Migrated #{@success.count}/#{@success.count + @failures.count} files.".color(color)
      end

      def failures
        @failures.map { |f| "\t#{f}".color(:red) }.join("\n")
      end
    end
    class Migrator
      attr_reader :to_store

      def initialize(uploader_class, model_class, mounted_as, to_store)
        @results = []
        @uploader_class, @model_class = uploader_class, model_class
        @mounted_as = mounted_as
        @to_store = to_store
      end

      def build_uploaders(uploads)
        uploads.map { |upload| upload.build_uploader(@mounted_as) }
      end

      def migrate(batch_size, &block)
        each_upload_batch(batch_size) do |batch|
          results = build_uploaders(batch).map(&method(:process_uploader))

          yield results # yield processed batch as [MigrationResult]

          @results.concat(results)
        end
      end

      def report
        Reporter.new(@results).report
      end

      def each_upload_batch(batch_size, &block)
        Upload.preload(:model)
              .where.not(store: @to_store)
              .where(uploader: @uploader_class.to_s,
                     model_type: @model_class.to_s)
              .in_batches(of: batch_size, &block) # rubocop: disable Cop/InBatches
      end

      def process_uploader(uploader)
        result = MigrationResult.new(uploader.upload)

        begin
          uploader.migrate!(@to_store)
          result
        rescue => e
          result.error = e
          result
        end
      end
    end
  end
end
namespace :gitlab do
  namespace :uploads do
    desc 'GitLab | Uploads | Migrate the uploaded files to object storage'
    task :migrate, [:uploader_class, :model_class, :mounted_as] => :environment do |task, args|
      # batch_size is presumably taken from the BATCH environment variable (it is stubbed in the spec below)
      to_store = ObjectStorage::Store::REMOTE
      uploader_class = args.uploader_class.constantize
      model_class = args.model_class.constantize
      mounted_as = args.mounted_as&.gsub(':', '')&.to_sym

      Upload
        .where.not(store: to_store)
        .where(uploader: uploader_class.to_s,
               model_type: model_class.to_s)
        .in_batches(of: batch_size) do |batch| # rubocop: disable Cop/InBatches
          job = Gitlab::BackgroundMigration::MigrateUploadsToObjectStorage.enqueue!(batch, mounted_as, to_store)
          puts "Enqueued job: #{job}"
        end
    end
  end
end
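
As a usage sketch, the task can also be driven from Ruby through the standard Rake API; the AvatarUploader/Project/avatar arguments below are placeholders, not values taken from this commit.

# Hypothetical invocation of the task defined above (placeholder arguments).
require 'rake'

Rails.application.load_tasks
Rake::Task['gitlab:uploads:migrate'].invoke('AvatarUploader', 'Project', 'avatar')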
require 'spec_helper'

describe Gitlab::BackgroundMigration::MigrateUploadsToObjectStorage, :sidekiq do
  shared_context 'sanity_check! fails' do
    before do
      expect(described_class).to receive(:sanity_check!).and_raise(described_class::SanityCheckError)
    end
  end

  let(:uploads) { Upload.all }
  let(:mounted_as) { :avatar }
  let(:to_store) { ObjectStorage::Store::REMOTE }

  before do
    stub_env('BATCH', 1)
    stub_licensed_features(object_storage: true)

    create_list(:upload, 5)
  end

  describe '.enqueue!' do
    def enqueue!
      described_class.enqueue!(uploads, mounted_as, to_store)
    end

    it 'is guarded by .sanity_check!' do
      expect(described_class).to receive(:sanity_check!)

      enqueue!
    end

    context 'sanity_check! fails' do
      include_context 'sanity_check! fails'

      it 'does not enqueue a job' do
        expect(BackgroundMigrationWorker).not_to receive(:perform_async)

        expect { enqueue! }.to raise_error(described_class::SanityCheckError)
      end
    end
  end

  describe '.sanity_check!' do
    shared_examples 'raises a SanityCheckError' do
      let(:mount_point) { nil }

      it do
        expect { described_class.sanity_check!(uploads, mount_point) }.to raise_error(described_class::SanityCheckError)
      end
    end

    context 'uploader types mismatch' do
      let!(:outlier) { create(:upload, uploader: 'FileUploader') }

      include_examples 'raises a SanityCheckError'
    end

    context 'model types mismatch' do
      let!(:outlier) { create(:upload, model_type: 'Potato') }

      include_examples 'raises a SanityCheckError'
    end

    context 'mount point not found' do
      include_examples 'raises a SanityCheckError' do
        let(:mount_point) { :potato }
      end
    end
  end
end