Commit fdc2417f authored by Alex Kalderimis's avatar Alex Kalderimis

Add concurrency tests to save design service spec

This verifies that services can be run in parallel. For this we add a
new concurrency helper to abstract the work or running parallel tasks.

This showed that we needed to move repository creation into the
synchronized block.
parent cc387763
......@@ -17,8 +17,6 @@ module DesignManagement
return error("Not allowed!") unless can_create_designs?
return error("Only #{MAX_FILES} files are allowed simultaneously") if files.size > MAX_FILES
repository.create_if_not_exists
uploaded_designs, version = upload_designs!
skipped_designs = designs - uploaded_designs
......@@ -36,6 +34,8 @@ module DesignManagement
def upload_designs!
::DesignManagement::Version.lock_for_creation(project.id) do
repository.create_if_not_exists
actions = build_actions
[actions.map(&:design), actions.presence && run_actions(actions)]
......
......@@ -3,6 +3,7 @@ require 'spec_helper'
describe Mutations::DesignManagement::Upload do
include DesignManagementTestHelpers
include ConcurrentHelpers
let(:issue) { create(:issue) }
let(:user) { issue.author }
......@@ -21,32 +22,6 @@ describe Mutations::DesignManagement::Upload do
mutation.resolve(project_path: project_path, iid: iid, files: files_to_upload)
end
def parallel(blocks)
thread_pool = Concurrent::FixedThreadPool.new(
[2, Concurrent.processor_count - 1].max, { max_queue: blocks.size }
)
opts = { executor: thread_pool }
error = Concurrent::MVar.new
blocks.map { |block| Concurrent::Future.execute(opts, &block) }.each do |future|
future.wait(20)
if future.complete?
error.put(future.reason) if future.reason && error.empty?
else
future.cancel
error.put(StandardError.new(:cancelled)) if error.empty?
end
end
raise error.take if error.full?
ensure
thread_pool.shutdown
thread_pool.wait_for_termination(10)
thread_pool.kill if thread_pool.running?
end
describe "#resolve" do
let(:files) { [fixture_file_upload('spec/fixtures/dk.png')] }
......@@ -92,7 +67,7 @@ describe Mutations::DesignManagement::Upload do
describe 'running requests in parallel' do
it 'does not cause errors' do
creates_designs do
parallel(files.map { |f| -> { run_mutation([f]) } })
run_parallel(files.map { |f| -> { run_mutation([f]) } })
end
end
end
......@@ -106,7 +81,7 @@ describe Mutations::DesignManagement::Upload do
-> { run_mutation([f], i.project.full_path, i.iid) }
end
parallel(blocks)
run_parallel(blocks)
end
end
end
......
......@@ -3,6 +3,7 @@ require 'spec_helper'
describe DesignManagement::SaveDesignsService do
include DesignManagementTestHelpers
include ConcurrentHelpers
let_it_be(:developer) { create(:user) }
let(:project) { issue.project }
......@@ -98,6 +99,17 @@ describe DesignManagement::SaveDesignsService do
)
end
it 'can run the same command in parallel' do
blocks = Array.new(10).map do
wrapped_files = files.map { |f| Gitlab::FileUpload.new(f) }
wrapped_files.each { |f| f.original_filename = generate(:jpeg_file) }
-> { run_service(wrapped_files) }
end
expect { run_parallel(blocks) }.to change(DesignManagement::Version, :count).by(10)
end
it 'causes diff_refs not to be nil' do
expect(response).to include(
designs: all(have_attributes(diff_refs: be_present))
......
# frozen_string_literal: true
module ConcurrentHelpers
Cancelled = Class.new(StandardError)
# To test for contention, we may need to run some actions in parallel. This
# helper takes an array of blocks and schedules them all on different threads
# in a fixed-size thread pool.
#
# @param [Array[Proc]] blocks
# @param [Integer] task_wait_time: time to wait for each task (upper bound on
# reasonable task execution time)
# @param [Integer] max_concurrency: maximum number of tasks to run at once
#
def run_parallel(blocks, task_wait_time: 20.seconds, max_concurrency: Concurrent.processor_count - 1)
thread_pool = Concurrent::FixedThreadPool.new(
[2, max_concurrency].max, { max_queue: blocks.size }
)
opts = { executor: thread_pool }
error = Concurrent::MVar.new
blocks.map { |block| Concurrent::Future.execute(opts, &block) }.each do |future|
future.wait(task_wait_time)
if future.complete?
error.put(future.reason) if future.reason && error.empty?
else
future.cancel
error.put(Cancelled.new) if error.empty?
end
end
raise error.take if error.full?
ensure
thread_pool.shutdown
thread_pool.wait_for_termination(10)
thread_pool.kill if thread_pool.running?
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment