Commit 82b9cf7b authored by Alex Kalderimis's avatar Alex Kalderimis

Add distributed lock around design management mutations

This adds a distributed lock managed in Redis around all new
design management version creations, in order to prevent concurrent
commits to master in the design repository for each project.

The lock is acquired scoped to the project ID, meaning different
projects can take different locks concurrently, but design
uploads/deletions in the same repository block each other. Each mutation
is given 5 seconds of exclusive lock time to run, and an exponential
back-off is used to handle backpressure.

ExclusiveLeaseHelpers is modified to support exponential back-off
parent fa449937
...@@ -5,6 +5,7 @@ module DesignManagement ...@@ -5,6 +5,7 @@ module DesignManagement
include Importable include Importable
include ShaAttribute include ShaAttribute
include Gitlab::Utils::StrongMemoize include Gitlab::Utils::StrongMemoize
extend Gitlab::ExclusiveLeaseHelpers
NotSameIssue = Class.new(StandardError) NotSameIssue = Class.new(StandardError)
...@@ -93,6 +94,17 @@ module DesignManagement ...@@ -93,6 +94,17 @@ module DesignManagement
raise CouldNotCreateVersion.new(sha, issue_id, design_actions) raise CouldNotCreateVersion.new(sha, issue_id, design_actions)
end end
CREATION_TTL = 5.seconds
RETRY_DELAY = ->(num) { 0.2.seconds * num**2 }
def self.lock_for_creation(project_id, &block)
key = "lock_for_creation:#{name}:{#{project_id}}"
in_lock(key, ttl: CREATION_TTL, retries: 5, sleep_sec: RETRY_DELAY) do |_retried|
yield
end
end
def designs_by_event def designs_by_event
actions actions
.includes(:design) .includes(:design)
......
...@@ -14,8 +14,11 @@ module DesignManagement ...@@ -14,8 +14,11 @@ module DesignManagement
def execute def execute
return error('Forbidden!') unless can_delete_designs? return error('Forbidden!') unless can_delete_designs?
actions = build_actions version = delete_designs!
version = run_actions(actions)
version.run_after_commit do
::DesignManagement::NewVersionWorker.perform_async(version.id)
end
# Create a Geo event so changes will be replicated to secondary node(s) # Create a Geo event so changes will be replicated to secondary node(s)
repository.log_geo_updated_event repository.log_geo_updated_event
...@@ -37,6 +40,12 @@ module DesignManagement ...@@ -37,6 +40,12 @@ module DesignManagement
attr_reader :designs attr_reader :designs
def delete_designs!
DesignManagement::Version.lock_for_creation(project.id) do
run_actions(build_actions)
end
end
def can_delete_designs? def can_delete_designs?
Ability.allowed?(current_user, :destroy_design, issue) Ability.allowed?(current_user, :destroy_design, issue)
end end
......
...@@ -35,12 +35,24 @@ module DesignManagement ...@@ -35,12 +35,24 @@ module DesignManagement
attr_reader :files attr_reader :files
def upload_designs! def upload_designs!
actions = build_actions # puts "Waiting [#{Thread.current.object_id}]"
return [] if actions.empty? actions = ::DesignManagement::Version.lock_for_creation(project.id) do
# puts ". Building [#{Thread.current.object_id}]"
version = run_actions(actions) actions = build_actions
::DesignManagement::NewVersionWorker.perform_async(version.id) # if actions.empty?
# puts ".. Skipping [#{Thread.current.object_id}]"
# else
if actions.present?
# puts ".. Running [#{Thread.current.object_id}]"
version = run_actions(actions)
version.run_after_commit do
::DesignManagement::NewVersionWorker.perform_async(version.id)
end
end
actions
end
# puts "Done [#{Thread.current.object_id}]"
actions.map(&:design) actions.map(&:design)
end end
......
...@@ -12,9 +12,39 @@ describe Mutations::DesignManagement::Upload do ...@@ -12,9 +12,39 @@ describe Mutations::DesignManagement::Upload do
described_class.new(object: nil, context: { current_user: user }, field: nil) described_class.new(object: nil, context: { current_user: user }, field: nil)
end end
def run_mutation(fs = files) def unique_file(upload)
::Gitlab::FileUpload.new(upload).tap { |f| f.original_filename = generate(:jpeg_file) }
end
def run_mutation(files_to_upload = files, project_path = project.full_path, iid = issue.iid)
mutation = described_class.new(object: nil, context: { current_user: user }, field: nil) mutation = described_class.new(object: nil, context: { current_user: user }, field: nil)
mutation.resolve(project_path: project.full_path, iid: issue.iid, files: fs) mutation.resolve(project_path: project_path, iid: iid, files: files_to_upload)
end
def parallel(blocks)
thread_pool = Concurrent::FixedThreadPool.new(
[2, Concurrent.processor_count - 1].max, { max_queue: blocks.size }
)
opts = { executor: thread_pool }
error = Concurrent::MVar.new
blocks.map { |block| Concurrent::Future.execute(opts, &block) }.each do |future|
future.wait(20)
if future.complete?
error.put(future.reason) if future.reason && error.empty?
else
future.cancel
error.put(StandardError.new(:cancelled)) if error.empty?
end
end
raise error.take if error.full?
ensure
thread_pool.shutdown
thread_pool.wait_for_termination(10)
thread_pool.kill if thread_pool.running?
end end
describe "#resolve" do describe "#resolve" do
...@@ -40,32 +70,54 @@ describe Mutations::DesignManagement::Upload do ...@@ -40,32 +70,54 @@ describe Mutations::DesignManagement::Upload do
end end
describe 'contention in the design repo' do describe 'contention in the design repo' do
before do
issue.design_collection.repository.create_if_not_exists
end
let(:files) do let(:files) do
[ ['dk.png', 'rails_sample.jpg', 'banana_sample.gif']
fixture_file_upload('spec/fixtures/dk.png'), .cycle
fixture_file_upload('spec/fixtures/rails_sample.jpg'), .take(Concurrent.processor_count * 2)
fixture_file_upload('spec/fixtures/banana_sample.gif') .map { |f| unique_file(fixture_file_upload("spec/fixtures/#{f}")) }
].cycle(20).to_a end
def creates_designs
prior_count = DesignManagement::Design.count
expect { yield }.not_to raise_error
expect(DesignManagement::Design.count).to eq(prior_count + files.size)
end end
describe 'running requests in parallel' do describe 'running requests in parallel' do
it 'does not cause errors' do it 'does not cause errors' do
expect do creates_designs do
threads = files.map do |f| parallel(files.map { |f| -> { run_mutation([f]) } })
Thread.new { run_mutation([f]) } end
end
end
describe 'running requests in parallel on different issues' do
it 'does not cause errors' do
creates_designs do
issues = create_list(:issue, files.size, author: user)
issues.each { |i| i.project.add_developer(user) }
blocks = files.zip(issues).map do |(f, i)|
-> { run_mutation([f], i.project.full_path, i.iid) }
end end
threads.each(&:join)
end.not_to raise_error parallel(blocks)
end
end end
end end
describe 'running requests in serial' do describe 'running requests in serial' do
it 'does not cause errors' do it 'does not cause errors' do
expect do creates_designs do
files.each do |f| files.each do |f|
run_mutation([f]) run_mutation([f])
end end
end.not_to raise_error end
end end
end end
end end
......
...@@ -16,11 +16,15 @@ module Gitlab ...@@ -16,11 +16,15 @@ module Gitlab
lease = Gitlab::ExclusiveLease.new(key, timeout: ttl) lease = Gitlab::ExclusiveLease.new(key, timeout: ttl)
retried = false retried = false
max_attempts = 1 + retries
until uuid = lease.try_obtain until uuid = lease.try_obtain
# Keep trying until we obtain the lease. To prevent hammering Redis too # Keep trying until we obtain the lease. To prevent hammering Redis too
# much we'll wait for a bit. # much we'll wait for a bit.
sleep(sleep_sec) attempt_number = max_attempts - retries
delay = sleep_sec.respond_to?(:call) ? sleep_sec.call(attempt_number) : sleep_sec
sleep(delay)
(retries -= 1) < 0 ? break : retried ||= true (retries -= 1) < 0 ? break : retried ||= true
end end
......
...@@ -82,10 +82,22 @@ describe Gitlab::ExclusiveLeaseHelpers, :clean_gitlab_redis_shared_state do ...@@ -82,10 +82,22 @@ describe Gitlab::ExclusiveLeaseHelpers, :clean_gitlab_redis_shared_state do
end end
context 'when sleep second is specified' do context 'when sleep second is specified' do
let(:options) { { retries: 0, sleep_sec: 0.05.seconds } } let(:options) { { retries: 1, sleep_sec: 0.05.seconds } }
it 'receives the specified argument' do it 'receives the specified argument' do
expect(class_instance).to receive(:sleep).with(0.05.seconds).once expect(class_instance).to receive(:sleep).with(0.05.seconds).twice
expect { subject }.to raise_error('Failed to obtain a lock')
end
end
context 'when sleep second is specified as a lambda' do
let(:options) { { retries: 2, sleep_sec: ->(num) { 0.1 + num } } }
it 'receives the specified argument' do
expect(class_instance).to receive(:sleep).with(1.1.seconds).once
expect(class_instance).to receive(:sleep).with(2.1.seconds).once
expect(class_instance).to receive(:sleep).with(3.1.seconds).once
expect { subject }.to raise_error('Failed to obtain a lock') expect { subject }.to raise_error('Failed to obtain a lock')
end end
......
# frozen_string_literal: true
class Gitlab::FileUpload < SimpleDelegator
attr_accessor :original_filename
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment