Commit a22f6fa6 authored by Kamil Trzciński's avatar Kamil Trzciński

Merge branch 'fix/sm/atomic-migration' into 'master'

Fix migrate! method (Minimal fix with ExclusiveLock to prevent race conditions)

Closes #4928 and #4980

See merge request gitlab-org/gitlab-ee!4624
parent b4dc556c
......@@ -24,8 +24,7 @@ module RecordsUploads
uploads.where(path: upload_path).delete_all
upload.destroy! if upload
self.upload = build_upload
upload.save!
self.upload = build_upload.tap(&:save!)
end
end
......
......@@ -61,6 +61,39 @@ module ObjectStorage
end
end
# Add support for automatic background uploading after the file is stored.
#
module BackgroundMove
extend ActiveSupport::Concern
def background_upload(mount_points = [])
return unless mount_points.any?
run_after_commit do
mount_points.each { |mount| send(mount).schedule_background_upload } # rubocop:disable GitlabSecurity/PublicSend
end
end
def changed_mounts
self.class.uploaders.select do |mount, uploader_class|
mounted_as = uploader_class.serialization_column(self.class, mount)
uploader = send(:"#{mounted_as}") # rubocop:disable GitlabSecurity/PublicSend
next unless uploader
next unless uploader.exists?
next unless send(:"#{mounted_as}_changed?") # rubocop:disable GitlabSecurity/PublicSend
mount
end.keys
end
included do
after_save on: [:create, :update] do
background_upload(changed_mounts)
end
end
end
module Concern
extend ActiveSupport::Concern
......@@ -127,7 +160,7 @@ module ObjectStorage
return unless persist_object_store?
updated = model.update_column(store_serialization_column, object_store)
raise ActiveRecordError unless updated
raise 'Failed to update object store' unless updated
end
def use_file
......@@ -153,32 +186,12 @@ module ObjectStorage
# new_store: Enum (Store::LOCAL, Store::REMOTE)
#
def migrate!(new_store)
return unless object_store != new_store
return unless file
uuid = Gitlab::ExclusiveLease.new(exclusive_lease_key, timeout: 1.hour.to_i).try_obtain
raise 'Already running' unless uuid
new_file = nil
file_to_delete = file
from_object_store = object_store
self.object_store = new_store # changes the storage and file
cache_stored_file! if file_storage?
with_callbacks(:migrate, file_to_delete) do
with_callbacks(:store, file_to_delete) do # for #store_versions!
new_file = storage.store!(file)
persist_object_store!
self.file = new_file
end
end
file
rescue => e
# in case of failure delete new file
new_file.delete unless new_file.nil?
# revert back to the old file
self.object_store = from_object_store
self.file = file_to_delete
raise e
unsafe_migrate!(new_store)
ensure
Gitlab::ExclusiveLease.cancel(exclusive_lease_key, uuid)
end
def schedule_migration_to_object_storage(*args)
......@@ -261,5 +274,43 @@ module ObjectStorage
raise UnknownStoreError
end
end
def exclusive_lease_key
"object_storage_migrate:#{model.class}:#{model.id}"
end
#
# Move the file to another store
#
# new_store: Enum (Store::LOCAL, Store::REMOTE)
#
def unsafe_migrate!(new_store)
return unless object_store != new_store
return unless file
new_file = nil
file_to_delete = file
from_object_store = object_store
self.object_store = new_store # changes the storage and file
cache_stored_file! if file_storage?
with_callbacks(:migrate, file_to_delete) do
with_callbacks(:store, file_to_delete) do # for #store_versions!
new_file = storage.store!(file)
persist_object_store!
self.file = new_file
end
end
file
rescue => e
# in case of failure delete new file
new_file.delete unless new_file.nil?
# revert back to the old file
self.object_store = from_object_store
self.file = file_to_delete
raise e
end
end
end
......@@ -61,10 +61,24 @@ describe LfsObject do
end
it 'schedules the model for migration' do
expect(ObjectStorageUploadWorker).to receive(:perform_async).with('LfsObjectUploader', described_class.name, :file, kind_of(Numeric))
expect(ObjectStorage::BackgroundMoveWorker)
.to receive(:perform_async)
.with('LfsObjectUploader', described_class.name, :file, kind_of(Numeric))
.once
subject
end
it 'schedules the model for migration once' do
expect(ObjectStorage::BackgroundMoveWorker)
.to receive(:perform_async)
.with('LfsObjectUploader', described_class.name, :file, kind_of(Numeric))
.once
lfs_object = create(:lfs_object)
lfs_object.file = fixture_file_upload(Rails.root + "spec/fixtures/dk.png", "`/png")
lfs_object.save!
end
end
context 'when is unlicensed' do
......
......@@ -997,7 +997,7 @@ describe 'Git LFS API and storage' do
context 'and workhorse requests upload finalize for a new lfs object' do
before do
allow_any_instance_of(LfsObjectUploader).to receive(:exists?) { false }
lfs_object.destroy
end
context 'with object storage disabled' do
......
......@@ -20,6 +20,19 @@ shared_examples "migrates" do |to_store:, from_store: nil|
migrate(from)
end
it 'returns corresponding file type' do
expect(subject).to be_an(CarrierWave::Uploader::Base)
expect(subject).to be_a(ObjectStorage::Concern)
if from == described_class::Store::REMOTE
expect(subject.file).to be_a(CarrierWave::Storage::Fog::File)
elsif from == described_class::Store::LOCAL
expect(subject.file).to be_a(CarrierWave::SanitizedFile)
else
raise 'Unexpected file type'
end
end
it 'does nothing when migrating to the current store' do
expect { migrate(from) }.not_to change { subject.object_store }.from(from)
end
......@@ -38,6 +51,42 @@ shared_examples "migrates" do |to_store:, from_store: nil|
expect(File.exist?(original_file)).to be_falsey
end
it 'can access to the original file during migration' do
file = subject.file
allow(subject).to receive(:delete_migrated_file) { } # Remove as a callback of :migrate
allow(subject).to receive(:record_upload) { } # Remove as a callback of :store (:record_upload)
expect(file.exists?).to be_truthy
expect { migrate(to) }.not_to change { file.exists? }
end
context 'when migrate! is not oqqupied by another process' do
it 'executes migrate!' do
expect(subject).to receive(:object_store=).at_least(1)
migrate(to)
end
end
context 'when migrate! is occupied by another process' do
let(:exclusive_lease_key) { "object_storage_migrate:#{subject.model.class}:#{subject.model.id}" }
before do
@uuid = Gitlab::ExclusiveLease.new(exclusive_lease_key, timeout: 1.hour.to_i).try_obtain
end
it 'does not execute migrate!' do
expect(subject).not_to receive(:unsafe_migrate!)
expect { migrate(to) }.to raise_error('Already running')
end
after do
Gitlab::ExclusiveLease.cancel(exclusive_lease_key, @uuid)
end
end
context 'migration is unsuccessful' do
shared_examples "handles gracefully" do |error:|
it 'does not update the object_store' do
......
......@@ -67,4 +67,14 @@ describe JobArtifactUploader do
it { is_expected.to include("/#{job_artifact.job_id}/#{job_artifact.id}/") }
it { is_expected.to end_with("ci_build_artifacts.zip") }
end
describe "#migrate!" do
before do
uploader.store!(fixture_file_upload(Rails.root.join('spec/fixtures/trace/sample_trace')))
stub_artifacts_object_storage
end
it_behaves_like "migrates", to_store: described_class::Store::REMOTE
it_behaves_like "migrates", from_store: described_class::Store::REMOTE, to_store: described_class::Store::LOCAL
end
end
......@@ -128,6 +128,33 @@ describe ObjectStorage do
expect(uploader.object_store).to eq(uploader.upload.store)
end
end
describe '#migrate!' do
let(:new_store) { ObjectStorage::Store::REMOTE }
before do
stub_uploads_object_storage(uploader: AvatarUploader)
end
subject { uploader.migrate!(new_store) }
it 'persist @object_store to the recorded upload' do
subject
expect(uploader.upload.store).to eq(new_store)
end
describe 'fails' do
it 'is handled gracefully' do
store = uploader.object_store
expect_any_instance_of(Upload).to receive(:save!).and_raise("An error")
expect { subject }.to raise_error("An error")
expect(uploader.exists?).to be_truthy
expect(uploader.upload.store).to eq(store)
end
end
end
end
# this means the model holds an <mounted_as>_store attribute directly
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment