Commit f7d0d18a authored by Dylan Griffith's avatar Dylan Griffith

Merge branch '343055-put-bg-migrations-in-correct-queue' into 'master'

Select worker to schedule BG migrations

See merge request gitlab-org/gitlab!73306
parents adc4e642 d68e7f4c
...@@ -36,6 +36,8 @@ module Gitlab ...@@ -36,6 +36,8 @@ module Gitlab
attr_reader :worker_class attr_reader :worker_class
delegate :minimum_interval, :perform_in, to: :worker_class
def queue def queue
@queue ||= worker_class.sidekiq_options['queue'] @queue ||= worker_class.sidekiq_options['queue']
end end
......
# frozen_string_literal: true # frozen_string_literal: true
module Gitlab module Gitlab
module Database module Database
module Migrations module Migrations
...@@ -45,11 +44,11 @@ module Gitlab ...@@ -45,11 +44,11 @@ module Gitlab
raise "#{model_class} does not have an ID column of #{primary_column_name} to use for batch ranges" unless model_class.column_names.include?(primary_column_name.to_s) raise "#{model_class} does not have an ID column of #{primary_column_name} to use for batch ranges" unless model_class.column_names.include?(primary_column_name.to_s)
raise "#{primary_column_name} is not an integer column" unless model_class.columns_hash[primary_column_name.to_s].type == :integer raise "#{primary_column_name} is not an integer column" unless model_class.columns_hash[primary_column_name.to_s].type == :integer
job_coordinator = coordinator_for_tracking_database
# To not overload the worker too much we enforce a minimum interval both # To not overload the worker too much we enforce a minimum interval both
# when scheduling and performing jobs. # when scheduling and performing jobs.
if delay_interval < BackgroundMigrationWorker.minimum_interval delay_interval = [delay_interval, job_coordinator.minimum_interval].max
delay_interval = BackgroundMigrationWorker.minimum_interval
end
final_delay = 0 final_delay = 0
batch_counter = 0 batch_counter = 0
...@@ -60,14 +59,14 @@ module Gitlab ...@@ -60,14 +59,14 @@ module Gitlab
start_id, end_id = relation.pluck(min, max).first start_id, end_id = relation.pluck(min, max).first
# `BackgroundMigrationWorker.bulk_perform_in` schedules all jobs for # `SingleDatabaseWorker.bulk_perform_in` schedules all jobs for
# the same time, which is not helpful in most cases where we wish to # the same time, which is not helpful in most cases where we wish to
# spread the work over time. # spread the work over time.
final_delay = initial_delay + delay_interval * index final_delay = initial_delay + delay_interval * index
full_job_arguments = [start_id, end_id] + other_job_arguments full_job_arguments = [start_id, end_id] + other_job_arguments
track_in_database(job_class_name, full_job_arguments) if track_jobs track_in_database(job_class_name, full_job_arguments) if track_jobs
migrate_in(final_delay, job_class_name, full_job_arguments) migrate_in(final_delay, job_class_name, full_job_arguments, coordinator: job_coordinator)
batch_counter += 1 batch_counter += 1
end end
...@@ -91,9 +90,11 @@ module Gitlab ...@@ -91,9 +90,11 @@ module Gitlab
# delay_interval - The duration between each job's scheduled time # delay_interval - The duration between each job's scheduled time
# batch_size - The maximum number of jobs to fetch to memory from the database. # batch_size - The maximum number of jobs to fetch to memory from the database.
def requeue_background_migration_jobs_by_range_at_intervals(job_class_name, delay_interval, batch_size: BATCH_SIZE, initial_delay: 0) def requeue_background_migration_jobs_by_range_at_intervals(job_class_name, delay_interval, batch_size: BATCH_SIZE, initial_delay: 0)
job_coordinator = coordinator_for_tracking_database
# To not overload the worker too much we enforce a minimum interval both # To not overload the worker too much we enforce a minimum interval both
# when scheduling and performing jobs. # when scheduling and performing jobs.
delay_interval = [delay_interval, BackgroundMigrationWorker.minimum_interval].max delay_interval = [delay_interval, job_coordinator.minimum_interval].max
final_delay = 0 final_delay = 0
job_counter = 0 job_counter = 0
...@@ -103,7 +104,7 @@ module Gitlab ...@@ -103,7 +104,7 @@ module Gitlab
job_batch.each do |job| job_batch.each do |job|
final_delay = initial_delay + delay_interval * job_counter final_delay = initial_delay + delay_interval * job_counter
migrate_in(final_delay, job_class_name, job.arguments) migrate_in(final_delay, job_class_name, job.arguments, coordinator: job_coordinator)
job_counter += 1 job_counter += 1
end end
...@@ -132,17 +133,20 @@ module Gitlab ...@@ -132,17 +133,20 @@ module Gitlab
# This method does not garauntee that all jobs completed successfully. # This method does not garauntee that all jobs completed successfully.
# It can only be used if the previous background migration used the queue_background_migration_jobs_by_range_at_intervals helper. # It can only be used if the previous background migration used the queue_background_migration_jobs_by_range_at_intervals helper.
def finalize_background_migration(class_name, delete_tracking_jobs: ['succeeded']) def finalize_background_migration(class_name, delete_tracking_jobs: ['succeeded'])
job_coordinator = coordinator_for_tracking_database
# Empty the sidekiq queue. # Empty the sidekiq queue.
Gitlab::BackgroundMigration.steal(class_name) job_coordinator.steal(class_name)
# Process pending tracked jobs. # Process pending tracked jobs.
jobs = Gitlab::Database::BackgroundMigrationJob.pending.for_migration_class(class_name) jobs = Gitlab::Database::BackgroundMigrationJob.pending.for_migration_class(class_name)
jobs.find_each do |job| jobs.find_each do |job|
BackgroundMigrationWorker.new.perform(job.class_name, job.arguments) job_coordinator.perform(job.class_name, job.arguments)
end end
# Empty the sidekiq queue. # Empty the sidekiq queue.
Gitlab::BackgroundMigration.steal(class_name) job_coordinator.steal(class_name)
# Delete job tracking rows. # Delete job tracking rows.
delete_job_tracking(class_name, status: delete_tracking_jobs) if delete_tracking_jobs delete_job_tracking(class_name, status: delete_tracking_jobs) if delete_tracking_jobs
...@@ -152,36 +156,14 @@ module Gitlab ...@@ -152,36 +156,14 @@ module Gitlab
Rails.env.test? || Rails.env.development? Rails.env.test? || Rails.env.development?
end end
def migrate_async(*args) def migrate_in(*args, coordinator: coordinator_for_tracking_database)
with_migration_context do
BackgroundMigrationWorker.perform_async(*args)
end
end
def migrate_in(*args)
with_migration_context do
BackgroundMigrationWorker.perform_in(*args)
end
end
def bulk_migrate_in(*args)
with_migration_context do
BackgroundMigrationWorker.bulk_perform_in(*args)
end
end
def bulk_migrate_async(*args)
with_migration_context do with_migration_context do
BackgroundMigrationWorker.bulk_perform_async(*args) coordinator.perform_in(*args)
end end
end end
def with_migration_context(&block)
Gitlab::ApplicationContext.with_context(caller_id: self.class.to_s, &block)
end
def delete_queued_jobs(class_name) def delete_queued_jobs(class_name)
Gitlab::BackgroundMigration.steal(class_name) do |job| coordinator_for_tracking_database.steal(class_name) do |job|
job.delete job.delete
false false
...@@ -196,9 +178,21 @@ module Gitlab ...@@ -196,9 +178,21 @@ module Gitlab
private private
def with_migration_context(&block)
Gitlab::ApplicationContext.with_context(caller_id: self.class.to_s, &block)
end
def track_in_database(class_name, arguments) def track_in_database(class_name, arguments)
Gitlab::Database::BackgroundMigrationJob.create!(class_name: class_name, arguments: arguments) Gitlab::Database::BackgroundMigrationJob.create!(class_name: class_name, arguments: arguments)
end end
def coordinator_for_tracking_database
Gitlab::BackgroundMigration.coordinator_for_database(tracking_database)
end
def tracking_database
Gitlab::BackgroundMigration::DEFAULT_TRACKING_DATABASE
end
end end
end end
end end
......
...@@ -22,19 +22,19 @@ RSpec.shared_examples 'marks background migration job records' do ...@@ -22,19 +22,19 @@ RSpec.shared_examples 'marks background migration job records' do
end end
end end
RSpec.shared_examples 'finalized background migration' do RSpec.shared_examples 'finalized background migration' do |worker_class|
it 'processed the scheduled sidekiq queue' do it 'processed the scheduled sidekiq queue' do
queued = Sidekiq::ScheduledSet queued = Sidekiq::ScheduledSet
.new .new
.select do |scheduled| .select do |scheduled|
scheduled.klass == 'BackgroundMigrationWorker' && scheduled.klass == worker_class.name &&
scheduled.args.first == job_class_name scheduled.args.first == job_class_name
end end
expect(queued.size).to eq(0) expect(queued.size).to eq(0)
end end
it 'processed the async sidekiq queue' do it 'processed the async sidekiq queue' do
queued = Sidekiq::Queue.new('BackgroundMigrationWorker') queued = Sidekiq::Queue.new(worker_class.name)
.select { |scheduled| scheduled.klass == job_class_name } .select { |scheduled| scheduled.klass == job_class_name }
expect(queued.size).to eq(0) expect(queued.size).to eq(0)
end end
...@@ -42,8 +42,8 @@ RSpec.shared_examples 'finalized background migration' do ...@@ -42,8 +42,8 @@ RSpec.shared_examples 'finalized background migration' do
include_examples 'removed tracked jobs', 'pending' include_examples 'removed tracked jobs', 'pending'
end end
RSpec.shared_examples 'finalized tracked background migration' do RSpec.shared_examples 'finalized tracked background migration' do |worker_class|
include_examples 'finalized background migration' include_examples 'finalized background migration', worker_class
include_examples 'removed tracked jobs', 'succeeded' include_examples 'removed tracked jobs', 'succeeded'
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment