Commit 9016f1df authored by pbair's avatar pbair

Generic BG migration jobs use the correct context

Introduce a new base class for background migration jobs that is
initialized with the current execution context of the migration.

Also update generic migration jobs to use the provided execution context
rather than referencing the workers or base models directly.
parent 0d70ef75
# frozen_string_literal: true
module Gitlab
module BackgroundMigration
# Simple base class for background migration job classes which are executed through the sidekiq queue.
#
# Any job class that inherits from the base class will have connection to the tracking database set on
# initialization.
class BaseJob
def initialize(connection:)
@connection = connection
end
def perform(*arguments)
raise NotImplementedError, "subclasses of #{self.class.name} must implement #{__method__}"
end
private
attr_reader :connection
end
end
end
...@@ -81,7 +81,7 @@ module Gitlab ...@@ -81,7 +81,7 @@ module Gitlab
def perform(class_name, arguments) def perform(class_name, arguments)
with_shared_connection do with_shared_connection do
migration_class_for(class_name).new.perform(*arguments) migration_instance_for(class_name).perform(*arguments)
end end
end end
...@@ -115,6 +115,16 @@ module Gitlab ...@@ -115,6 +115,16 @@ module Gitlab
enqueued_job?([retry_set], migration_class) enqueued_job?([retry_set], migration_class)
end end
def migration_instance_for(class_name)
migration_class = migration_class_for(class_name)
if migration_class < Gitlab::BackgroundMigration::BaseJob
migration_class.new(connection: connection)
else
migration_class.new
end
end
def migration_class_for(class_name) def migration_class_for(class_name)
Gitlab::BackgroundMigration.const_get(class_name, false) Gitlab::BackgroundMigration.const_get(class_name, false)
end end
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
module Gitlab module Gitlab
module Database module Database
class BackgroundMigrationJob < ActiveRecord::Base # rubocop:disable Rails/ApplicationRecord class BackgroundMigrationJob < SharedModel
include EachBatch include EachBatch
include BulkInsertSafe include BulkInsertSafe
......
...@@ -4,7 +4,7 @@ module Gitlab ...@@ -4,7 +4,7 @@ module Gitlab
module Database module Database
module PartitioningMigrationHelpers module PartitioningMigrationHelpers
# Class that will generically copy data from a given table into its corresponding partitioned table # Class that will generically copy data from a given table into its corresponding partitioned table
class BackfillPartitionedTable class BackfillPartitionedTable < ::Gitlab::BackgroundMigration::BaseJob
include ::Gitlab::Database::DynamicModelHelpers include ::Gitlab::Database::DynamicModelHelpers
SUB_BATCH_SIZE = 2_500 SUB_BATCH_SIZE = 2_500
...@@ -21,7 +21,7 @@ module Gitlab ...@@ -21,7 +21,7 @@ module Gitlab
return return
end end
bulk_copy = BulkCopy.new(source_table, partitioned_table, source_column) bulk_copy = BulkCopy.new(source_table, partitioned_table, source_column, connection: connection)
parent_batch_relation = relation_scoped_to_range(source_table, source_column, start_id, stop_id) parent_batch_relation = relation_scoped_to_range(source_table, source_column, start_id, stop_id)
parent_batch_relation.each_batch(of: SUB_BATCH_SIZE) do |sub_batch| parent_batch_relation.each_batch(of: SUB_BATCH_SIZE) do |sub_batch|
...@@ -36,10 +36,6 @@ module Gitlab ...@@ -36,10 +36,6 @@ module Gitlab
private private
def connection
ActiveRecord::Base.connection
end
def transaction_open? def transaction_open?
connection.transaction_open? connection.transaction_open?
end end
...@@ -53,7 +49,8 @@ module Gitlab ...@@ -53,7 +49,8 @@ module Gitlab
end end
def relation_scoped_to_range(source_table, source_key_column, start_id, stop_id) def relation_scoped_to_range(source_table, source_key_column, start_id, stop_id)
define_batchable_model(source_table).where(source_key_column => start_id..stop_id) define_batchable_model(source_table)
.where(source_key_column => start_id..stop_id)
end end
def mark_jobs_as_succeeded(*arguments) def mark_jobs_as_succeeded(*arguments)
...@@ -64,12 +61,13 @@ module Gitlab ...@@ -64,12 +61,13 @@ module Gitlab
class BulkCopy class BulkCopy
DELIMITER = ', ' DELIMITER = ', '
attr_reader :source_table, :destination_table, :source_column attr_reader :source_table, :destination_table, :source_column, :connection
def initialize(source_table, destination_table, source_column) def initialize(source_table, destination_table, source_column, connection:)
@source_table = source_table @source_table = source_table
@destination_table = destination_table @destination_table = destination_table
@source_column = source_column @source_column = source_column
@connection = connection
end end
def copy_between(start_id, stop_id) def copy_between(start_id, stop_id)
...@@ -85,10 +83,6 @@ module Gitlab ...@@ -85,10 +83,6 @@ module Gitlab
private private
def connection
@connection ||= ActiveRecord::Base.connection
end
def column_listing def column_listing
@column_listing ||= connection.columns(source_table).map(&:name).join(DELIMITER) @column_listing ||= connection.columns(source_table).map(&:name).join(DELIMITER)
end end
......
...@@ -406,7 +406,8 @@ module Gitlab ...@@ -406,7 +406,8 @@ module Gitlab
end end
def copy_missed_records(source_table_name, partitioned_table_name, source_column) def copy_missed_records(source_table_name, partitioned_table_name, source_column)
backfill_table = BackfillPartitionedTable.new backfill_table = BackfillPartitionedTable.new(connection: connection)
relation = ::Gitlab::Database::BackgroundMigrationJob.pending relation = ::Gitlab::Database::BackgroundMigrationJob.pending
.for_partitioning_migration(MIGRATION_CLASS_NAME, source_table_name) .for_partitioning_migration(MIGRATION_CLASS_NAME, source_table_name)
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::BackgroundMigration::BaseJob, '#perform' do
let(:connection) { double(:connection) }
let(:test_job_class) { Class.new(described_class) }
let(:test_job) { test_job_class.new(connection: connection) }
describe '#perform' do
it 'raises an error if not overridden by a subclass' do
expect { test_job.perform }.to raise_error(NotImplementedError, /must implement perform/)
end
end
end
...@@ -202,23 +202,50 @@ RSpec.describe Gitlab::BackgroundMigration::JobCoordinator do ...@@ -202,23 +202,50 @@ RSpec.describe Gitlab::BackgroundMigration::JobCoordinator do
end end
describe '#perform' do describe '#perform' do
let(:migration) { spy(:migration) } let(:connection) { double(:connection) }
let(:connection) { double('connection') }
before do before do
stub_const('Gitlab::BackgroundMigration::Foo', migration)
allow(coordinator).to receive(:connection).and_return(connection) allow(coordinator).to receive(:connection).and_return(connection)
end end
it 'performs a background migration with the configured shared connection' do context 'when the background migration does not inherit from BaseJob' do
expect(coordinator).to receive(:with_shared_connection).and_call_original let(:migration_class) { Class.new }
before do
stub_const('Gitlab::BackgroundMigration::Foo', migration_class)
end
it 'performs a background migration with the configured shared connection' do
expect(coordinator).to receive(:with_shared_connection).and_call_original
expect_next_instance_of(migration_class) do |migration|
expect(migration).to receive(:perform).with(10, 20).once do
expect(Gitlab::Database::SharedModel.connection).to be(connection)
end
end
coordinator.perform('Foo', [10, 20])
end
end
context 'when the background migration inherits from BaseJob' do
let(:migration_class) { Class.new(::Gitlab::BackgroundMigration::BaseJob) }
let(:migration) { double(:migration) }
expect(migration).to receive(:perform).with(10, 20).once do before do
expect(Gitlab::Database::SharedModel.connection).to be(connection) stub_const('Gitlab::BackgroundMigration::Foo', migration_class)
end end
coordinator.perform('Foo', [10, 20]) it 'passes the correct connection when constructing the migration' do
expect(coordinator).to receive(:with_shared_connection).and_call_original
expect(migration_class).to receive(:new).with(connection: connection).and_return(migration)
expect(migration).to receive(:perform).with(10, 20).once do
expect(Gitlab::Database::SharedModel.connection).to be(connection)
end
coordinator.perform('Foo', [10, 20])
end
end end
end end
......
...@@ -5,6 +5,8 @@ require 'spec_helper' ...@@ -5,6 +5,8 @@ require 'spec_helper'
RSpec.describe Gitlab::Database::BackgroundMigrationJob do RSpec.describe Gitlab::Database::BackgroundMigrationJob do
it_behaves_like 'having unique enum values' it_behaves_like 'having unique enum values'
it { is_expected.to be_a Gitlab::Database::SharedModel }
describe '.for_migration_execution' do describe '.for_migration_execution' do
let!(:job1) { create(:background_migration_job) } let!(:job1) { create(:background_migration_job) }
let!(:job2) { create(:background_migration_job, arguments: ['hi', 2]) } let!(:job2) { create(:background_migration_job, arguments: ['hi', 2]) }
......
...@@ -3,14 +3,15 @@ ...@@ -3,14 +3,15 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartitionedTable, '#perform' do RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartitionedTable, '#perform' do
subject { described_class.new } subject(:backfill_job) { described_class.new(connection: connection) }
let(:connection) { ActiveRecord::Base.connection }
let(:source_table) { '_test_partitioning_backfills' } let(:source_table) { '_test_partitioning_backfills' }
let(:destination_table) { "#{source_table}_part" } let(:destination_table) { "#{source_table}_part" }
let(:unique_key) { 'id' } let(:unique_key) { 'id' }
before do before do
allow(subject).to receive(:transaction_open?).and_return(false) allow(backfill_job).to receive(:transaction_open?).and_return(false)
end end
context 'when the destination table exists' do context 'when the destination table exists' do
...@@ -50,10 +51,9 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartition ...@@ -50,10 +51,9 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartition
stub_const("#{described_class}::SUB_BATCH_SIZE", 2) stub_const("#{described_class}::SUB_BATCH_SIZE", 2)
stub_const("#{described_class}::PAUSE_SECONDS", pause_seconds) stub_const("#{described_class}::PAUSE_SECONDS", pause_seconds)
allow(subject).to receive(:sleep) allow(backfill_job).to receive(:sleep)
end end
let(:connection) { ActiveRecord::Base.connection }
let(:source_model) { Class.new(ActiveRecord::Base) } let(:source_model) { Class.new(ActiveRecord::Base) }
let(:destination_model) { Class.new(ActiveRecord::Base) } let(:destination_model) { Class.new(ActiveRecord::Base) }
let(:timestamp) { Time.utc(2020, 1, 2).round } let(:timestamp) { Time.utc(2020, 1, 2).round }
...@@ -66,7 +66,7 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartition ...@@ -66,7 +66,7 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartition
it 'copies data into the destination table idempotently' do it 'copies data into the destination table idempotently' do
expect(destination_model.count).to eq(0) expect(destination_model.count).to eq(0)
subject.perform(source1.id, source3.id, source_table, destination_table, unique_key) backfill_job.perform(source1.id, source3.id, source_table, destination_table, unique_key)
expect(destination_model.count).to eq(3) expect(destination_model.count).to eq(3)
...@@ -76,7 +76,7 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartition ...@@ -76,7 +76,7 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartition
expect(destination_record.attributes).to eq(source_record.attributes) expect(destination_record.attributes).to eq(source_record.attributes)
end end
subject.perform(source1.id, source3.id, source_table, destination_table, unique_key) backfill_job.perform(source1.id, source3.id, source_table, destination_table, unique_key)
expect(destination_model.count).to eq(3) expect(destination_model.count).to eq(3)
end end
...@@ -87,13 +87,13 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartition ...@@ -87,13 +87,13 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartition
expect(bulk_copy).to receive(:copy_between).with(source3.id, source3.id) expect(bulk_copy).to receive(:copy_between).with(source3.id, source3.id)
end end
subject.perform(source1.id, source3.id, source_table, destination_table, unique_key) backfill_job.perform(source1.id, source3.id, source_table, destination_table, unique_key)
end end
it 'pauses after copying each sub-batch' do it 'pauses after copying each sub-batch' do
expect(subject).to receive(:sleep).with(pause_seconds).twice expect(backfill_job).to receive(:sleep).with(pause_seconds).twice
subject.perform(source1.id, source3.id, source_table, destination_table, unique_key) backfill_job.perform(source1.id, source3.id, source_table, destination_table, unique_key)
end end
it 'marks each job record as succeeded after processing' do it 'marks each job record as succeeded after processing' do
...@@ -103,7 +103,7 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartition ...@@ -103,7 +103,7 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartition
expect(::Gitlab::Database::BackgroundMigrationJob).to receive(:mark_all_as_succeeded).and_call_original expect(::Gitlab::Database::BackgroundMigrationJob).to receive(:mark_all_as_succeeded).and_call_original
expect do expect do
subject.perform(source1.id, source3.id, source_table, destination_table, unique_key) backfill_job.perform(source1.id, source3.id, source_table, destination_table, unique_key)
end.to change { ::Gitlab::Database::BackgroundMigrationJob.succeeded.count }.from(0).to(1) end.to change { ::Gitlab::Database::BackgroundMigrationJob.succeeded.count }.from(0).to(1)
end end
...@@ -111,24 +111,24 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartition ...@@ -111,24 +111,24 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartition
create(:background_migration_job, class_name: "::#{described_class.name}", create(:background_migration_job, class_name: "::#{described_class.name}",
arguments: [source1.id, source3.id, source_table, destination_table, unique_key]) arguments: [source1.id, source3.id, source_table, destination_table, unique_key])
jobs_updated = subject.perform(source1.id, source3.id, source_table, destination_table, unique_key) jobs_updated = backfill_job.perform(source1.id, source3.id, source_table, destination_table, unique_key)
expect(jobs_updated).to eq(1) expect(jobs_updated).to eq(1)
end end
context 'when the job is run within an explicit transaction block' do context 'when the job is run within an explicit transaction block' do
let(:mock_connection) { double('connection') } subject(:backfill_job) { described_class.new(connection: mock_connection) }
before do let(:mock_connection) { double('connection') }
allow(subject).to receive(:connection).and_return(mock_connection)
allow(subject).to receive(:transaction_open?).and_return(true)
end
it 'raises an error before copying data' do it 'raises an error before copying data' do
expect(backfill_job).to receive(:transaction_open?).and_call_original
expect(mock_connection).to receive(:transaction_open?).and_return(true)
expect(mock_connection).not_to receive(:execute) expect(mock_connection).not_to receive(:execute)
expect do expect do
subject.perform(1, 100, source_table, destination_table, unique_key) backfill_job.perform(1, 100, source_table, destination_table, unique_key)
end.to raise_error(/Aborting job to backfill partitioned #{source_table}/) end.to raise_error(/Aborting job to backfill partitioned #{source_table}/)
expect(destination_model.count).to eq(0) expect(destination_model.count).to eq(0)
...@@ -137,24 +137,25 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartition ...@@ -137,24 +137,25 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartition
end end
context 'when the destination table does not exist' do context 'when the destination table does not exist' do
subject(:backfill_job) { described_class.new(connection: mock_connection) }
let(:mock_connection) { double('connection') } let(:mock_connection) { double('connection') }
let(:mock_logger) { double('logger') } let(:mock_logger) { double('logger') }
before do before do
allow(subject).to receive(:connection).and_return(mock_connection) allow(backfill_job).to receive(:logger).and_return(mock_logger)
allow(subject).to receive(:logger).and_return(mock_logger)
expect(mock_connection).to receive(:table_exists?).with(destination_table).and_return(false)
allow(mock_logger).to receive(:warn) allow(mock_logger).to receive(:warn)
end end
it 'exits without attempting to copy data' do it 'exits without attempting to copy data' do
expect(mock_connection).to receive(:table_exists?).with(destination_table).and_return(false)
expect(mock_connection).not_to receive(:execute) expect(mock_connection).not_to receive(:execute)
subject.perform(1, 100, source_table, destination_table, unique_key) subject.perform(1, 100, source_table, destination_table, unique_key)
end end
it 'logs a warning message that the job was skipped' do it 'logs a warning message that the job was skipped' do
expect(mock_connection).to receive(:table_exists?).with(destination_table).and_return(false)
expect(mock_logger).to receive(:warn).with(/#{destination_table} does not exist/) expect(mock_logger).to receive(:warn).with(/#{destination_table} does not exist/)
subject.perform(1, 100, source_table, destination_table, unique_key) subject.perform(1, 100, source_table, destination_table, unique_key)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment