Commit d68e7f4c authored by pbair's avatar pbair

Use migration database to enqueue bg migrations

Update the background migration helpers to enqueue background jobs
for the target tracking database. For now we default to using main for
all migrations, since we don't have additional databases yet.

This changes not only enqueuing operations, but other related functions
like stealing scheduled jobs from the queue.
parent fbfb7416
...@@ -36,6 +36,8 @@ module Gitlab ...@@ -36,6 +36,8 @@ module Gitlab
attr_reader :worker_class attr_reader :worker_class
delegate :minimum_interval, :perform_in, to: :worker_class
def queue def queue
@queue ||= worker_class.sidekiq_options['queue'] @queue ||= worker_class.sidekiq_options['queue']
end end
......
# frozen_string_literal: true # frozen_string_literal: true
module Gitlab module Gitlab
module Database module Database
module Migrations module Migrations
...@@ -45,11 +44,11 @@ module Gitlab ...@@ -45,11 +44,11 @@ module Gitlab
raise "#{model_class} does not have an ID column of #{primary_column_name} to use for batch ranges" unless model_class.column_names.include?(primary_column_name.to_s) raise "#{model_class} does not have an ID column of #{primary_column_name} to use for batch ranges" unless model_class.column_names.include?(primary_column_name.to_s)
raise "#{primary_column_name} is not an integer column" unless model_class.columns_hash[primary_column_name.to_s].type == :integer raise "#{primary_column_name} is not an integer column" unless model_class.columns_hash[primary_column_name.to_s].type == :integer
job_coordinator = coordinator_for_tracking_database
# To not overload the worker too much we enforce a minimum interval both # To not overload the worker too much we enforce a minimum interval both
# when scheduling and performing jobs. # when scheduling and performing jobs.
if delay_interval < BackgroundMigrationWorker.minimum_interval delay_interval = [delay_interval, job_coordinator.minimum_interval].max
delay_interval = BackgroundMigrationWorker.minimum_interval
end
final_delay = 0 final_delay = 0
batch_counter = 0 batch_counter = 0
...@@ -60,14 +59,14 @@ module Gitlab ...@@ -60,14 +59,14 @@ module Gitlab
start_id, end_id = relation.pluck(min, max).first start_id, end_id = relation.pluck(min, max).first
# `BackgroundMigrationWorker.bulk_perform_in` schedules all jobs for # `SingleDatabaseWorker.bulk_perform_in` schedules all jobs for
# the same time, which is not helpful in most cases where we wish to # the same time, which is not helpful in most cases where we wish to
# spread the work over time. # spread the work over time.
final_delay = initial_delay + delay_interval * index final_delay = initial_delay + delay_interval * index
full_job_arguments = [start_id, end_id] + other_job_arguments full_job_arguments = [start_id, end_id] + other_job_arguments
track_in_database(job_class_name, full_job_arguments) if track_jobs track_in_database(job_class_name, full_job_arguments) if track_jobs
migrate_in(final_delay, job_class_name, full_job_arguments) migrate_in(final_delay, job_class_name, full_job_arguments, coordinator: job_coordinator)
batch_counter += 1 batch_counter += 1
end end
...@@ -91,9 +90,11 @@ module Gitlab ...@@ -91,9 +90,11 @@ module Gitlab
# delay_interval - The duration between each job's scheduled time # delay_interval - The duration between each job's scheduled time
# batch_size - The maximum number of jobs to fetch to memory from the database. # batch_size - The maximum number of jobs to fetch to memory from the database.
def requeue_background_migration_jobs_by_range_at_intervals(job_class_name, delay_interval, batch_size: BATCH_SIZE, initial_delay: 0) def requeue_background_migration_jobs_by_range_at_intervals(job_class_name, delay_interval, batch_size: BATCH_SIZE, initial_delay: 0)
job_coordinator = coordinator_for_tracking_database
# To not overload the worker too much we enforce a minimum interval both # To not overload the worker too much we enforce a minimum interval both
# when scheduling and performing jobs. # when scheduling and performing jobs.
delay_interval = [delay_interval, BackgroundMigrationWorker.minimum_interval].max delay_interval = [delay_interval, job_coordinator.minimum_interval].max
final_delay = 0 final_delay = 0
job_counter = 0 job_counter = 0
...@@ -103,7 +104,7 @@ module Gitlab ...@@ -103,7 +104,7 @@ module Gitlab
job_batch.each do |job| job_batch.each do |job|
final_delay = initial_delay + delay_interval * job_counter final_delay = initial_delay + delay_interval * job_counter
migrate_in(final_delay, job_class_name, job.arguments) migrate_in(final_delay, job_class_name, job.arguments, coordinator: job_coordinator)
job_counter += 1 job_counter += 1
end end
...@@ -132,17 +133,20 @@ module Gitlab ...@@ -132,17 +133,20 @@ module Gitlab
# This method does not garauntee that all jobs completed successfully. # This method does not garauntee that all jobs completed successfully.
# It can only be used if the previous background migration used the queue_background_migration_jobs_by_range_at_intervals helper. # It can only be used if the previous background migration used the queue_background_migration_jobs_by_range_at_intervals helper.
def finalize_background_migration(class_name, delete_tracking_jobs: ['succeeded']) def finalize_background_migration(class_name, delete_tracking_jobs: ['succeeded'])
job_coordinator = coordinator_for_tracking_database
# Empty the sidekiq queue. # Empty the sidekiq queue.
Gitlab::BackgroundMigration.steal(class_name) job_coordinator.steal(class_name)
# Process pending tracked jobs. # Process pending tracked jobs.
jobs = Gitlab::Database::BackgroundMigrationJob.pending.for_migration_class(class_name) jobs = Gitlab::Database::BackgroundMigrationJob.pending.for_migration_class(class_name)
jobs.find_each do |job| jobs.find_each do |job|
BackgroundMigrationWorker.new.perform(job.class_name, job.arguments) job_coordinator.perform(job.class_name, job.arguments)
end end
# Empty the sidekiq queue. # Empty the sidekiq queue.
Gitlab::BackgroundMigration.steal(class_name) job_coordinator.steal(class_name)
# Delete job tracking rows. # Delete job tracking rows.
delete_job_tracking(class_name, status: delete_tracking_jobs) if delete_tracking_jobs delete_job_tracking(class_name, status: delete_tracking_jobs) if delete_tracking_jobs
...@@ -152,36 +156,14 @@ module Gitlab ...@@ -152,36 +156,14 @@ module Gitlab
Rails.env.test? || Rails.env.development? Rails.env.test? || Rails.env.development?
end end
def migrate_async(*args) def migrate_in(*args, coordinator: coordinator_for_tracking_database)
with_migration_context do
BackgroundMigrationWorker.perform_async(*args)
end
end
def migrate_in(*args)
with_migration_context do
BackgroundMigrationWorker.perform_in(*args)
end
end
def bulk_migrate_in(*args)
with_migration_context do
BackgroundMigrationWorker.bulk_perform_in(*args)
end
end
def bulk_migrate_async(*args)
with_migration_context do with_migration_context do
BackgroundMigrationWorker.bulk_perform_async(*args) coordinator.perform_in(*args)
end end
end end
def with_migration_context(&block)
Gitlab::ApplicationContext.with_context(caller_id: self.class.to_s, &block)
end
def delete_queued_jobs(class_name) def delete_queued_jobs(class_name)
Gitlab::BackgroundMigration.steal(class_name) do |job| coordinator_for_tracking_database.steal(class_name) do |job|
job.delete job.delete
false false
...@@ -196,9 +178,21 @@ module Gitlab ...@@ -196,9 +178,21 @@ module Gitlab
private private
def with_migration_context(&block)
Gitlab::ApplicationContext.with_context(caller_id: self.class.to_s, &block)
end
def track_in_database(class_name, arguments) def track_in_database(class_name, arguments)
Gitlab::Database::BackgroundMigrationJob.create!(class_name: class_name, arguments: arguments) Gitlab::Database::BackgroundMigrationJob.create!(class_name: class_name, arguments: arguments)
end end
def coordinator_for_tracking_database
Gitlab::BackgroundMigration.coordinator_for_database(tracking_database)
end
def tracking_database
Gitlab::BackgroundMigration::DEFAULT_TRACKING_DATABASE
end
end end
end end
end end
......
...@@ -22,19 +22,19 @@ RSpec.shared_examples 'marks background migration job records' do ...@@ -22,19 +22,19 @@ RSpec.shared_examples 'marks background migration job records' do
end end
end end
RSpec.shared_examples 'finalized background migration' do RSpec.shared_examples 'finalized background migration' do |worker_class|
it 'processed the scheduled sidekiq queue' do it 'processed the scheduled sidekiq queue' do
queued = Sidekiq::ScheduledSet queued = Sidekiq::ScheduledSet
.new .new
.select do |scheduled| .select do |scheduled|
scheduled.klass == 'BackgroundMigrationWorker' && scheduled.klass == worker_class.name &&
scheduled.args.first == job_class_name scheduled.args.first == job_class_name
end end
expect(queued.size).to eq(0) expect(queued.size).to eq(0)
end end
it 'processed the async sidekiq queue' do it 'processed the async sidekiq queue' do
queued = Sidekiq::Queue.new('BackgroundMigrationWorker') queued = Sidekiq::Queue.new(worker_class.name)
.select { |scheduled| scheduled.klass == job_class_name } .select { |scheduled| scheduled.klass == job_class_name }
expect(queued.size).to eq(0) expect(queued.size).to eq(0)
end end
...@@ -42,8 +42,8 @@ RSpec.shared_examples 'finalized background migration' do ...@@ -42,8 +42,8 @@ RSpec.shared_examples 'finalized background migration' do
include_examples 'removed tracked jobs', 'pending' include_examples 'removed tracked jobs', 'pending'
end end
RSpec.shared_examples 'finalized tracked background migration' do RSpec.shared_examples 'finalized tracked background migration' do |worker_class|
include_examples 'finalized background migration' include_examples 'finalized background migration', worker_class
include_examples 'removed tracked jobs', 'succeeded' include_examples 'removed tracked jobs', 'succeeded'
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment