Commit 2e57be51 authored by Kamil Trzciński's avatar Kamil Trzciński

Merge branch 'fix/sm/atomic-migration' into 'master'

Fix migrate! method (Minimal fix with ExclusiveLock to prevent race conditions)

Closes #4928 and #4980

See merge request gitlab-org/gitlab-ee!4624
parents 9c0e4387 588937ff
...@@ -24,8 +24,7 @@ module RecordsUploads ...@@ -24,8 +24,7 @@ module RecordsUploads
uploads.where(path: upload_path).delete_all uploads.where(path: upload_path).delete_all
upload.destroy! if upload upload.destroy! if upload
self.upload = build_upload self.upload = build_upload.tap(&:save!)
upload.save!
end end
end end
......
...@@ -88,7 +88,13 @@ module ObjectStorage ...@@ -88,7 +88,13 @@ module ObjectStorage
def changed_mounts def changed_mounts
self.class.uploaders.select do |mount, uploader_class| self.class.uploaders.select do |mount, uploader_class|
mounted_as = uploader_class.serialization_column(self.class, mount) mounted_as = uploader_class.serialization_column(self.class, mount)
mount if send(:"#{mounted_as}_changed?") # rubocop:disable GitlabSecurity/PublicSend uploader = send(:"#{mounted_as}") # rubocop:disable GitlabSecurity/PublicSend
next unless uploader
next unless uploader.exists?
next unless send(:"#{mounted_as}_changed?") # rubocop:disable GitlabSecurity/PublicSend
mount
end.keys end.keys
end end
...@@ -169,7 +175,7 @@ module ObjectStorage ...@@ -169,7 +175,7 @@ module ObjectStorage
return unless persist_object_store? return unless persist_object_store?
updated = model.update_column(store_serialization_column, object_store) updated = model.update_column(store_serialization_column, object_store)
raise ActiveRecordError unless updated raise 'Failed to update object store' unless updated
end end
def use_file def use_file
...@@ -195,32 +201,12 @@ module ObjectStorage ...@@ -195,32 +201,12 @@ module ObjectStorage
# new_store: Enum (Store::LOCAL, Store::REMOTE) # new_store: Enum (Store::LOCAL, Store::REMOTE)
# #
def migrate!(new_store) def migrate!(new_store)
return unless object_store != new_store uuid = Gitlab::ExclusiveLease.new(exclusive_lease_key, timeout: 1.hour.to_i).try_obtain
return unless file raise 'Already running' unless uuid
new_file = nil
file_to_delete = file
from_object_store = object_store
self.object_store = new_store # changes the storage and file
cache_stored_file! if file_storage?
with_callbacks(:migrate, file_to_delete) do
with_callbacks(:store, file_to_delete) do # for #store_versions!
new_file = storage.store!(file)
persist_object_store!
self.file = new_file
end
end
file unsafe_migrate!(new_store)
rescue => e ensure
# in case of failure delete new file Gitlab::ExclusiveLease.cancel(exclusive_lease_key, uuid)
new_file.delete unless new_file.nil?
# revert back to the old file
self.object_store = from_object_store
self.file = file_to_delete
raise e
end end
def schedule_background_upload(*args) def schedule_background_upload(*args)
...@@ -310,5 +296,43 @@ module ObjectStorage ...@@ -310,5 +296,43 @@ module ObjectStorage
raise UnknownStoreError raise UnknownStoreError
end end
end end
def exclusive_lease_key
"object_storage_migrate:#{model.class}:#{model.id}"
end
#
# Move the file to another store
#
# new_store: Enum (Store::LOCAL, Store::REMOTE)
#
def unsafe_migrate!(new_store)
return unless object_store != new_store
return unless file
new_file = nil
file_to_delete = file
from_object_store = object_store
self.object_store = new_store # changes the storage and file
cache_stored_file! if file_storage?
with_callbacks(:migrate, file_to_delete) do
with_callbacks(:store, file_to_delete) do # for #store_versions!
new_file = storage.store!(file)
persist_object_store!
self.file = new_file
end
end
file
rescue => e
# in case of failure delete new file
new_file.delete unless new_file.nil?
# revert back to the old file
self.object_store = from_object_store
self.file = file_to_delete
raise e
end
end end
end end
...@@ -61,10 +61,24 @@ describe LfsObject do ...@@ -61,10 +61,24 @@ describe LfsObject do
end end
it 'schedules the model for migration' do it 'schedules the model for migration' do
expect(ObjectStorage::BackgroundMoveWorker).to receive(:perform_async).with('LfsObjectUploader', described_class.name, :file, kind_of(Numeric)) expect(ObjectStorage::BackgroundMoveWorker)
.to receive(:perform_async)
.with('LfsObjectUploader', described_class.name, :file, kind_of(Numeric))
.once
subject subject
end end
it 'schedules the model for migration once' do
expect(ObjectStorage::BackgroundMoveWorker)
.to receive(:perform_async)
.with('LfsObjectUploader', described_class.name, :file, kind_of(Numeric))
.once
lfs_object = create(:lfs_object)
lfs_object.file = fixture_file_upload(Rails.root + "spec/fixtures/dk.png", "`/png")
lfs_object.save!
end
end end
context 'when is unlicensed' do context 'when is unlicensed' do
......
...@@ -20,6 +20,19 @@ shared_examples "migrates" do |to_store:, from_store: nil| ...@@ -20,6 +20,19 @@ shared_examples "migrates" do |to_store:, from_store: nil|
migrate(from) migrate(from)
end end
it 'returns corresponding file type' do
expect(subject).to be_an(CarrierWave::Uploader::Base)
expect(subject).to be_a(ObjectStorage::Concern)
if from == described_class::Store::REMOTE
expect(subject.file).to be_a(CarrierWave::Storage::Fog::File)
elsif from == described_class::Store::LOCAL
expect(subject.file).to be_a(CarrierWave::SanitizedFile)
else
raise 'Unexpected file type'
end
end
it 'does nothing when migrating to the current store' do it 'does nothing when migrating to the current store' do
expect { migrate(from) }.not_to change { subject.object_store }.from(from) expect { migrate(from) }.not_to change { subject.object_store }.from(from)
end end
...@@ -38,6 +51,42 @@ shared_examples "migrates" do |to_store:, from_store: nil| ...@@ -38,6 +51,42 @@ shared_examples "migrates" do |to_store:, from_store: nil|
expect(File.exist?(original_file)).to be_falsey expect(File.exist?(original_file)).to be_falsey
end end
it 'can access to the original file during migration' do
file = subject.file
allow(subject).to receive(:delete_migrated_file) { } # Remove as a callback of :migrate
allow(subject).to receive(:record_upload) { } # Remove as a callback of :store (:record_upload)
expect(file.exists?).to be_truthy
expect { migrate(to) }.not_to change { file.exists? }
end
context 'when migrate! is not oqqupied by another process' do
it 'executes migrate!' do
expect(subject).to receive(:object_store=).at_least(1)
migrate(to)
end
end
context 'when migrate! is occupied by another process' do
let(:exclusive_lease_key) { "object_storage_migrate:#{subject.model.class}:#{subject.model.id}" }
before do
@uuid = Gitlab::ExclusiveLease.new(exclusive_lease_key, timeout: 1.hour.to_i).try_obtain
end
it 'does not execute migrate!' do
expect(subject).not_to receive(:unsafe_migrate!)
expect { migrate(to) }.to raise_error('Already running')
end
after do
Gitlab::ExclusiveLease.cancel(exclusive_lease_key, @uuid)
end
end
context 'migration is unsuccessful' do context 'migration is unsuccessful' do
shared_examples "handles gracefully" do |error:| shared_examples "handles gracefully" do |error:|
it 'does not update the object_store' do it 'does not update the object_store' do
......
...@@ -128,6 +128,33 @@ describe ObjectStorage do ...@@ -128,6 +128,33 @@ describe ObjectStorage do
expect(uploader.object_store).to eq(uploader.upload.store) expect(uploader.object_store).to eq(uploader.upload.store)
end end
end end
describe '#migrate!' do
let(:new_store) { ObjectStorage::Store::REMOTE }
before do
stub_uploads_object_storage(uploader: AvatarUploader)
end
subject { uploader.migrate!(new_store) }
it 'persist @object_store to the recorded upload' do
subject
expect(uploader.upload.store).to eq(new_store)
end
describe 'fails' do
it 'is handled gracefully' do
store = uploader.object_store
expect_any_instance_of(Upload).to receive(:save!).and_raise("An error")
expect { subject }.to raise_error("An error")
expect(uploader.exists?).to be_truthy
expect(uploader.upload.store).to eq(store)
end
end
end
end end
# this means the model holds an <mounted_as>_store attribute directly # this means the model holds an <mounted_as>_store attribute directly
......
...@@ -1025,7 +1025,7 @@ describe 'Git LFS API and storage' do ...@@ -1025,7 +1025,7 @@ describe 'Git LFS API and storage' do
context 'and workhorse requests upload finalize for a new lfs object' do context 'and workhorse requests upload finalize for a new lfs object' do
before do before do
allow_any_instance_of(LfsObjectUploader).to receive(:exists?) { false } lfs_object.destroy
end end
context 'with object storage disabled' do context 'with object storage disabled' do
......
...@@ -67,4 +67,14 @@ describe JobArtifactUploader do ...@@ -67,4 +67,14 @@ describe JobArtifactUploader do
it { is_expected.to include("/#{job_artifact.job_id}/#{job_artifact.id}/") } it { is_expected.to include("/#{job_artifact.job_id}/#{job_artifact.id}/") }
it { is_expected.to end_with("ci_build_artifacts.zip") } it { is_expected.to end_with("ci_build_artifacts.zip") }
end end
describe "#migrate!" do
before do
uploader.store!(fixture_file_upload(Rails.root.join('spec/fixtures/trace/sample_trace')))
stub_artifacts_object_storage
end
it_behaves_like "migrates", to_store: described_class::Store::REMOTE
it_behaves_like "migrates", from_store: described_class::Store::REMOTE, to_store: described_class::Store::LOCAL
end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment