Commit a7b5eb50 authored by pbair's avatar pbair

Use database parameter to configure BG migrations

Introduce a new class to run background migration jobs, that properly
handles multiple databases by requiring the target trackign database
to be passed in.

This information will either be determined by metadata attached to DML
migrations, or from the appropriate worker class for the target
database. For now we default to :main to maintain
backwards-compatibility.
parent e223d9e4
......@@ -2,8 +2,12 @@
module Gitlab
module BackgroundMigration
def self.queue
@queue ||= BackgroundMigrationWorker.sidekiq_options['queue']
def self.coordinator_for_database(database)
JobCoordinator.for_database(database)
end
def self.queue(database: :main)
coordinator_for_database(database).queue
end
# Begins stealing jobs from the background migrations queue, blocking the
......@@ -16,35 +20,10 @@ module Gitlab
# re-raises the exception.
#
# steal_class - The name of the class for which to steal jobs.
def self.steal(steal_class, retry_dead_jobs: false)
queues = [
Sidekiq::ScheduledSet.new,
Sidekiq::Queue.new(self.queue)
]
if retry_dead_jobs
queues << Sidekiq::RetrySet.new
queues << Sidekiq::DeadSet.new
end
queues.each do |queue|
queue.each do |job|
migration_class, migration_args = job.args
next unless job.klass == 'BackgroundMigrationWorker'
next unless migration_class == steal_class
next if block_given? && !(yield job)
begin
perform(migration_class, migration_args) if job.delete
rescue Exception # rubocop:disable Lint/RescueException
BackgroundMigrationWorker # enqueue this migration again
.perform_async(migration_class, migration_args)
raise
end
end
end
# retry_dead_jobs - Flag to control whether jobs in Sidekiq::RetrySet or Sidekiq::DeadSet are retried.
# database - tracking database this migration executes against
def self.steal(steal_class, retry_dead_jobs: false, database: :main, &block)
coordinator_for_database(database).steal(steal_class, retry_dead_jobs: retry_dead_jobs, &block)
end
##
......@@ -55,64 +34,13 @@ module Gitlab
#
# arguments - The arguments to pass to the background migration's "perform"
# method.
def self.perform(class_name, arguments)
migration_class_for(class_name).new.perform(*arguments)
end
def self.remaining
enqueued = Sidekiq::Queue.new(self.queue)
scheduled = Sidekiq::ScheduledSet.new
[enqueued, scheduled].sum do |set|
set.count do |job|
job.klass == 'BackgroundMigrationWorker'
end
end
end
def self.exists?(migration_class, additional_queues = [])
enqueued = Sidekiq::Queue.new(self.queue)
scheduled = Sidekiq::ScheduledSet.new
enqueued_job?([enqueued, scheduled], migration_class)
end
def self.dead_jobs?(migration_class)
dead_set = Sidekiq::DeadSet.new
enqueued_job?([dead_set], migration_class)
end
def self.retrying_jobs?(migration_class)
retry_set = Sidekiq::RetrySet.new
enqueued_job?([retry_set], migration_class)
end
def self.migration_class_for(class_name)
# We don't pass class name with Gitlab::BackgroundMigration:: prefix anymore
# but some jobs could be already spawned so we need to have some backward compatibility period.
# Can be removed since 13.x
full_class_name_prefix_regexp = /\A(::)?Gitlab::BackgroundMigration::/
if class_name.match(full_class_name_prefix_regexp)
Gitlab::ErrorTracking.track_and_raise_for_dev_exception(
StandardError.new("Full class name is used"),
class_name: class_name
)
class_name = class_name.sub(full_class_name_prefix_regexp, '')
end
const_get(class_name, false)
# database - tracking database this migration executes against
def self.perform(class_name, arguments, database: :main)
coordinator_for_database(database).perform(class_name, arguments)
end
def self.enqueued_job?(queues, migration_class)
queues.any? do |queue|
queue.any? do |job|
job.klass == 'BackgroundMigrationWorker' && job.args.first == migration_class
end
end
def self.exists?(migration_class, additional_queues = [], database: :main)
coordinator_for_database(database).exists?(migration_class, additional_queues) # rubocop:disable CodeReuse/ActiveRecord
end
end
end
# frozen_string_literal: true
module Gitlab
module BackgroundMigration
# Class responsible for executing background migrations based on the given database.
#
# Chooses the correct worker class when selecting jobs from the queue based on the
# convention of how the queues and worker classes are setup for each database.
#
# Also provides a database connection to the correct tracking database.
class JobCoordinator
VALID_DATABASES = %i[main].freeze
WORKER_CLASS_NAME = 'BackgroundMigrationWorker'
def self.for_database(database)
database = database.to_sym
unless VALID_DATABASES.include?(database)
raise ArgumentError, "database must be one of [#{VALID_DATABASES.join(', ')}], got '#{database}'"
end
namespace = database.to_s.capitalize unless database == :main
namespaced_worker_class = [namespace, WORKER_CLASS_NAME].compact.join('::')
new(database, "::#{namespaced_worker_class}".constantize)
end
attr_reader :database, :worker_class
def queue
@queue ||= worker_class.sidekiq_options['queue']
end
def with_shared_connection(&block)
Gitlab::Database::SharedModel.using_connection(connection, &block)
end
def steal(steal_class, retry_dead_jobs: false)
queues = [
Sidekiq::ScheduledSet.new,
Sidekiq::Queue.new(self.queue)
]
if retry_dead_jobs
queues << Sidekiq::RetrySet.new
queues << Sidekiq::DeadSet.new
end
queues.each do |queue|
queue.each do |job|
migration_class, migration_args = job.args
next unless job.klass == worker_class.name
next unless migration_class == steal_class
next if block_given? && !(yield job)
begin
perform(migration_class, migration_args) if job.delete
rescue Exception # rubocop:disable Lint/RescueException
worker_class # enqueue this migration again
.perform_async(migration_class, migration_args)
raise
end
end
end
end
def perform(class_name, arguments)
migration_class_for(class_name).new.perform(*arguments)
end
def remaining
enqueued = Sidekiq::Queue.new(self.queue)
scheduled = Sidekiq::ScheduledSet.new
[enqueued, scheduled].sum do |set|
set.count do |job|
job.klass == worker_class.name
end
end
end
def exists?(migration_class, additional_queues = [])
enqueued = Sidekiq::Queue.new(self.queue)
scheduled = Sidekiq::ScheduledSet.new
enqueued_job?([enqueued, scheduled], migration_class)
end
def dead_jobs?(migration_class)
dead_set = Sidekiq::DeadSet.new
enqueued_job?([dead_set], migration_class)
end
def retrying_jobs?(migration_class)
retry_set = Sidekiq::RetrySet.new
enqueued_job?([retry_set], migration_class)
end
def migration_class_for(class_name)
Gitlab::BackgroundMigration.const_get(class_name, false)
end
def enqueued_job?(queues, migration_class)
queues.any? do |queue|
queue.any? do |job|
job.klass == worker_class.name && job.args.first == migration_class
end
end
end
private
def initialize(database, worker_class)
@database = database
@worker_class = worker_class
end
def connection
@connection ||= Gitlab::Database.databases.fetch(database, Gitlab::Database.main).scope.connection
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::BackgroundMigration::JobCoordinator do
let(:database) { :main }
let(:worker_class) { BackgroundMigrationWorker }
let(:coordinator) { described_class.new(database, worker_class) }
describe '.for_database' do
it 'returns an executor with the correct worker class and database' do
coordinator = described_class.for_database(database)
expect(coordinator.database).to eq(database)
expect(coordinator.worker_class).to eq(worker_class)
end
context 'when passed in as a string' do
it 'retruns an executor with the correct worker class and database' do
coordinator = described_class.for_database(database.to_s)
expect(coordinator.database).to eq(database)
expect(coordinator.worker_class).to eq(worker_class)
end
end
context 'when an invalid value is given' do
it 'raises an error' do
expect do
described_class.for_database('notvalid')
end.to raise_error(ArgumentError, "database must be one of [main], got 'notvalid'")
end
end
end
describe '#queue' do
it 'returns background migration worker queue' do
expect(coordinator.queue).to eq(worker_class.sidekiq_options['queue'])
end
end
describe '#with_shared_connection' do
it 'yields to the block after properly configuring SharedModel' do
expect(Gitlab::Database::SharedModel).to receive(:using_connection)
.with(Gitlab::Database.main.scope.connection).and_yield
expect { |b| coordinator.with_shared_connection(&b) }.to yield_with_no_args
end
end
describe '#steal' do
context 'when there are enqueued jobs present' do
let(:queue) do
[
double(args: ['Foo', [10, 20]], klass: worker_class.name),
double(args: ['Bar', [20, 30]], klass: worker_class.name),
double(args: ['Foo', [20, 30]], klass: 'MergeWorker')
]
end
before do
allow(Sidekiq::Queue).to receive(:new)
.with(coordinator.queue)
.and_return(queue)
end
context 'when queue contains unprocessed jobs' do
it 'steals jobs from a queue' do
expect(queue[0]).to receive(:delete).and_return(true)
expect(coordinator).to receive(:perform).with('Foo', [10, 20])
coordinator.steal('Foo')
end
it 'does not steal job that has already been taken' do
expect(queue[0]).to receive(:delete).and_return(false)
expect(coordinator).not_to receive(:perform)
coordinator.steal('Foo')
end
it 'does not steal jobs for a different migration' do
expect(coordinator).not_to receive(:perform)
expect(queue[0]).not_to receive(:delete)
coordinator.steal('Baz')
end
context 'when a custom predicate is given' do
it 'steals jobs that match the predicate' do
expect(queue[0]).to receive(:delete).and_return(true)
expect(coordinator).to receive(:perform).with('Foo', [10, 20])
coordinator.steal('Foo') { |job| job.args.second.first == 10 && job.args.second.second == 20 }
end
it 'does not steal jobs that do not match the predicate' do
expect(described_class).not_to receive(:perform)
expect(queue[0]).not_to receive(:delete)
coordinator.steal('Foo') { |(arg1, _)| arg1 == 5 }
end
end
end
context 'when one of the jobs raises an error' do
let(:migration) { spy(:migration) }
let(:queue) do
[double(args: ['Foo', [10, 20]], klass: worker_class.name),
double(args: ['Foo', [20, 30]], klass: worker_class.name)]
end
before do
stub_const('Gitlab::BackgroundMigration::Foo', migration)
allow(queue[0]).to receive(:delete).and_return(true)
allow(queue[1]).to receive(:delete).and_return(true)
end
it 'enqueues the migration again and re-raises the error' do
allow(migration).to receive(:perform).with(10, 20).and_raise(Exception, 'Migration error').once
expect(worker_class).to receive(:perform_async).with('Foo', [10, 20]).once
expect { coordinator.steal('Foo') }.to raise_error(Exception)
end
end
end
context 'when there are scheduled jobs present', :redis do
it 'steals all jobs from the scheduled sets' do
Sidekiq::Testing.disable! do
worker_class.perform_in(10.minutes, 'Object')
expect(Sidekiq::ScheduledSet.new).to be_one
expect(coordinator).to receive(:perform).with('Object', any_args)
coordinator.steal('Object')
expect(Sidekiq::ScheduledSet.new).to be_none
end
end
end
context 'when there are enqueued and scheduled jobs present', :redis do
it 'steals from the scheduled sets queue first' do
Sidekiq::Testing.disable! do
expect(coordinator).to receive(:perform).with('Object', [1]).ordered
expect(coordinator).to receive(:perform).with('Object', [2]).ordered
worker_class.perform_async('Object', [2])
worker_class.perform_in(10.minutes, 'Object', [1])
coordinator.steal('Object')
end
end
end
context 'when retry_dead_jobs is true', :redis do
let(:retry_queue) do
[double(args: ['Object', [3]], klass: worker_class.name, delete: true)]
end
let(:dead_queue) do
[double(args: ['Object', [4]], klass: worker_class.name, delete: true)]
end
before do
allow(Sidekiq::RetrySet).to receive(:new).and_return(retry_queue)
allow(Sidekiq::DeadSet).to receive(:new).and_return(dead_queue)
end
it 'steals from the dead and retry queue' do
Sidekiq::Testing.disable! do
expect(coordinator).to receive(:perform).with('Object', [1]).ordered
expect(coordinator).to receive(:perform).with('Object', [2]).ordered
expect(coordinator).to receive(:perform).with('Object', [3]).ordered
expect(coordinator).to receive(:perform).with('Object', [4]).ordered
worker_class.perform_async('Object', [2])
worker_class.perform_in(10.minutes, 'Object', [1])
coordinator.steal('Object', retry_dead_jobs: true)
end
end
end
end
describe '#perform' do
let(:migration) { spy(:migration) }
before do
stub_const('Gitlab::BackgroundMigration::Foo', migration)
end
it 'performs a background migration' do
expect(migration).to receive(:perform).with(10, 20).once
coordinator.perform('Foo', [10, 20])
end
end
describe '.remaining', :redis do
context 'when there are jobs remaining' do
before do
Sidekiq::Testing.disable! do
MergeWorker.perform_async('Foo')
MergeWorker.perform_in(10.minutes, 'Foo')
5.times do
worker_class.perform_async('Foo')
end
3.times do
worker_class.perform_in(10.minutes, 'Foo')
end
end
end
it 'returns the enqueued jobs plus the scheduled jobs' do
expect(coordinator.remaining).to eq(8)
end
end
context 'when there are no jobs remaining' do
it 'returns zero' do
expect(coordinator.remaining).to be_zero
end
end
end
describe '.exists?', :redis do
context 'when there are enqueued jobs present' do
before do
Sidekiq::Testing.disable! do
MergeWorker.perform_async('Bar')
worker_class.perform_async('Foo')
end
end
it 'returns true if specific job exists' do
expect(coordinator.exists?('Foo')).to eq(true)
end
it 'returns false if specific job does not exist' do
expect(coordinator.exists?('Bar')).to eq(false)
end
end
context 'when there are scheduled jobs present' do
before do
Sidekiq::Testing.disable! do
MergeWorker.perform_in(10.minutes, 'Bar')
worker_class.perform_in(10.minutes, 'Foo')
end
end
it 'returns true if specific job exists' do
expect(coordinator.exists?('Foo')).to eq(true)
end
it 'returns false if specific job does not exist' do
expect(coordinator.exists?('Bar')).to eq(false)
end
end
end
describe '.dead_jobs?' do
let(:queue) do
[
double(args: ['Foo', [10, 20]], klass: worker_class.name),
double(args: ['Bar'], klass: 'MergeWorker')
]
end
context 'when there are dead jobs present' do
before do
allow(Sidekiq::DeadSet).to receive(:new).and_return(queue)
end
it 'returns true if specific job exists' do
expect(coordinator.dead_jobs?('Foo')).to eq(true)
end
it 'returns false if specific job does not exist' do
expect(coordinator.dead_jobs?('Bar')).to eq(false)
end
end
end
describe '.retrying_jobs?' do
let(:queue) do
[
double(args: ['Foo', [10, 20]], klass: worker_class.name),
double(args: ['Bar'], klass: 'MergeWorker')
]
end
context 'when there are dead jobs present' do
before do
allow(Sidekiq::RetrySet).to receive(:new).and_return(queue)
end
it 'returns true if specific job exists' do
expect(coordinator.retrying_jobs?('Foo')).to eq(true)
end
it 'returns false if specific job does not exist' do
expect(coordinator.retrying_jobs?('Bar')).to eq(false)
end
end
end
end
......@@ -583,12 +583,33 @@ RSpec.describe Gitlab::Database::Migrations::BackgroundMigrationHelpers do
end
describe '#finalized_background_migration' do
include_context 'background migration job class'
let(:job_coordinator) { Gitlab::BackgroundMigration::JobCoordinator.new(:main, BackgroundMigrationWorker) }
let!(:job_class_name) { 'TestJob' }
let!(:job_class) { Class.new }
let!(:job_perform_method) do
->(*arguments) do
Gitlab::Database::BackgroundMigrationJob.mark_all_as_succeeded(
# Value is 'TestJob' defined by :job_class_name in the let! above.
# Scoping prohibits us from directly referencing job_class_name.
RSpec.current_example.example_group_instance.job_class_name,
arguments
)
end
end
let!(:tracked_pending_job) { create(:background_migration_job, class_name: job_class_name, status: :pending, arguments: [1]) }
let!(:tracked_successful_job) { create(:background_migration_job, class_name: job_class_name, status: :succeeded, arguments: [2]) }
before do
job_class.define_method(:perform, job_perform_method)
allow(Gitlab::BackgroundMigration).to receive(:coordinator_for_database)
.with(:main).and_return(job_coordinator)
expect(job_coordinator).to receive(:migration_class_for)
.with(job_class_name).at_least(:once) { job_class }
Sidekiq::Testing.disable! do
BackgroundMigrationWorker.perform_async(job_class_name, [1, 2])
BackgroundMigrationWorker.perform_async(job_class_name, [3, 4])
......
# frozen_string_literal: true
RSpec.shared_context 'background migration job class' do
let!(:job_class_name) { 'TestJob' }
let!(:job_class) { Class.new }
let!(:job_perform_method) do
->(*arguments) do
Gitlab::Database::BackgroundMigrationJob.mark_all_as_succeeded(
# Value is 'TestJob' defined by :job_class_name in the let! above.
# Scoping prohibits us from directly referencing job_class_name.
RSpec.current_example.example_group_instance.job_class_name,
arguments
)
end
end
before do
job_class.define_method(:perform, job_perform_method)
expect(Gitlab::BackgroundMigration).to receive(:migration_class_for).with(job_class_name).at_least(:once) { job_class }
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment