Commit d51c931d authored by Douglas Barbosa Alexandre

Merge branch 'pb-provide-context-for-bg-migrations' into 'master'

Generic BG migration jobs use the correct context

See merge request gitlab-org/gitlab!77717
parents 59a43d0b 9016f1df
# frozen_string_literal: true

module Gitlab
  module BackgroundMigration
    # Base class for background migration jobs executed through the sidekiq
    # queue.
    #
    # Subclasses receive the tracking-database connection at construction
    # time and must override #perform with the actual migration work.
    class BaseJob
      # @param connection [Object] database connection the job should use
      def initialize(connection:)
        @connection = connection
      end

      # Entry point invoked by the job coordinator.
      #
      # @raise [NotImplementedError] always, unless overridden by a subclass
      def perform(*arguments)
        raise NotImplementedError,
          "subclasses of #{self.class.name} must implement #{__method__}"
      end

      private

      # Connection is intentionally private: only the job's own migration
      # logic should touch the database handle.
      attr_reader :connection
    end
  end
end
...@@ -81,7 +81,7 @@ module Gitlab ...@@ -81,7 +81,7 @@ module Gitlab
def perform(class_name, arguments) def perform(class_name, arguments)
with_shared_connection do with_shared_connection do
migration_class_for(class_name).new.perform(*arguments) migration_instance_for(class_name).perform(*arguments)
end end
end end
...@@ -115,6 +115,16 @@ module Gitlab ...@@ -115,6 +115,16 @@ module Gitlab
enqueued_job?([retry_set], migration_class) enqueued_job?([retry_set], migration_class)
end end
def migration_instance_for(class_name)
migration_class = migration_class_for(class_name)
if migration_class < Gitlab::BackgroundMigration::BaseJob
migration_class.new(connection: connection)
else
migration_class.new
end
end
def migration_class_for(class_name) def migration_class_for(class_name)
Gitlab::BackgroundMigration.const_get(class_name, false) Gitlab::BackgroundMigration.const_get(class_name, false)
end end
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
module Gitlab module Gitlab
module Database module Database
class BackgroundMigrationJob < ActiveRecord::Base # rubocop:disable Rails/ApplicationRecord class BackgroundMigrationJob < SharedModel
include EachBatch include EachBatch
include BulkInsertSafe include BulkInsertSafe
......
...@@ -4,7 +4,7 @@ module Gitlab ...@@ -4,7 +4,7 @@ module Gitlab
module Database module Database
module PartitioningMigrationHelpers module PartitioningMigrationHelpers
# Class that will generically copy data from a given table into its corresponding partitioned table # Class that will generically copy data from a given table into its corresponding partitioned table
class BackfillPartitionedTable class BackfillPartitionedTable < ::Gitlab::BackgroundMigration::BaseJob
include ::Gitlab::Database::DynamicModelHelpers include ::Gitlab::Database::DynamicModelHelpers
SUB_BATCH_SIZE = 2_500 SUB_BATCH_SIZE = 2_500
...@@ -21,7 +21,7 @@ module Gitlab ...@@ -21,7 +21,7 @@ module Gitlab
return return
end end
bulk_copy = BulkCopy.new(source_table, partitioned_table, source_column) bulk_copy = BulkCopy.new(source_table, partitioned_table, source_column, connection: connection)
parent_batch_relation = relation_scoped_to_range(source_table, source_column, start_id, stop_id) parent_batch_relation = relation_scoped_to_range(source_table, source_column, start_id, stop_id)
parent_batch_relation.each_batch(of: SUB_BATCH_SIZE) do |sub_batch| parent_batch_relation.each_batch(of: SUB_BATCH_SIZE) do |sub_batch|
...@@ -36,10 +36,6 @@ module Gitlab ...@@ -36,10 +36,6 @@ module Gitlab
private private
def connection
ActiveRecord::Base.connection
end
def transaction_open? def transaction_open?
connection.transaction_open? connection.transaction_open?
end end
...@@ -53,7 +49,8 @@ module Gitlab ...@@ -53,7 +49,8 @@ module Gitlab
end end
def relation_scoped_to_range(source_table, source_key_column, start_id, stop_id) def relation_scoped_to_range(source_table, source_key_column, start_id, stop_id)
define_batchable_model(source_table).where(source_key_column => start_id..stop_id) define_batchable_model(source_table)
.where(source_key_column => start_id..stop_id)
end end
def mark_jobs_as_succeeded(*arguments) def mark_jobs_as_succeeded(*arguments)
...@@ -64,12 +61,13 @@ module Gitlab ...@@ -64,12 +61,13 @@ module Gitlab
class BulkCopy class BulkCopy
DELIMITER = ', ' DELIMITER = ', '
attr_reader :source_table, :destination_table, :source_column attr_reader :source_table, :destination_table, :source_column, :connection
def initialize(source_table, destination_table, source_column) def initialize(source_table, destination_table, source_column, connection:)
@source_table = source_table @source_table = source_table
@destination_table = destination_table @destination_table = destination_table
@source_column = source_column @source_column = source_column
@connection = connection
end end
def copy_between(start_id, stop_id) def copy_between(start_id, stop_id)
...@@ -85,10 +83,6 @@ module Gitlab ...@@ -85,10 +83,6 @@ module Gitlab
private private
def connection
@connection ||= ActiveRecord::Base.connection
end
def column_listing def column_listing
@column_listing ||= connection.columns(source_table).map(&:name).join(DELIMITER) @column_listing ||= connection.columns(source_table).map(&:name).join(DELIMITER)
end end
......
...@@ -406,7 +406,8 @@ module Gitlab ...@@ -406,7 +406,8 @@ module Gitlab
end end
def copy_missed_records(source_table_name, partitioned_table_name, source_column) def copy_missed_records(source_table_name, partitioned_table_name, source_column)
backfill_table = BackfillPartitionedTable.new backfill_table = BackfillPartitionedTable.new(connection: connection)
relation = ::Gitlab::Database::BackgroundMigrationJob.pending relation = ::Gitlab::Database::BackgroundMigrationJob.pending
.for_partitioning_migration(MIGRATION_CLASS_NAME, source_table_name) .for_partitioning_migration(MIGRATION_CLASS_NAME, source_table_name)
......
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe Gitlab::BackgroundMigration::BaseJob, '#perform' do
  let(:connection) { double(:connection) }
  let(:job_class) { Class.new(described_class) }

  subject(:job) { job_class.new(connection: connection) }

  describe '#perform' do
    it 'raises NotImplementedError when a subclass does not override it' do
      expect { job.perform }.to raise_error(NotImplementedError, /must implement perform/)
    end
  end
end
...@@ -202,23 +202,50 @@ RSpec.describe Gitlab::BackgroundMigration::JobCoordinator do ...@@ -202,23 +202,50 @@ RSpec.describe Gitlab::BackgroundMigration::JobCoordinator do
end end
describe '#perform' do describe '#perform' do
let(:migration) { spy(:migration) } let(:connection) { double(:connection) }
let(:connection) { double('connection') }
before do before do
stub_const('Gitlab::BackgroundMigration::Foo', migration)
allow(coordinator).to receive(:connection).and_return(connection) allow(coordinator).to receive(:connection).and_return(connection)
end end
it 'performs a background migration with the configured shared connection' do context 'when the background migration does not inherit from BaseJob' do
expect(coordinator).to receive(:with_shared_connection).and_call_original let(:migration_class) { Class.new }
before do
stub_const('Gitlab::BackgroundMigration::Foo', migration_class)
end
it 'performs a background migration with the configured shared connection' do
expect(coordinator).to receive(:with_shared_connection).and_call_original
expect_next_instance_of(migration_class) do |migration|
expect(migration).to receive(:perform).with(10, 20).once do
expect(Gitlab::Database::SharedModel.connection).to be(connection)
end
end
coordinator.perform('Foo', [10, 20])
end
end
context 'when the background migration inherits from BaseJob' do
let(:migration_class) { Class.new(::Gitlab::BackgroundMigration::BaseJob) }
let(:migration) { double(:migration) }
expect(migration).to receive(:perform).with(10, 20).once do before do
expect(Gitlab::Database::SharedModel.connection).to be(connection) stub_const('Gitlab::BackgroundMigration::Foo', migration_class)
end end
coordinator.perform('Foo', [10, 20]) it 'passes the correct connection when constructing the migration' do
expect(coordinator).to receive(:with_shared_connection).and_call_original
expect(migration_class).to receive(:new).with(connection: connection).and_return(migration)
expect(migration).to receive(:perform).with(10, 20).once do
expect(Gitlab::Database::SharedModel.connection).to be(connection)
end
coordinator.perform('Foo', [10, 20])
end
end end
end end
......
...@@ -5,6 +5,8 @@ require 'spec_helper' ...@@ -5,6 +5,8 @@ require 'spec_helper'
RSpec.describe Gitlab::Database::BackgroundMigrationJob do RSpec.describe Gitlab::Database::BackgroundMigrationJob do
it_behaves_like 'having unique enum values' it_behaves_like 'having unique enum values'
it { is_expected.to be_a Gitlab::Database::SharedModel }
describe '.for_migration_execution' do describe '.for_migration_execution' do
let!(:job1) { create(:background_migration_job) } let!(:job1) { create(:background_migration_job) }
let!(:job2) { create(:background_migration_job, arguments: ['hi', 2]) } let!(:job2) { create(:background_migration_job, arguments: ['hi', 2]) }
......
...@@ -3,14 +3,15 @@ ...@@ -3,14 +3,15 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartitionedTable, '#perform' do RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartitionedTable, '#perform' do
subject { described_class.new } subject(:backfill_job) { described_class.new(connection: connection) }
let(:connection) { ActiveRecord::Base.connection }
let(:source_table) { '_test_partitioning_backfills' } let(:source_table) { '_test_partitioning_backfills' }
let(:destination_table) { "#{source_table}_part" } let(:destination_table) { "#{source_table}_part" }
let(:unique_key) { 'id' } let(:unique_key) { 'id' }
before do before do
allow(subject).to receive(:transaction_open?).and_return(false) allow(backfill_job).to receive(:transaction_open?).and_return(false)
end end
context 'when the destination table exists' do context 'when the destination table exists' do
...@@ -50,10 +51,9 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartition ...@@ -50,10 +51,9 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartition
stub_const("#{described_class}::SUB_BATCH_SIZE", 2) stub_const("#{described_class}::SUB_BATCH_SIZE", 2)
stub_const("#{described_class}::PAUSE_SECONDS", pause_seconds) stub_const("#{described_class}::PAUSE_SECONDS", pause_seconds)
allow(subject).to receive(:sleep) allow(backfill_job).to receive(:sleep)
end end
let(:connection) { ActiveRecord::Base.connection }
let(:source_model) { Class.new(ActiveRecord::Base) } let(:source_model) { Class.new(ActiveRecord::Base) }
let(:destination_model) { Class.new(ActiveRecord::Base) } let(:destination_model) { Class.new(ActiveRecord::Base) }
let(:timestamp) { Time.utc(2020, 1, 2).round } let(:timestamp) { Time.utc(2020, 1, 2).round }
...@@ -66,7 +66,7 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartition ...@@ -66,7 +66,7 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartition
it 'copies data into the destination table idempotently' do it 'copies data into the destination table idempotently' do
expect(destination_model.count).to eq(0) expect(destination_model.count).to eq(0)
subject.perform(source1.id, source3.id, source_table, destination_table, unique_key) backfill_job.perform(source1.id, source3.id, source_table, destination_table, unique_key)
expect(destination_model.count).to eq(3) expect(destination_model.count).to eq(3)
...@@ -76,7 +76,7 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartition ...@@ -76,7 +76,7 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartition
expect(destination_record.attributes).to eq(source_record.attributes) expect(destination_record.attributes).to eq(source_record.attributes)
end end
subject.perform(source1.id, source3.id, source_table, destination_table, unique_key) backfill_job.perform(source1.id, source3.id, source_table, destination_table, unique_key)
expect(destination_model.count).to eq(3) expect(destination_model.count).to eq(3)
end end
...@@ -87,13 +87,13 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartition ...@@ -87,13 +87,13 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartition
expect(bulk_copy).to receive(:copy_between).with(source3.id, source3.id) expect(bulk_copy).to receive(:copy_between).with(source3.id, source3.id)
end end
subject.perform(source1.id, source3.id, source_table, destination_table, unique_key) backfill_job.perform(source1.id, source3.id, source_table, destination_table, unique_key)
end end
it 'pauses after copying each sub-batch' do it 'pauses after copying each sub-batch' do
expect(subject).to receive(:sleep).with(pause_seconds).twice expect(backfill_job).to receive(:sleep).with(pause_seconds).twice
subject.perform(source1.id, source3.id, source_table, destination_table, unique_key) backfill_job.perform(source1.id, source3.id, source_table, destination_table, unique_key)
end end
it 'marks each job record as succeeded after processing' do it 'marks each job record as succeeded after processing' do
...@@ -103,7 +103,7 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartition ...@@ -103,7 +103,7 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartition
expect(::Gitlab::Database::BackgroundMigrationJob).to receive(:mark_all_as_succeeded).and_call_original expect(::Gitlab::Database::BackgroundMigrationJob).to receive(:mark_all_as_succeeded).and_call_original
expect do expect do
subject.perform(source1.id, source3.id, source_table, destination_table, unique_key) backfill_job.perform(source1.id, source3.id, source_table, destination_table, unique_key)
end.to change { ::Gitlab::Database::BackgroundMigrationJob.succeeded.count }.from(0).to(1) end.to change { ::Gitlab::Database::BackgroundMigrationJob.succeeded.count }.from(0).to(1)
end end
...@@ -111,24 +111,24 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartition ...@@ -111,24 +111,24 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartition
create(:background_migration_job, class_name: "::#{described_class.name}", create(:background_migration_job, class_name: "::#{described_class.name}",
arguments: [source1.id, source3.id, source_table, destination_table, unique_key]) arguments: [source1.id, source3.id, source_table, destination_table, unique_key])
jobs_updated = subject.perform(source1.id, source3.id, source_table, destination_table, unique_key) jobs_updated = backfill_job.perform(source1.id, source3.id, source_table, destination_table, unique_key)
expect(jobs_updated).to eq(1) expect(jobs_updated).to eq(1)
end end
context 'when the job is run within an explicit transaction block' do context 'when the job is run within an explicit transaction block' do
let(:mock_connection) { double('connection') } subject(:backfill_job) { described_class.new(connection: mock_connection) }
before do let(:mock_connection) { double('connection') }
allow(subject).to receive(:connection).and_return(mock_connection)
allow(subject).to receive(:transaction_open?).and_return(true)
end
it 'raises an error before copying data' do it 'raises an error before copying data' do
expect(backfill_job).to receive(:transaction_open?).and_call_original
expect(mock_connection).to receive(:transaction_open?).and_return(true)
expect(mock_connection).not_to receive(:execute) expect(mock_connection).not_to receive(:execute)
expect do expect do
subject.perform(1, 100, source_table, destination_table, unique_key) backfill_job.perform(1, 100, source_table, destination_table, unique_key)
end.to raise_error(/Aborting job to backfill partitioned #{source_table}/) end.to raise_error(/Aborting job to backfill partitioned #{source_table}/)
expect(destination_model.count).to eq(0) expect(destination_model.count).to eq(0)
...@@ -137,24 +137,25 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartition ...@@ -137,24 +137,25 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartition
end end
context 'when the destination table does not exist' do context 'when the destination table does not exist' do
subject(:backfill_job) { described_class.new(connection: mock_connection) }
let(:mock_connection) { double('connection') } let(:mock_connection) { double('connection') }
let(:mock_logger) { double('logger') } let(:mock_logger) { double('logger') }
before do before do
allow(subject).to receive(:connection).and_return(mock_connection) allow(backfill_job).to receive(:logger).and_return(mock_logger)
allow(subject).to receive(:logger).and_return(mock_logger)
expect(mock_connection).to receive(:table_exists?).with(destination_table).and_return(false)
allow(mock_logger).to receive(:warn) allow(mock_logger).to receive(:warn)
end end
it 'exits without attempting to copy data' do it 'exits without attempting to copy data' do
expect(mock_connection).to receive(:table_exists?).with(destination_table).and_return(false)
expect(mock_connection).not_to receive(:execute) expect(mock_connection).not_to receive(:execute)
subject.perform(1, 100, source_table, destination_table, unique_key) subject.perform(1, 100, source_table, destination_table, unique_key)
end end
it 'logs a warning message that the job was skipped' do it 'logs a warning message that the job was skipped' do
expect(mock_connection).to receive(:table_exists?).with(destination_table).and_return(false)
expect(mock_logger).to receive(:warn).with(/#{destination_table} does not exist/) expect(mock_logger).to receive(:warn).with(/#{destination_table} does not exist/)
subject.perform(1, 100, source_table, destination_table, unique_key) subject.perform(1, 100, source_table, destination_table, unique_key)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment