Commit 10d1a32a authored by Dylan Griffith's avatar Dylan Griffith

Merge branch '343047-wrap-bg-migration-methods-with-shared-connection' into 'master'

Setup shared connection for BG migration methods

See merge request gitlab-org/gitlab!74016
parents 139deecb e2e5775b
......@@ -36,38 +36,42 @@ module Gitlab
end
def steal(steal_class, retry_dead_jobs: false)
queues = [
Sidekiq::ScheduledSet.new,
Sidekiq::Queue.new(self.queue)
]
if retry_dead_jobs
queues << Sidekiq::RetrySet.new
queues << Sidekiq::DeadSet.new
end
with_shared_connection do
queues = [
Sidekiq::ScheduledSet.new,
Sidekiq::Queue.new(self.queue)
]
if retry_dead_jobs
queues << Sidekiq::RetrySet.new
queues << Sidekiq::DeadSet.new
end
queues.each do |queue|
queue.each do |job|
migration_class, migration_args = job.args
queues.each do |queue|
queue.each do |job|
migration_class, migration_args = job.args
next unless job.klass == worker_class.name
next unless migration_class == steal_class
next if block_given? && !(yield job)
next unless job.klass == worker_class.name
next unless migration_class == steal_class
next if block_given? && !(yield job)
begin
perform(migration_class, migration_args) if job.delete
rescue Exception # rubocop:disable Lint/RescueException
worker_class # enqueue this migration again
.perform_async(migration_class, migration_args)
begin
perform(migration_class, migration_args) if job.delete
rescue Exception # rubocop:disable Lint/RescueException
worker_class # enqueue this migration again
.perform_async(migration_class, migration_args)
raise
raise
end
end
end
end
end
def perform(class_name, arguments)
migration_class_for(class_name).new.perform(*arguments)
with_shared_connection do
migration_class_for(class_name).new.perform(*arguments)
end
end
def remaining
......
......@@ -73,6 +73,25 @@ RSpec.describe Gitlab::BackgroundMigration::JobCoordinator do
coordinator.steal('Foo')
end
it 'sets up the shared connection while stealing jobs' do
connection = double('connection')
allow(coordinator).to receive(:connection).and_return(connection)
expect(coordinator).to receive(:with_shared_connection).and_call_original
expect(queue[0]).to receive(:delete).and_return(true)
expect(coordinator).to receive(:perform).with('Foo', [10, 20]) do
expect(Gitlab::Database::SharedModel.connection).to be(connection)
end
coordinator.steal('Foo') do
expect(Gitlab::Database::SharedModel.connection).to be(connection)
true # the job is only performed if the block returns true
end
end
it 'does not steal job that has already been taken' do
expect(queue[0]).to receive(:delete).and_return(false)
......@@ -194,13 +213,20 @@ RSpec.describe Gitlab::BackgroundMigration::JobCoordinator do
describe '#perform' do
let(:migration) { spy(:migration) }
let(:connection) { double('connection') }
before do
stub_const('Gitlab::BackgroundMigration::Foo', migration)
allow(coordinator).to receive(:connection).and_return(connection)
end
it 'performs a background migration' do
expect(migration).to receive(:perform).with(10, 20).once
it 'performs a background migration with the configured shared connection' do
expect(coordinator).to receive(:with_shared_connection).and_call_original
expect(migration).to receive(:perform).with(10, 20).once do
expect(Gitlab::Database::SharedModel.connection).to be(connection)
end
coordinator.perform('Foo', [10, 20])
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment