Commit 0ce1f41a authored by Bob Van Landuyt's avatar Bob Van Landuyt Committed by Adam Hegyi

Wrap BackgroundMigrationWorker#perform in context

This will add the `caller_id` to the context for every job that might
be scheduled from a background migration. So if a background migration
reschedules itself or another migration, this will now be visible in
the metadata.
parent e8f51a55
...@@ -22,17 +22,19 @@ class BackgroundMigrationWorker ...@@ -22,17 +22,19 @@ class BackgroundMigrationWorker
# class_name - The class name of the background migration to run. # class_name - The class name of the background migration to run.
# arguments - The arguments to pass to the migration class. # arguments - The arguments to pass to the migration class.
def perform(class_name, arguments = []) def perform(class_name, arguments = [])
should_perform, ttl = perform_and_ttl(class_name) with_context(caller_id: class_name.to_s) do
should_perform, ttl = perform_and_ttl(class_name)
if should_perform if should_perform
Gitlab::BackgroundMigration.perform(class_name, arguments) Gitlab::BackgroundMigration.perform(class_name, arguments)
else else
# If the lease could not be obtained this means either another process is # If the lease could not be obtained this means either another process is
# running a migration of this class or we ran one recently. In this case # running a migration of this class or we ran one recently. In this case
# we'll reschedule the job in such a way that it is picked up again around # we'll reschedule the job in such a way that it is picked up again around
# the time the lease expires. # the time the lease expires.
self.class self.class
.perform_in(ttl || self.class.minimum_interval, class_name, arguments) .perform_in(ttl || self.class.minimum_interval, class_name, arguments)
end
end end
end end
......
...@@ -72,20 +72,21 @@ migration classes must be defined in the namespace ...@@ -72,20 +72,21 @@ migration classes must be defined in the namespace
## Scheduling ## Scheduling
Scheduling a background migration should be done in a post-deployment migration. Scheduling a background migration should be done in a post-deployment
migration that includes `Gitlab::Database::MigrationHelpers`
To do so, simply use the following code while To do so, simply use the following code while
replacing the class name and arguments with whatever values are necessary for replacing the class name and arguments with whatever values are necessary for
your migration: your migration:
```ruby ```ruby
BackgroundMigrationWorker.perform_async('BackgroundMigrationClassName', [arg1, arg2, ...]) migrate_async('BackgroundMigrationClassName', [arg1, arg2, ...])
``` ```
Usually it's better to enqueue jobs in bulk, for this you can use Usually it's better to enqueue jobs in bulk, for this you can use
`BackgroundMigrationWorker.bulk_perform_async`: `bulk_migrate_async`:
```ruby ```ruby
BackgroundMigrationWorker.bulk_perform_async( bulk_migrate_async(
[['BackgroundMigrationClassName', [1]], [['BackgroundMigrationClassName', [1]],
['BackgroundMigrationClassName', [2]]] ['BackgroundMigrationClassName', [2]]]
) )
...@@ -105,7 +106,7 @@ If you would like to schedule jobs in bulk with a delay, you can use ...@@ -105,7 +106,7 @@ If you would like to schedule jobs in bulk with a delay, you can use
jobs = [['BackgroundMigrationClassName', [1]], jobs = [['BackgroundMigrationClassName', [1]],
['BackgroundMigrationClassName', [2]]] ['BackgroundMigrationClassName', [2]]]
BackgroundMigrationWorker.bulk_perform_in(5.minutes, jobs) bulk_migrate_in(5.minutes, jobs)
``` ```
### Rescheduling background migrations ### Rescheduling background migrations
......
...@@ -688,7 +688,7 @@ module Gitlab ...@@ -688,7 +688,7 @@ module Gitlab
start_id, end_id = batch.pluck('MIN(id), MAX(id)').first start_id, end_id = batch.pluck('MIN(id), MAX(id)').first
max_index = index max_index = index
BackgroundMigrationWorker.perform_in( migrate_in(
index * interval, index * interval,
'CopyColumn', 'CopyColumn',
[table, column, temp_column, start_id, end_id] [table, column, temp_column, start_id, end_id]
...@@ -697,7 +697,7 @@ module Gitlab ...@@ -697,7 +697,7 @@ module Gitlab
# Schedule the renaming of the column to happen (initially) 1 hour after # Schedule the renaming of the column to happen (initially) 1 hour after
# the last batch finished. # the last batch finished.
BackgroundMigrationWorker.perform_in( migrate_in(
(max_index * interval) + 1.hour, (max_index * interval) + 1.hour,
'CleanupConcurrentTypeChange', 'CleanupConcurrentTypeChange',
[table, column, temp_column] [table, column, temp_column]
...@@ -779,7 +779,7 @@ module Gitlab ...@@ -779,7 +779,7 @@ module Gitlab
start_id, end_id = batch.pluck('MIN(id), MAX(id)').first start_id, end_id = batch.pluck('MIN(id), MAX(id)').first
max_index = index max_index = index
BackgroundMigrationWorker.perform_in( migrate_in(
index * interval, index * interval,
'CopyColumn', 'CopyColumn',
[table, old_column, new_column, start_id, end_id] [table, old_column, new_column, start_id, end_id]
...@@ -788,7 +788,7 @@ module Gitlab ...@@ -788,7 +788,7 @@ module Gitlab
# Schedule the renaming of the column to happen (initially) 1 hour after # Schedule the renaming of the column to happen (initially) 1 hour after
# the last batch finished. # the last batch finished.
BackgroundMigrationWorker.perform_in( migrate_in(
(max_index * interval) + 1.hour, (max_index * interval) + 1.hour,
'CleanupConcurrentRename', 'CleanupConcurrentRename',
[table, old_column, new_column] [table, old_column, new_column]
...@@ -1024,14 +1024,14 @@ into similar problems in the future (e.g. when new tables are created). ...@@ -1024,14 +1024,14 @@ into similar problems in the future (e.g. when new tables are created).
# We push multiple jobs at a time to reduce the time spent in # We push multiple jobs at a time to reduce the time spent in
# Sidekiq/Redis operations. We're using this buffer based approach so we # Sidekiq/Redis operations. We're using this buffer based approach so we
# don't need to run additional queries for every range. # don't need to run additional queries for every range.
BackgroundMigrationWorker.bulk_perform_async(jobs) bulk_migrate_async(jobs)
jobs.clear jobs.clear
end end
jobs << [job_class_name, [start_id, end_id]] jobs << [job_class_name, [start_id, end_id]]
end end
BackgroundMigrationWorker.bulk_perform_async(jobs) unless jobs.empty? bulk_migrate_async(jobs) unless jobs.empty?
end end
# Queues background migration jobs for an entire table, batched by ID range. # Queues background migration jobs for an entire table, batched by ID range.
...@@ -1074,7 +1074,7 @@ into similar problems in the future (e.g. when new tables are created). ...@@ -1074,7 +1074,7 @@ into similar problems in the future (e.g. when new tables are created).
# `BackgroundMigrationWorker.bulk_perform_in` schedules all jobs for # `BackgroundMigrationWorker.bulk_perform_in` schedules all jobs for
# the same time, which is not helpful in most cases where we wish to # the same time, which is not helpful in most cases where we wish to
# spread the work over time. # spread the work over time.
BackgroundMigrationWorker.perform_in(delay_interval * index, job_class_name, [start_id, end_id]) migrate_in(delay_interval * index, job_class_name, [start_id, end_id])
end end
end end
...@@ -1133,6 +1133,30 @@ into similar problems in the future (e.g. when new tables are created). ...@@ -1133,6 +1133,30 @@ into similar problems in the future (e.g. when new tables are created).
execute(sql) execute(sql)
end end
def migrate_async(*args)
with_migration_context do
BackgroundMigrationWorker.perform_async(*args)
end
end
def migrate_in(*args)
with_migration_context do
BackgroundMigrationWorker.perform_in(*args)
end
end
def bulk_migrate_in(*args)
with_migration_context do
BackgroundMigrationWorker.bulk_perform_in(*args)
end
end
def bulk_migrate_async(*args)
with_migration_context do
BackgroundMigrationWorker.bulk_perform_async(*args)
end
end
private private
def tables_match?(target_table, foreign_key_table) def tables_match?(target_table, foreign_key_table)
...@@ -1191,6 +1215,10 @@ into similar problems in the future (e.g. when new tables are created). ...@@ -1191,6 +1215,10 @@ into similar problems in the future (e.g. when new tables are created).
your migration class your migration class
ERROR ERROR
end end
def with_migration_context(&block)
Gitlab::ApplicationContext.with_context(caller_id: self.class.to_s, &block)
end
end end
end end
end end
# frozen_string_literal: true
require_relative '../../migration_helpers'
module RuboCop
module Cop
module Migration
class ScheduleAsync < RuboCop::Cop::Cop
include MigrationHelpers
ENFORCED_SINCE = 2020_02_12_00_00_00
MSG = <<~MSG
Don't call the background migration worker directly, use the `#migrate_async`,
`#migrate_in`, `#bulk_migrate_async` or `#bulk_migrate_in` migration helpers
instead.
MSG
def_node_matcher :calls_background_migration_worker?, <<~PATTERN
(send (const nil? :BackgroundMigrationWorker) {:perform_async :perform_in :bulk_perform_async :bulk_perform_in} ... )
PATTERN
def on_send(node)
return unless in_migration?(node)
return if version(node) < ENFORCED_SINCE
add_offense(node, location: :expression) if calls_background_migration_worker?(node)
end
def autocorrect(node)
# This gets rid of the receiver `BackgroundMigrationWorker` and
# replaces `perform` with `schedule`
schedule_method = method_name(node).to_s.sub('perform', 'migrate')
arguments = arguments(node).map(&:source).join(', ')
replacement = "#{schedule_method}(#{arguments})"
lambda do |corrector|
corrector.replace(node.source_range, replacement)
end
end
private
def method_name(node)
node.children.second
end
def arguments(node)
node.children[2..-1]
end
end
end
end
end
...@@ -10,6 +10,10 @@ module RuboCop ...@@ -10,6 +10,10 @@ module RuboCop
dirname(node).end_with?('db/post_migrate', 'db/geo/post_migrate') dirname(node).end_with?('db/post_migrate', 'db/geo/post_migrate')
end end
def version(node)
File.basename(node.location.expression.source_buffer.name).split('_').first.to_i
end
private private
def dirname(node) def dirname(node)
......
...@@ -1893,4 +1893,60 @@ describe Gitlab::Database::MigrationHelpers do ...@@ -1893,4 +1893,60 @@ describe Gitlab::Database::MigrationHelpers do
end end
end end
end end
describe '#migrate_async' do
it 'calls BackgroundMigrationWorker.perform_async' do
expect(BackgroundMigrationWorker).to receive(:perform_async).with("Class", "hello", "world")
model.migrate_async("Class", "hello", "world")
end
it 'pushes a context with the current class name as caller_id' do
expect(Gitlab::ApplicationContext).to receive(:with_context).with(caller_id: model.class.to_s)
model.migrate_async('Class', 'hello', 'world')
end
end
describe '#migrate_in' do
it 'calls BackgroundMigrationWorker.perform_in' do
expect(BackgroundMigrationWorker).to receive(:perform_in).with(10.minutes, 'Class', 'Hello', 'World')
model.migrate_in(10.minutes, 'Class', 'Hello', 'World')
end
it 'pushes a context with the current class name as caller_id' do
expect(Gitlab::ApplicationContext).to receive(:with_context).with(caller_id: model.class.to_s)
model.migrate_in(10.minutes, 'Class', 'Hello', 'World')
end
end
describe '#bulk_migrate_async' do
it 'calls BackgroundMigrationWorker.bulk_perform_async' do
expect(BackgroundMigrationWorker).to receive(:bulk_perform_async).with([%w(Class hello world)])
model.bulk_migrate_async([%w(Class hello world)])
end
it 'pushes a context with the current class name as caller_id' do
expect(Gitlab::ApplicationContext).to receive(:with_context).with(caller_id: model.class.to_s)
model.bulk_migrate_async([%w(Class hello world)])
end
end
describe '#bulk_migrate_in' do
it 'calls BackgroundMigrationWorker.bulk_perform_in_' do
expect(BackgroundMigrationWorker).to receive(:bulk_perform_in).with(10.minutes, [%w(Class hello world)])
model.bulk_migrate_in(10.minutes, [%w(Class hello world)])
end
it 'pushes a context with the current class name as caller_id' do
expect(Gitlab::ApplicationContext).to receive(:with_context).with(caller_id: model.class.to_s)
model.bulk_migrate_in(10.minutes, [%w(Class hello world)])
end
end
end end
# frozen_string_literal: true
require 'fast_spec_helper'
require 'rubocop'
require 'rubocop/rspec/support'
require_relative '../../../../rubocop/cop/migration/schedule_async'
describe RuboCop::Cop::Migration::ScheduleAsync do
include CopHelper
let(:cop) { described_class.new }
let(:source) do
<<~SOURCE
def up
BackgroundMigrationWorker.perform_async(ClazzName, "Bar", "Baz")
end
SOURCE
end
shared_examples 'a disabled cop' do
it 'does not register any offenses' do
inspect_source(source)
expect(cop.offenses).to be_empty
end
end
context 'outside of a migration' do
it_behaves_like 'a disabled cop'
end
context 'in a migration' do
before do
allow(cop).to receive(:in_migration?).and_return(true)
end
context 'in an old migration' do
before do
allow(cop).to receive(:version).and_return(described_class::ENFORCED_SINCE - 5)
end
it_behaves_like 'a disabled cop'
end
context 'that is recent' do
before do
allow(cop).to receive(:version).and_return(described_class::ENFORCED_SINCE + 5)
end
context 'BackgroundMigrationWorker.perform_async' do
it 'adds an offence when calling `BackgroundMigrationWorker.peform_async`' do
inspect_source(source)
expect(cop.offenses.size).to eq(1)
end
it 'autocorrects to the right version' do
correct_source = <<~CORRECT
def up
migrate_async(ClazzName, "Bar", "Baz")
end
CORRECT
expect(autocorrect_source(source)).to eq(correct_source)
end
end
context 'BackgroundMigrationWorker.perform_in' do
let(:source) do
<<~SOURCE
def up
BackgroundMigrationWorker
.perform_in(delay, ClazzName, "Bar", "Baz")
end
SOURCE
end
it 'adds an offence' do
inspect_source(source)
expect(cop.offenses.size).to eq(1)
end
it 'autocorrects to the right version' do
correct_source = <<~CORRECT
def up
migrate_in(delay, ClazzName, "Bar", "Baz")
end
CORRECT
expect(autocorrect_source(source)).to eq(correct_source)
end
end
context 'BackgroundMigrationWorker.bulk_perform_async' do
let(:source) do
<<~SOURCE
def up
BackgroundMigrationWorker
.bulk_perform_async(jobs)
end
SOURCE
end
it 'adds an offence' do
inspect_source(source)
expect(cop.offenses.size).to eq(1)
end
it 'autocorrects to the right version' do
correct_source = <<~CORRECT
def up
bulk_migrate_async(jobs)
end
CORRECT
expect(autocorrect_source(source)).to eq(correct_source)
end
end
context 'BackgroundMigrationWorker.bulk_perform_in' do
let(:source) do
<<~SOURCE
def up
BackgroundMigrationWorker
.bulk_perform_in(5.minutes, jobs)
end
SOURCE
end
it 'adds an offence' do
inspect_source(source)
expect(cop.offenses.size).to eq(1)
end
it 'autocorrects to the right version' do
correct_source = <<~CORRECT
def up
bulk_migrate_in(5.minutes, jobs)
end
CORRECT
expect(autocorrect_source(source)).to eq(correct_source)
end
end
end
end
end
# frozen_string_literal: true
require 'fast_spec_helper'
require 'rubocop'
require 'rspec-parameterized'
require_relative '../../rubocop/migration_helpers'
describe RuboCop::MigrationHelpers do
using RSpec::Parameterized::TableSyntax
subject(:fake_cop) { Class.new { include RuboCop::MigrationHelpers }.new }
let(:node) { double(:node) }
before do
allow(node).to receive_message_chain('location.expression.source_buffer.name')
.and_return(name)
end
describe '#in_migration?' do
where(:name, :expected) do
'/gitlab/db/migrate/20200210184420_create_operations_scopes_table.rb' | true
'/gitlab/db/post_migrate/20200210184420_create_operations_scopes_table.rb' | true
'/gitlab/db/geo/migrate/20200210184420_create_operations_scopes_table.rb' | true
'/gitlab/db/geo/post_migrate/20200210184420_create_operations_scopes_table.rb' | true
'/gitlab/db/elsewhere/20200210184420_create_operations_scopes_table.rb' | false
end
with_them do
it { expect(fake_cop.in_migration?(node)).to eq(expected) }
end
end
describe '#in_post_deployment_migration?' do
where(:name, :expected) do
'/gitlab/db/migrate/20200210184420_create_operations_scopes_table.rb' | false
'/gitlab/db/post_migrate/20200210184420_create_operations_scopes_table.rb' | true
'/gitlab/db/geo/migrate/20200210184420_create_operations_scopes_table.rb' | false
'/gitlab/db/geo/post_migrate/20200210184420_create_operations_scopes_table.rb' | true
'/gitlab/db/elsewhere/20200210184420_create_operations_scopes_table.rb' | false
end
with_them do
it { expect(fake_cop.in_post_deployment_migration?(node)).to eq(expected) }
end
end
describe "#version" do
let(:name) do
'/path/to/gitlab/db/migrate/20200210184420_create_operations_scopes_table.rb'
end
it { expect(fake_cop.version(node)).to eq(20200210184420) }
end
end
...@@ -11,7 +11,7 @@ describe BackgroundMigrationWorker, :clean_gitlab_redis_shared_state do ...@@ -11,7 +11,7 @@ describe BackgroundMigrationWorker, :clean_gitlab_redis_shared_state do
end end
end end
describe '.perform' do describe '#perform' do
it 'performs a background migration' do it 'performs a background migration' do
expect(Gitlab::BackgroundMigration) expect(Gitlab::BackgroundMigration)
.to receive(:perform) .to receive(:perform)
...@@ -52,6 +52,14 @@ describe BackgroundMigrationWorker, :clean_gitlab_redis_shared_state do ...@@ -52,6 +52,14 @@ describe BackgroundMigrationWorker, :clean_gitlab_redis_shared_state do
worker.perform('Foo', [10, 20]) worker.perform('Foo', [10, 20])
end end
it 'sets the class that will be executed as the caller_id' do
expect(Gitlab::BackgroundMigration).to receive(:perform) do
expect(Labkit::Context.current.to_h).to include('meta.caller_id' => 'Foo')
end
worker.perform('Foo', [10, 20])
end
end end
describe '#healthy_database?' do describe '#healthy_database?' do
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment