Commit 632ce4dd authored by Robert Speicher's avatar Robert Speicher

Merge branch 'ajk-31894-multiple-design-uploads' into 'master'

#31894 - prevent concurrent design management git repo access

See merge request gitlab-org/gitlab!29526
parents 82587df6 c17cc2de
......@@ -4,7 +4,9 @@ module DesignManagement
class Version < ApplicationRecord
include Importable
include ShaAttribute
include AfterCommitQueue
include Gitlab::Utils::StrongMemoize
extend Gitlab::ExclusiveLeaseHelpers
NotSameIssue = Class.new(StandardError)
......@@ -64,6 +66,10 @@ module DesignManagement
# designs, and not being able to add designs without a saved version. Also this
# method inserts designs in bulk, rather than one by one.
#
# Before calling this method, callers must guard against concurrent
# modification by obtaining the lock on the design repository. See:
# `DesignManagement::Version.with_lock`.
#
# Parameters:
# - design_actions [DesignManagement::DesignAction]:
# the actions that have been performed in the repository.
......@@ -93,6 +99,18 @@ module DesignManagement
raise CouldNotCreateVersion.new(sha, issue_id, design_actions)
end
CREATION_TTL = 5.seconds
RETRY_DELAY = ->(num) { 0.2.seconds * num**2 }
def self.with_lock(project_id, repository, &block)
key = "with_lock:#{name}:{#{project_id}}"
in_lock(key, ttl: CREATION_TTL, retries: 5, sleep_sec: RETRY_DELAY) do |_retried|
repository.create_if_not_exists
yield
end
end
def designs_by_event
actions
.includes(:design)
......
......@@ -14,8 +14,7 @@ module DesignManagement
def execute
return error('Forbidden!') unless can_delete_designs?
actions = build_actions
version = run_actions(actions)
version = delete_designs!
# Create a Geo event so changes will be replicated to secondary node(s)
repository.log_geo_updated_event
......@@ -37,6 +36,12 @@ module DesignManagement
attr_reader :designs
def delete_designs!
DesignManagement::Version.with_lock(project.id, repository) do
run_actions(build_actions)
end
end
def can_delete_designs?
Ability.allowed?(current_user, :destroy_design, issue)
end
......
......@@ -7,17 +7,29 @@ module DesignManagement
# this concern requires the following methods to be implemented:
# current_user, target_branch, repository, commit_message
#
# Before calling `run_actions`, you should ensure the repository exists, by
# calling `repository.create_if_not_exists`.
#
# @raise [NoActions] if actions are empty
def run_actions(actions)
raise NoActions if actions.empty?
repository.create_if_not_exists
sha = repository.multi_action(current_user,
branch_name: target_branch,
message: commit_message,
actions: actions.map(&:gitaly_action))
::DesignManagement::Version.create_for_designs(actions, sha, current_user)
::DesignManagement::Version
.create_for_designs(actions, sha, current_user)
.tap { |version| post_process(version) }
end
private
def post_process(version)
version.run_after_commit_or_now do
::DesignManagement::NewVersionWorker.perform_async(id)
end
end
end
end
......@@ -17,15 +17,13 @@ module DesignManagement
return error("Not allowed!") unless can_create_designs?
return error("Only #{MAX_FILES} files are allowed simultaneously") if files.size > MAX_FILES
repository.create_if_not_exists
uploaded_designs = upload_designs!
uploaded_designs, version = upload_designs!
skipped_designs = designs - uploaded_designs
# Create a Geo event so changes will be replicated to secondary node(s)
repository.log_geo_updated_event
success({ designs: uploaded_designs, skipped_designs: skipped_designs })
success({ designs: uploaded_designs, version: version, skipped_designs: skipped_designs })
rescue ::ActiveRecord::RecordInvalid => e
error(e.message)
end
......@@ -35,13 +33,11 @@ module DesignManagement
attr_reader :files
def upload_designs!
::DesignManagement::Version.with_lock(project.id, repository) do
actions = build_actions
return [] if actions.empty?
version = run_actions(actions)
::DesignManagement::NewVersionWorker.perform_async(version.id)
actions.map(&:design)
[actions.map(&:design), actions.presence && run_actions(actions)]
end
end
# Returns `Design` instances that correspond with `files`.
......
......@@ -3,6 +3,7 @@ require 'spec_helper'
describe Mutations::DesignManagement::Upload do
include DesignManagementTestHelpers
include ConcurrentHelpers
let(:issue) { create(:issue) }
let(:user) { issue.author }
......@@ -12,6 +13,11 @@ describe Mutations::DesignManagement::Upload do
described_class.new(object: nil, context: { current_user: user }, field: nil)
end
def run_mutation(files_to_upload = files, project_path = project.full_path, iid = issue.iid)
mutation = described_class.new(object: nil, context: { current_user: user }, field: nil)
mutation.resolve(project_path: project_path, iid: iid, files: files_to_upload)
end
describe "#resolve" do
let(:files) { [fixture_file_upload('spec/fixtures/dk.png')] }
......@@ -34,6 +40,59 @@ describe Mutations::DesignManagement::Upload do
enable_design_management
end
describe 'contention in the design repo' do
before do
issue.design_collection.repository.create_if_not_exists
end
let(:files) do
['dk.png', 'rails_sample.jpg', 'banana_sample.gif']
.cycle
.take(Concurrent.processor_count * 2)
.map { |f| RenameableUpload.unique_file(f) }
end
def creates_designs
prior_count = DesignManagement::Design.count
expect { yield }.not_to raise_error
expect(DesignManagement::Design.count).to eq(prior_count + files.size)
end
describe 'running requests in parallel' do
it 'does not cause errors' do
creates_designs do
run_parallel(files.map { |f| -> { run_mutation([f]) } })
end
end
end
describe 'running requests in parallel on different issues' do
it 'does not cause errors' do
creates_designs do
issues = create_list(:issue, files.size, author: user)
issues.each { |i| i.project.add_developer(user) }
blocks = files.zip(issues).map do |(f, i)|
-> { run_mutation([f], i.project.full_path, i.iid) }
end
run_parallel(blocks)
end
end
end
describe 'running requests in serial' do
it 'does not cause errors' do
creates_designs do
files.each do |f|
run_mutation([f])
end
end
end
end
end
context "when the user is not allowed to upload designs" do
let(:user) { create(:user) }
......
......@@ -91,6 +91,12 @@ describe DesignManagement::DeleteDesignsService do
expect { run_service }.to change { counter.read(:delete) }.by(1)
end
it 'informs the new-version-worker' do
expect(::DesignManagement::NewVersionWorker).to receive(:perform_async).with(Integer)
run_service
end
it 'creates a new verison' do
expect { run_service }.to change { DesignManagement::Version.where(issue: issue).count }.by(1)
end
......
......@@ -3,6 +3,7 @@ require 'spec_helper'
describe DesignManagement::SaveDesignsService do
include DesignManagementTestHelpers
include ConcurrentHelpers
let_it_be(:developer) { create(:user) }
let(:project) { issue.project }
......@@ -20,18 +21,6 @@ describe DesignManagement::SaveDesignsService do
before do
project.add_developer(developer)
allow(::DesignManagement::NewVersionWorker).to receive(:perform_async)
end
RSpec::Matchers.define :enqueue_worker do
match do |action|
expect(::DesignManagement::NewVersionWorker)
.to receive(:perform_async).once.with(Integer)
action.call
end
supports_block_expectations
end
def run_service(files_to_upload = nil)
......@@ -109,6 +98,17 @@ describe DesignManagement::SaveDesignsService do
)
end
it 'can run the same command in parallel' do
blocks = Array.new(10).map do
unique_files = %w(rails_sample.jpg dk.png)
.map { |name| RenameableUpload.unique_file(name) }
-> { run_service(unique_files) }
end
expect { run_parallel(blocks) }.to change(DesignManagement::Version, :count).by(10)
end
it 'causes diff_refs not to be nil' do
expect(response).to include(
designs: all(have_attributes(diff_refs: be_present))
......@@ -237,7 +237,10 @@ describe DesignManagement::SaveDesignsService do
end
it 'enqueues just one new version worker' do
expect { run_service }.to enqueue_worker
expect(::DesignManagement::NewVersionWorker)
.to receive(:perform_async).once.with(Integer)
run_service
end
end
......@@ -260,7 +263,10 @@ describe DesignManagement::SaveDesignsService do
end
it 'enqueues a new version worker' do
expect { run_service }.to enqueue_worker
expect(::DesignManagement::NewVersionWorker)
.to receive(:perform_async).once.with(Integer)
run_service
end
it 'creates a single commit' do
......@@ -272,7 +278,8 @@ describe DesignManagement::SaveDesignsService do
expect { run_service }.to change { commit_count.call }.by(1)
end
it 'only does 4 gitaly calls', :request_store, :sidekiq_might_not_need_inline do
it 'only does 5 gitaly calls', :request_store, :sidekiq_might_not_need_inline do
allow(::DesignManagement::NewVersionWorker).to receive(:perform_async).with(Integer)
service = described_class.new(project, user, issue: issue, files: files)
# Some unrelated calls that are usually cached or happen only once
service.__send__(:repository).create_if_not_exists
......
......@@ -16,11 +16,15 @@ module Gitlab
lease = Gitlab::ExclusiveLease.new(key, timeout: ttl)
retried = false
max_attempts = 1 + retries
until uuid = lease.try_obtain
# Keep trying until we obtain the lease. To prevent hammering Redis too
# much we'll wait for a bit.
sleep(sleep_sec)
attempt_number = max_attempts - retries
delay = sleep_sec.respond_to?(:call) ? sleep_sec.call(attempt_number) : sleep_sec
sleep(delay)
(retries -= 1) < 0 ? break : retried ||= true
end
......
......@@ -82,10 +82,22 @@ describe Gitlab::ExclusiveLeaseHelpers, :clean_gitlab_redis_shared_state do
end
context 'when sleep second is specified' do
let(:options) { { retries: 0, sleep_sec: 0.05.seconds } }
let(:options) { { retries: 1, sleep_sec: 0.05.seconds } }
it 'receives the specified argument' do
expect(class_instance).to receive(:sleep).with(0.05.seconds).once
expect(class_instance).to receive(:sleep).with(0.05.seconds).twice
expect { subject }.to raise_error('Failed to obtain a lock')
end
end
context 'when sleep second is specified as a lambda' do
let(:options) { { retries: 2, sleep_sec: ->(num) { 0.1 + num } } }
it 'receives the specified argument' do
expect(class_instance).to receive(:sleep).with(1.1.seconds).once
expect(class_instance).to receive(:sleep).with(2.1.seconds).once
expect(class_instance).to receive(:sleep).with(3.1.seconds).once
expect { subject }.to raise_error('Failed to obtain a lock')
end
......
# frozen_string_literal: true
module ConcurrentHelpers
Cancelled = Class.new(StandardError)
# To test for contention, we may need to run some actions in parallel. This
# helper takes an array of blocks and schedules them all on different threads
# in a fixed-size thread pool.
#
# @param [Array[Proc]] blocks
# @param [Integer] task_wait_time: time to wait for each task (upper bound on
# reasonable task execution time)
# @param [Integer] max_concurrency: maximum number of tasks to run at once
#
def run_parallel(blocks, task_wait_time: 20.seconds, max_concurrency: Concurrent.processor_count - 1)
thread_pool = Concurrent::FixedThreadPool.new(
[2, max_concurrency].max, { max_queue: blocks.size }
)
opts = { executor: thread_pool }
error = Concurrent::MVar.new
blocks.map { |block| Concurrent::Future.execute(opts, &block) }.each do |future|
future.wait(task_wait_time)
if future.complete?
error.put(future.reason) if future.reason && error.empty?
else
future.cancel
error.put(Cancelled.new) if error.empty?
end
end
raise error.take if error.full?
ensure
thread_pool.shutdown
thread_pool.wait_for_termination(10)
thread_pool.kill if thread_pool.running?
end
end
# frozen_string_literal: true
class RenameableUpload < SimpleDelegator
attr_accessor :original_filename
# Get a fixture file with a new unique name, and the same extension
def self.unique_file(name)
upload = new(fixture_file_upload("spec/fixtures/#{name}"))
ext = File.extname(name)
new_name = File.basename(FactoryBot.generate(:filename), '.*')
upload.original_filename = new_name + ext
upload
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment