Commit c7fe7c79 authored by Mayra Cabrera

Merge branch '218428-use-background-migration-to-copy-historic-data-between-non-partitioned-and-partitioned-tables' into 'master'

Use BG Migration to copy historic data to a partitioned table

See merge request gitlab-org/gitlab!33891
parents 1b91dd8e 460acd7d
# frozen_string_literal: true
module Gitlab
module Database
module DynamicModelHelpers
def define_batchable_model(table_name)
Class.new(ActiveRecord::Base) do
include EachBatch
self.table_name = table_name
self.inheritance_column = :_type_disabled
end
end
end
end
end
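A quick usage sketch for the new helper (illustrative only, assuming a class that includes Gitlab::Database::DynamicModelHelpers and an existing audit_events table):

  model = define_batchable_model('audit_events')
  model.each_batch(of: 1_000) do |relation|
    first_id, last_id = relation.pluck(Arel.sql('MIN(id), MAX(id)')).first
    # process the rows whose ids fall in first_id..last_id
  end

Disabling the inheritance column ensures a populated `type` column is never misinterpreted as single-table inheritance.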
@@ -3,10 +3,10 @@
module Gitlab
module Database
module MigrationHelpers
include Migrations::BackgroundMigrationHelpers
# https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS
MAX_IDENTIFIER_NAME_LENGTH = 63
BACKGROUND_MIGRATION_BATCH_SIZE = 1000 # Number of rows to process per job
BACKGROUND_MIGRATION_JOB_BUFFER_SIZE = 1000 # Number of jobs to bulk queue at a time
PERMITTED_TIMESTAMP_COLUMNS = %i[created_at updated_at deleted_at].to_set.freeze
DEFAULT_TIMESTAMP_COLUMNS = %i[created_at updated_at].freeze
@@ -786,10 +786,6 @@ module Gitlab
end
end
def perform_background_migration_inline?
Rails.env.test? || Rails.env.development?
end
# Performs a concurrent column rename when using PostgreSQL.
def install_rename_triggers_for_postgresql(trigger, table, old, new)
execute <<-EOF.strip_heredoc
@@ -973,106 +969,6 @@ into similar problems in the future (e.g. when new tables are created).
end
end
# Bulk queues background migration jobs for an entire table, batched by ID range.
# "Bulk" meaning many jobs will be pushed at a time for efficiency.
# If you need a delay interval per job, then use `queue_background_migration_jobs_by_range_at_intervals`.
#
# model_class - The model class for the table being iterated over
# job_class_name - The background migration job class as a string
# batch_size - The maximum number of rows per job
#
# Example:
#
# class Route < ActiveRecord::Base
# include EachBatch
# self.table_name = 'routes'
# end
#
# bulk_queue_background_migration_jobs_by_range(Route, 'ProcessRoutes')
#
# Where the model_class includes EachBatch, and the background migration exists:
#
# class Gitlab::BackgroundMigration::ProcessRoutes
# def perform(start_id, end_id)
# # do something
# end
# end
def bulk_queue_background_migration_jobs_by_range(model_class, job_class_name, batch_size: BACKGROUND_MIGRATION_BATCH_SIZE)
raise "#{model_class} does not have an ID to use for batch ranges" unless model_class.column_names.include?('id')
jobs = []
table_name = model_class.quoted_table_name
model_class.each_batch(of: batch_size) do |relation|
start_id, end_id = relation.pluck("MIN(#{table_name}.id)", "MAX(#{table_name}.id)").first
if jobs.length >= BACKGROUND_MIGRATION_JOB_BUFFER_SIZE
# Note: This code path generally only helps with many millions of rows.
# We push multiple jobs at a time to reduce the time spent in
# Sidekiq/Redis operations. We're using this buffer-based approach so we
# don't need to run additional queries for every range.
bulk_migrate_async(jobs)
jobs.clear
end
jobs << [job_class_name, [start_id, end_id]]
end
bulk_migrate_async(jobs) unless jobs.empty?
end
# Queues background migration jobs for an entire table, batched by ID range.
# Each job is scheduled with a `delay_interval` in between.
# If you use a small interval, then some jobs may run at the same time.
#
# model_class - The model class or relation being iterated over
# job_class_name - The background migration job class as a string
# delay_interval - The duration between each job's scheduled time (must respond to `to_f`)
# batch_size - The maximum number of rows per job
# other_job_arguments - Other arguments to send to the job
# initial_delay - An additional delay added to every job's scheduled time
#
# *Returns the final migration delay*
#
# Example:
#
# class Route < ActiveRecord::Base
# include EachBatch
# self.table_name = 'routes'
# end
#
# queue_background_migration_jobs_by_range_at_intervals(Route, 'ProcessRoutes', 1.minute)
#
# Where the model_class includes EachBatch, and the background migration exists:
#
# class Gitlab::BackgroundMigration::ProcessRoutes
# def perform(start_id, end_id)
# # do something
# end
# end
def queue_background_migration_jobs_by_range_at_intervals(model_class, job_class_name, delay_interval, batch_size: BACKGROUND_MIGRATION_BATCH_SIZE, other_job_arguments: [], initial_delay: 0)
raise "#{model_class} does not have an ID to use for batch ranges" unless model_class.column_names.include?('id')
# To avoid overloading the worker, we enforce a minimum interval both
# when scheduling and performing jobs.
if delay_interval < BackgroundMigrationWorker.minimum_interval
delay_interval = BackgroundMigrationWorker.minimum_interval
end
final_delay = 0
model_class.each_batch(of: batch_size) do |relation, index|
start_id, end_id = relation.pluck(Arel.sql('MIN(id), MAX(id)')).first
# `BackgroundMigrationWorker.bulk_perform_in` schedules all jobs for
# the same time, which is not helpful in most cases where we wish to
# spread the work over time.
final_delay = initial_delay + delay_interval * index
migrate_in(final_delay, job_class_name, [start_id, end_id] + other_job_arguments)
end
final_delay
end
# Fetches indexes on a column by name for postgres.
#
# This will include indexes using an expression on the column, for example:
@@ -1131,30 +1027,6 @@ into similar problems in the future (e.g. when new tables are created).
execute(sql)
end
def migrate_async(*args)
with_migration_context do
BackgroundMigrationWorker.perform_async(*args)
end
end
def migrate_in(*args)
with_migration_context do
BackgroundMigrationWorker.perform_in(*args)
end
end
def bulk_migrate_in(*args)
with_migration_context do
BackgroundMigrationWorker.bulk_perform_in(*args)
end
end
def bulk_migrate_async(*args)
with_migration_context do
BackgroundMigrationWorker.bulk_perform_async(*args)
end
end
# Returns the name for a check constraint
#
# type:
@@ -1437,10 +1309,6 @@ into similar problems in the future (e.g. when new tables are created).
your migration class
ERROR
end
def with_migration_context(&block)
Gitlab::ApplicationContext.with_context(caller_id: self.class.to_s, &block)
end
end
end
end
# frozen_string_literal: true
module Gitlab
module Database
module Migrations
module BackgroundMigrationHelpers
BACKGROUND_MIGRATION_BATCH_SIZE = 1_000 # Number of rows to process per job
BACKGROUND_MIGRATION_JOB_BUFFER_SIZE = 1_000 # Number of jobs to bulk queue at a time
# Bulk queues background migration jobs for an entire table, batched by ID range.
# "Bulk" meaning many jobs will be pushed at a time for efficiency.
# If you need a delay interval per job, then use `queue_background_migration_jobs_by_range_at_intervals`.
#
# model_class - The model class for the table being iterated over
# job_class_name - The background migration job class as a string
# batch_size - The maximum number of rows per job
#
# Example:
#
# class Route < ActiveRecord::Base
# include EachBatch
# self.table_name = 'routes'
# end
#
# bulk_queue_background_migration_jobs_by_range(Route, 'ProcessRoutes')
#
# Where the model_class includes EachBatch, and the background migration exists:
#
# class Gitlab::BackgroundMigration::ProcessRoutes
# def perform(start_id, end_id)
# # do something
# end
# end
def bulk_queue_background_migration_jobs_by_range(model_class, job_class_name, batch_size: BACKGROUND_MIGRATION_BATCH_SIZE)
raise "#{model_class} does not have an ID to use for batch ranges" unless model_class.column_names.include?('id')
jobs = []
table_name = model_class.quoted_table_name
model_class.each_batch(of: batch_size) do |relation|
start_id, end_id = relation.pluck("MIN(#{table_name}.id)", "MAX(#{table_name}.id)").first
if jobs.length >= BACKGROUND_MIGRATION_JOB_BUFFER_SIZE
# Note: This code path generally only helps with many millions of rows.
# We push multiple jobs at a time to reduce the time spent in
# Sidekiq/Redis operations. We're using this buffer-based approach so we
# don't need to run additional queries for every range.
bulk_migrate_async(jobs)
jobs.clear
end
jobs << [job_class_name, [start_id, end_id]]
end
bulk_migrate_async(jobs) unless jobs.empty?
end
# Queues background migration jobs for an entire table, batched by ID range.
# Each job is scheduled with a `delay_interval` in between.
# If you use a small interval, then some jobs may run at the same time.
#
# model_class - The model class or relation being iterated over
# job_class_name - The background migration job class as a string
# delay_interval - The duration between each job's scheduled time (must respond to `to_f`)
# batch_size - The maximum number of rows per job
# other_job_arguments - Other arguments to send to the job
# initial_delay - An additional delay added to every job's scheduled time
#
# *Returns the final migration delay*
#
# Example:
#
# class Route < ActiveRecord::Base
# include EachBatch
# self.table_name = 'routes'
# end
#
# queue_background_migration_jobs_by_range_at_intervals(Route, 'ProcessRoutes', 1.minute)
#
# Where the model_class includes EachBatch, and the background migration exists:
#
# class Gitlab::BackgroundMigration::ProcessRoutes
# def perform(start_id, end_id)
# # do something
# end
# end
def queue_background_migration_jobs_by_range_at_intervals(model_class, job_class_name, delay_interval, batch_size: BACKGROUND_MIGRATION_BATCH_SIZE, other_job_arguments: [], initial_delay: 0)
raise "#{model_class} does not have an ID to use for batch ranges" unless model_class.column_names.include?('id')
# To avoid overloading the worker, we enforce a minimum interval both
# when scheduling and performing jobs.
if delay_interval < BackgroundMigrationWorker.minimum_interval
delay_interval = BackgroundMigrationWorker.minimum_interval
end
final_delay = 0
model_class.each_batch(of: batch_size) do |relation, index|
start_id, end_id = relation.pluck(Arel.sql('MIN(id), MAX(id)')).first
# `BackgroundMigrationWorker.bulk_perform_in` schedules all jobs for
# the same time, which is not helpful in most cases where we wish to
# spread the work over time.
final_delay = initial_delay + delay_interval * index
migrate_in(final_delay, job_class_name, [start_id, end_id] + other_job_arguments)
end
final_delay
end
def perform_background_migration_inline?
Rails.env.test? || Rails.env.development?
end
def migrate_async(*args)
with_migration_context do
BackgroundMigrationWorker.perform_async(*args)
end
end
def migrate_in(*args)
with_migration_context do
BackgroundMigrationWorker.perform_in(*args)
end
end
def bulk_migrate_in(*args)
with_migration_context do
BackgroundMigrationWorker.bulk_perform_in(*args)
end
end
def bulk_migrate_async(*args)
with_migration_context do
BackgroundMigrationWorker.bulk_perform_async(*args)
end
end
def with_migration_context(&block)
Gitlab::ApplicationContext.with_context(caller_id: self.class.to_s, &block)
end
end
end
end
end
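A worked example of the interval arithmetic (Route and ProcessRoutes are placeholders, as in the docstrings above): each_batch yields indexes starting at 1, so the last batch determines final_delay = initial_delay + delay_interval * index. With three rows, a batch size of 2, and a 10-minute interval, the two jobs land at +10 and +20 minutes, mirroring the spec later in this diff:

  final_delay = queue_background_migration_jobs_by_range_at_intervals(
    Route, 'ProcessRoutes', 10.minutes, batch_size: 2
  )
  # Two batches are enqueued, so final_delay.to_f == 20.minutes.to_f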
# frozen_string_literal: true
module Gitlab
module Database
module PartitioningMigrationHelpers
# Class that will generically copy data from a given table into its corresponding partitioned table
class BackfillPartitionedTable
include ::Gitlab::Database::DynamicModelHelpers
SUB_BATCH_SIZE = 2_500
PAUSE_SECONDS = 0.25
def perform(start_id, stop_id, source_table, partitioned_table, source_column)
return unless Feature.enabled?(:backfill_partitioned_audit_events, default_enabled: true)
if transaction_open?
raise "Aborting job to backfill partitioned #{source_table} table! Do not run this job in a transaction block!"
end
unless table_exists?(partitioned_table)
logger.warn "exiting backfill migration because partitioned table #{partitioned_table} does not exist. " \
"This could be due to the migration being rolled back after migration jobs were enqueued in sidekiq"
return
end
bulk_copy = BulkCopy.new(source_table, partitioned_table, source_column)
parent_batch_relation = relation_scoped_to_range(source_table, source_column, start_id, stop_id)
parent_batch_relation.each_batch(of: SUB_BATCH_SIZE) do |sub_batch|
start_id, stop_id = sub_batch.pluck(Arel.sql("MIN(#{source_column}), MAX(#{source_column})")).first
bulk_copy.copy_between(start_id, stop_id)
sleep(PAUSE_SECONDS)
end
end
private
def connection
ActiveRecord::Base.connection
end
def transaction_open?
connection.transaction_open?
end
def table_exists?(table)
connection.table_exists?(table)
end
def logger
@logger ||= ::Gitlab::BackgroundMigration::Logger.build
end
def relation_scoped_to_range(source_table, source_key_column, start_id, stop_id)
define_batchable_model(source_table).where(source_key_column => start_id..stop_id)
end
# Helper class to copy data between two tables via upserts
class BulkCopy
DELIMITER = ', '
attr_reader :source_table, :destination_table, :source_column
def initialize(source_table, destination_table, source_column)
@source_table = source_table
@destination_table = destination_table
@source_column = source_column
end
def copy_between(start_id, stop_id)
connection.execute(<<~SQL)
INSERT INTO #{destination_table} (#{column_listing})
SELECT #{column_listing}
FROM #{source_table}
WHERE #{source_column} BETWEEN #{start_id} AND #{stop_id}
FOR UPDATE
ON CONFLICT (#{conflict_targets}) DO NOTHING
SQL
end
private
def connection
@connection ||= ActiveRecord::Base.connection
end
def column_listing
@column_listing ||= connection.columns(source_table).map(&:name).join(DELIMITER)
end
def conflict_targets
connection.primary_key(destination_table).join(DELIMITER)
end
end
end
end
end
end
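To make the copy step concrete: with source table audit_events, destination audit_events_part whose primary key is (id, created_at), and id as the source column (names illustrative, column list abbreviated), BulkCopy#copy_between(1, 2500) renders SQL along these lines:

  INSERT INTO audit_events_part (id, author_id, created_at)
  SELECT id, author_id, created_at
  FROM audit_events
  WHERE id BETWEEN 1 AND 2500
  FOR UPDATE
  ON CONFLICT (id, created_at) DO NOTHING

FOR UPDATE locks the source rows while they are copied, and ON CONFLICT ... DO NOTHING skips rows that already exist in the partitioned table, which is what makes re-running a job idempotent.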
@@ -99,7 +99,7 @@ module Gitlab
drop_function(fn_name, if_exists: true)
else
create_or_replace_fk_function(fn_name, final_keys)
create_trigger(trigger_name, fn_name, fires: "AFTER DELETE ON #{to_table}")
create_trigger(to_table, trigger_name, fn_name, fires: 'AFTER DELETE')
end
end
end
......
@@ -5,10 +5,16 @@ module Gitlab
module PartitioningMigrationHelpers
module TableManagementHelpers
include ::Gitlab::Database::SchemaHelpers
include ::Gitlab::Database::DynamicModelHelpers
include ::Gitlab::Database::Migrations::BackgroundMigrationHelpers
WHITELISTED_TABLES = %w[audit_events].freeze
ALLOWED_TABLES = %w[audit_events].freeze
ERROR_SCOPE = 'table partitioning'
MIGRATION_CLASS_NAME = "::#{module_parent_name}::BackfillPartitionedTable"
BATCH_INTERVAL = 2.minutes.freeze
BATCH_SIZE = 50_000
# Creates a partitioned copy of an existing table, using a RANGE partitioning strategy on a timestamp column.
# One partition is created per month between the given `min_date` and `max_date`.
#
@@ -23,7 +29,7 @@ module Gitlab
# :max_date - a date specifying the upper bounds of the partitioning range
#
def partition_table_by_date(table_name, column_name, min_date:, max_date:)
assert_table_is_whitelisted(table_name)
assert_table_is_allowed(table_name)
assert_not_in_transaction_block(scope: ERROR_SCOPE)
raise "max_date #{max_date} must be greater than min_date #{min_date}" if min_date >= max_date
@@ -35,9 +41,11 @@ module Gitlab
raise "partition column #{column_name} does not exist on #{table_name}" if partition_column.nil?
new_table_name = partitioned_table_name(table_name)
create_range_partitioned_copy(new_table_name, table_name, partition_column, primary_key)
create_daterange_partitions(new_table_name, partition_column.name, min_date, max_date)
create_sync_trigger(table_name, new_table_name, primary_key)
create_trigger_to_sync_tables(table_name, new_table_name, primary_key)
enqueue_background_migration(table_name, new_table_name, primary_key)
end
# Clean up a partitioned copy of an existing table. This deletes the partitioned table and all partitions.
@@ -47,7 +55,7 @@ module Gitlab
# drop_partitioned_table_for :audit_events
#
def drop_partitioned_table_for(table_name)
assert_table_is_whitelisted(table_name)
assert_table_is_allowed(table_name)
assert_not_in_transaction_block(scope: ERROR_SCOPE)
with_lock_retries do
@@ -64,10 +72,10 @@ module Gitlab
private
def assert_table_is_whitelisted(table_name)
return if WHITELISTED_TABLES.include?(table_name.to_s)
def assert_table_is_allowed(table_name)
return if ALLOWED_TABLES.include?(table_name.to_s)
raise "partitioning helpers are in active development, and #{table_name} is not whitelisted for use, " \
raise "partitioning helpers are in active development, and #{table_name} is not allowed for use, " \
"for more information please contact the database team"
end
@@ -125,7 +133,8 @@ module Gitlab
min_date = min_date.beginning_of_month.to_date
max_date = max_date.next_month.beginning_of_month.to_date
create_range_partition_safely("#{table_name}_000000", table_name, 'MINVALUE', to_sql_date_literal(min_date))
upper_bound = to_sql_date_literal(min_date)
create_range_partition_safely("#{table_name}_000000", table_name, 'MINVALUE', upper_bound)
while min_date < max_date
partition_name = "#{table_name}_#{min_date.strftime('%Y%m')}"
@@ -154,7 +163,7 @@ module Gitlab
create_range_partition(partition_name, table_name, lower_bound, upper_bound)
end
def create_sync_trigger(source_table, target_table, unique_key)
def create_trigger_to_sync_tables(source_table, target_table, unique_key)
function_name = sync_function_name(source_table)
trigger_name = sync_trigger_name(source_table)
@@ -162,11 +171,19 @@ module Gitlab
create_sync_function(function_name, target_table, unique_key)
create_comment('FUNCTION', function_name, "Partitioning migration: table sync for #{source_table} table")
create_trigger(trigger_name, function_name, fires: "AFTER INSERT OR UPDATE OR DELETE ON #{source_table}")
create_sync_trigger(source_table, trigger_name, function_name)
end
end
def create_sync_function(name, target_table, unique_key)
if function_exists?(name)
# rubocop:disable Gitlab/RailsLogger
Rails.logger.warn "Partitioning sync function not created because it already exists" \
" (this may be due to an aborted migration or similar): function name: #{name}"
# rubocop:enable Gitlab/RailsLogger
return
end
delimiter = ",\n "
column_names = connection.columns(target_table).map(&:name)
set_statements = build_set_statements(column_names, unique_key)
@@ -190,7 +207,30 @@ module Gitlab
end
def build_set_statements(column_names, unique_key)
column_names.reject { |name| name == unique_key }.map { |column_name| "#{column_name} = NEW.#{column_name}" }
column_names.reject { |name| name == unique_key }.map { |name| "#{name} = NEW.#{name}" }
end
def create_sync_trigger(table_name, trigger_name, function_name)
if trigger_exists?(table_name, trigger_name)
# rubocop:disable Gitlab/RailsLogger
Rails.logger.warn "Partitioning sync trigger not created because it already exists" \
" (this may be due to an aborted migration or similar): trigger name: #{trigger_name}"
# rubocop:enable Gitlab/RailsLogger
return
end
create_trigger(table_name, trigger_name, function_name, fires: 'AFTER INSERT OR UPDATE OR DELETE')
end
def enqueue_background_migration(source_table, partitioned_table, source_key)
model_class = define_batchable_model(source_table)
queue_background_migration_jobs_by_range_at_intervals(
model_class,
MIGRATION_CLASS_NAME,
BATCH_INTERVAL,
batch_size: BATCH_SIZE,
other_job_arguments: [source_table.to_s, partitioned_table, source_key])
end
end
end
......
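Putting the table-management pieces together, a migration could drive these helpers roughly as follows (a minimal sketch; the class name and date range are illustrative, audit_events is the only entry in ALLOWED_TABLES, and disable_ddl_transaction! is needed because the helper asserts it is not inside a transaction block):

  class PartitionAuditEvents < ActiveRecord::Migration[6.0]
    include Gitlab::Database::PartitioningMigrationHelpers

    disable_ddl_transaction!

    def up
      partition_table_by_date :audit_events, :created_at,
        min_date: Date.new(2020, 1), max_date: Date.new(2020, 7)
    end

    def down
      drop_partitioned_table_for :audit_events
    end
  end

Running up creates the partitioned copy, installs the sync trigger, and enqueues BackfillPartitionedTable jobs over the existing rows in 50,000-id batches spaced two minutes apart.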
@@ -16,15 +30,30 @@ module Gitlab
SQL
end
def create_trigger(name, function_name, fires: nil)
def function_exists?(name)
connection.select_value("SELECT 1 FROM pg_proc WHERE proname = '#{name}'")
end
def create_trigger(table_name, name, function_name, fires:)
execute(<<~SQL)
CREATE TRIGGER #{name}
#{fires}
#{fires} ON #{table_name}
FOR EACH ROW
EXECUTE PROCEDURE #{function_name}()
SQL
end
def trigger_exists?(table_name, name)
connection.select_value(<<~SQL)
SELECT 1
FROM pg_trigger
INNER JOIN pg_class
ON pg_trigger.tgrelid = pg_class.oid
WHERE pg_class.relname = '#{table_name}'
AND pg_trigger.tgname = '#{name}'
SQL
end
def drop_function(name, if_exists: true)
exists_clause = optional_clause(if_exists, "IF EXISTS")
execute("DROP FUNCTION #{exists_clause} #{name}()")
......
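With the reworked signature, the table is passed on its own and fires carries only the timing and event clause. For example (trigger and function names illustrative), create_trigger('audit_events', 'table_sync_trigger', 'table_sync_function', fires: 'AFTER INSERT OR UPDATE OR DELETE') executes:

  CREATE TRIGGER table_sync_trigger
  AFTER INSERT OR UPDATE OR DELETE ON audit_events
  FOR EACH ROW
  EXECUTE PROCEDURE table_sync_function()

Having the table name as a separate argument also lets trigger_exists? scope its pg_trigger lookup to a specific table via the join against pg_class.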
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::Database::DynamicModelHelpers do
describe '#define_batchable_model' do
subject { including_class.new.define_batchable_model(table_name) }
let(:including_class) { Class.new.include(described_class) }
let(:table_name) { 'projects' }
it 'is an ActiveRecord model' do
expect(subject.ancestors).to include(ActiveRecord::Base)
end
it 'includes EachBatch' do
expect(subject.included_modules).to include(EachBatch)
end
it 'has the correct table name' do
expect(subject.table_name).to eq(table_name)
end
it 'has the inheritance type column disabled' do
expect(subject.inheritance_column).to eq('_type_disabled')
end
end
end
@@ -1215,166 +1215,6 @@ describe Gitlab::Database::MigrationHelpers do
end
end
describe '#bulk_queue_background_migration_jobs_by_range' do
context 'when the model has an ID column' do
let!(:id1) { create(:user).id }
let!(:id2) { create(:user).id }
let!(:id3) { create(:user).id }
before do
User.class_eval do
include EachBatch
end
end
context 'with enough rows to bulk queue jobs more than once' do
before do
stub_const('Gitlab::Database::MigrationHelpers::BACKGROUND_MIGRATION_JOB_BUFFER_SIZE', 1)
end
it 'queues jobs correctly' do
Sidekiq::Testing.fake! do
model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2)
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id2]])
expect(BackgroundMigrationWorker.jobs[1]['args']).to eq(['FooJob', [id3, id3]])
end
end
it 'queues jobs in groups of buffer size 1' do
expect(BackgroundMigrationWorker).to receive(:bulk_perform_async).with([['FooJob', [id1, id2]]])
expect(BackgroundMigrationWorker).to receive(:bulk_perform_async).with([['FooJob', [id3, id3]]])
model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2)
end
end
context 'with not enough rows to bulk queue jobs more than once' do
it 'queues jobs correctly' do
Sidekiq::Testing.fake! do
model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2)
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id2]])
expect(BackgroundMigrationWorker.jobs[1]['args']).to eq(['FooJob', [id3, id3]])
end
end
it 'queues jobs in bulk all at once (big buffer size)' do
expect(BackgroundMigrationWorker).to receive(:bulk_perform_async).with([['FooJob', [id1, id2]],
['FooJob', [id3, id3]]])
model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2)
end
end
context 'without specifying batch_size' do
it 'queues jobs correctly' do
Sidekiq::Testing.fake! do
model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob')
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id3]])
end
end
end
end
context "when the model doesn't have an ID column" do
it 'raises error (for now)' do
expect do
model.bulk_queue_background_migration_jobs_by_range(ProjectAuthorization, 'FooJob')
end.to raise_error(StandardError, /does not have an ID/)
end
end
end
describe '#queue_background_migration_jobs_by_range_at_intervals' do
context 'when the model has an ID column' do
let!(:id1) { create(:user).id }
let!(:id2) { create(:user).id }
let!(:id3) { create(:user).id }
around do |example|
Timecop.freeze { example.run }
end
before do
User.class_eval do
include EachBatch
end
end
it 'returns the final expected delay' do
Sidekiq::Testing.fake! do
final_delay = model.queue_background_migration_jobs_by_range_at_intervals(User, 'FooJob', 10.minutes, batch_size: 2)
expect(final_delay.to_f).to eq(20.minutes.to_f)
end
end
it 'returns zero when nothing gets queued' do
Sidekiq::Testing.fake! do
final_delay = model.queue_background_migration_jobs_by_range_at_intervals(User.none, 'FooJob', 10.minutes)
expect(final_delay).to eq(0)
end
end
context 'with batch_size option' do
it 'queues jobs correctly' do
Sidekiq::Testing.fake! do
model.queue_background_migration_jobs_by_range_at_intervals(User, 'FooJob', 10.minutes, batch_size: 2)
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id2]])
expect(BackgroundMigrationWorker.jobs[0]['at']).to eq(10.minutes.from_now.to_f)
expect(BackgroundMigrationWorker.jobs[1]['args']).to eq(['FooJob', [id3, id3]])
expect(BackgroundMigrationWorker.jobs[1]['at']).to eq(20.minutes.from_now.to_f)
end
end
end
context 'without batch_size option' do
it 'queues jobs correctly' do
Sidekiq::Testing.fake! do
model.queue_background_migration_jobs_by_range_at_intervals(User, 'FooJob', 10.minutes)
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id3]])
expect(BackgroundMigrationWorker.jobs[0]['at']).to eq(10.minutes.from_now.to_f)
end
end
end
context 'with other_job_arguments option' do
it 'queues jobs correctly' do
Sidekiq::Testing.fake! do
model.queue_background_migration_jobs_by_range_at_intervals(User, 'FooJob', 10.minutes, other_job_arguments: [1, 2])
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id3, 1, 2]])
expect(BackgroundMigrationWorker.jobs[0]['at']).to eq(10.minutes.from_now.to_f)
end
end
end
context 'with initial_delay option' do
it 'queues jobs correctly' do
Sidekiq::Testing.fake! do
model.queue_background_migration_jobs_by_range_at_intervals(User, 'FooJob', 10.minutes, other_job_arguments: [1, 2], initial_delay: 10.minutes)
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id3, 1, 2]])
expect(BackgroundMigrationWorker.jobs[0]['at']).to eq(20.minutes.from_now.to_f)
end
end
end
end
context "when the model doesn't have an ID column" do
it 'raises error (for now)' do
expect do
model.queue_background_migration_jobs_by_range_at_intervals(ProjectAuthorization, 'FooJob', 10.seconds)
end.to raise_error(StandardError, /does not have an ID/)
end
end
end
describe '#change_column_type_using_background_migration' do
let!(:issue) { create(:issue, :closed, closed_at: Time.zone.now) }
@@ -1485,26 +1325,6 @@ describe Gitlab::Database::MigrationHelpers do
end
end
describe '#perform_background_migration_inline?' do
it 'returns true in a test environment' do
stub_rails_env('test')
expect(model.perform_background_migration_inline?).to eq(true)
end
it 'returns true in a development environment' do
stub_rails_env('development')
expect(model.perform_background_migration_inline?).to eq(true)
end
it 'returns false in a production environment' do
stub_rails_env('production')
expect(model.perform_background_migration_inline?).to eq(false)
end
end
describe '#index_exists_by_name?' do
it 'returns true if an index exists' do
ActiveRecord::Base.connection.execute(
@@ -1973,62 +1793,6 @@ describe Gitlab::Database::MigrationHelpers do
end
end
describe '#migrate_async' do
it 'calls BackgroundMigrationWorker.perform_async' do
expect(BackgroundMigrationWorker).to receive(:perform_async).with("Class", "hello", "world")
model.migrate_async("Class", "hello", "world")
end
it 'pushes a context with the current class name as caller_id' do
expect(Gitlab::ApplicationContext).to receive(:with_context).with(caller_id: model.class.to_s)
model.migrate_async('Class', 'hello', 'world')
end
end
describe '#migrate_in' do
it 'calls BackgroundMigrationWorker.perform_in' do
expect(BackgroundMigrationWorker).to receive(:perform_in).with(10.minutes, 'Class', 'Hello', 'World')
model.migrate_in(10.minutes, 'Class', 'Hello', 'World')
end
it 'pushes a context with the current class name as caller_id' do
expect(Gitlab::ApplicationContext).to receive(:with_context).with(caller_id: model.class.to_s)
model.migrate_in(10.minutes, 'Class', 'Hello', 'World')
end
end
describe '#bulk_migrate_async' do
it 'calls BackgroundMigrationWorker.bulk_perform_async' do
expect(BackgroundMigrationWorker).to receive(:bulk_perform_async).with([%w(Class hello world)])
model.bulk_migrate_async([%w(Class hello world)])
end
it 'pushes a context with the current class name as caller_id' do
expect(Gitlab::ApplicationContext).to receive(:with_context).with(caller_id: model.class.to_s)
model.bulk_migrate_async([%w(Class hello world)])
end
end
describe '#bulk_migrate_in' do
it 'calls BackgroundMigrationWorker.bulk_perform_in' do
expect(BackgroundMigrationWorker).to receive(:bulk_perform_in).with(10.minutes, [%w(Class hello world)])
model.bulk_migrate_in(10.minutes, [%w(Class hello world)])
end
it 'pushes a context with the current class name as caller_id' do
expect(Gitlab::ApplicationContext).to receive(:with_context).with(caller_id: model.class.to_s)
model.bulk_migrate_in(10.minutes, [%w(Class hello world)])
end
end
describe '#check_constraint_name' do
it 'returns a valid constraint name' do
name = model.check_constraint_name(:this_is_a_very_long_table_name,
......
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::Database::Migrations::BackgroundMigrationHelpers do
let(:model) do
ActiveRecord::Migration.new.extend(described_class)
end
describe '#bulk_queue_background_migration_jobs_by_range' do
context 'when the model has an ID column' do
let!(:id1) { create(:user).id }
let!(:id2) { create(:user).id }
let!(:id3) { create(:user).id }
before do
User.class_eval do
include EachBatch
end
end
context 'with enough rows to bulk queue jobs more than once' do
before do
stub_const('Gitlab::Database::Migrations::BackgroundMigrationHelpers::BACKGROUND_MIGRATION_JOB_BUFFER_SIZE', 1)
end
it 'queues jobs correctly' do
Sidekiq::Testing.fake! do
model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2)
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id2]])
expect(BackgroundMigrationWorker.jobs[1]['args']).to eq(['FooJob', [id3, id3]])
end
end
it 'queues jobs in groups of buffer size 1' do
expect(BackgroundMigrationWorker).to receive(:bulk_perform_async).with([['FooJob', [id1, id2]]])
expect(BackgroundMigrationWorker).to receive(:bulk_perform_async).with([['FooJob', [id3, id3]]])
model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2)
end
end
context 'with not enough rows to bulk queue jobs more than once' do
it 'queues jobs correctly' do
Sidekiq::Testing.fake! do
model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2)
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id2]])
expect(BackgroundMigrationWorker.jobs[1]['args']).to eq(['FooJob', [id3, id3]])
end
end
it 'queues jobs in bulk all at once (big buffer size)' do
expect(BackgroundMigrationWorker).to receive(:bulk_perform_async).with([['FooJob', [id1, id2]],
['FooJob', [id3, id3]]])
model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2)
end
end
context 'without specifying batch_size' do
it 'queues jobs correctly' do
Sidekiq::Testing.fake! do
model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob')
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id3]])
end
end
end
end
context "when the model doesn't have an ID column" do
it 'raises error (for now)' do
expect do
model.bulk_queue_background_migration_jobs_by_range(ProjectAuthorization, 'FooJob')
end.to raise_error(StandardError, /does not have an ID/)
end
end
end
describe '#queue_background_migration_jobs_by_range_at_intervals' do
context 'when the model has an ID column' do
let!(:id1) { create(:user).id }
let!(:id2) { create(:user).id }
let!(:id3) { create(:user).id }
around do |example|
Timecop.freeze { example.run }
end
before do
User.class_eval do
include EachBatch
end
end
it 'returns the final expected delay' do
Sidekiq::Testing.fake! do
final_delay = model.queue_background_migration_jobs_by_range_at_intervals(User, 'FooJob', 10.minutes, batch_size: 2)
expect(final_delay.to_f).to eq(20.minutes.to_f)
end
end
it 'returns zero when nothing gets queued' do
Sidekiq::Testing.fake! do
final_delay = model.queue_background_migration_jobs_by_range_at_intervals(User.none, 'FooJob', 10.minutes)
expect(final_delay).to eq(0)
end
end
context 'with batch_size option' do
it 'queues jobs correctly' do
Sidekiq::Testing.fake! do
model.queue_background_migration_jobs_by_range_at_intervals(User, 'FooJob', 10.minutes, batch_size: 2)
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id2]])
expect(BackgroundMigrationWorker.jobs[0]['at']).to eq(10.minutes.from_now.to_f)
expect(BackgroundMigrationWorker.jobs[1]['args']).to eq(['FooJob', [id3, id3]])
expect(BackgroundMigrationWorker.jobs[1]['at']).to eq(20.minutes.from_now.to_f)
end
end
end
context 'without batch_size option' do
it 'queues jobs correctly' do
Sidekiq::Testing.fake! do
model.queue_background_migration_jobs_by_range_at_intervals(User, 'FooJob', 10.minutes)
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id3]])
expect(BackgroundMigrationWorker.jobs[0]['at']).to eq(10.minutes.from_now.to_f)
end
end
end
context 'with other_job_arguments option' do
it 'queues jobs correctly' do
Sidekiq::Testing.fake! do
model.queue_background_migration_jobs_by_range_at_intervals(User, 'FooJob', 10.minutes, other_job_arguments: [1, 2])
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id3, 1, 2]])
expect(BackgroundMigrationWorker.jobs[0]['at']).to eq(10.minutes.from_now.to_f)
end
end
end
context 'with initial_delay option' do
it 'queues jobs correctly' do
Sidekiq::Testing.fake! do
model.queue_background_migration_jobs_by_range_at_intervals(User, 'FooJob', 10.minutes, other_job_arguments: [1, 2], initial_delay: 10.minutes)
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq(['FooJob', [id1, id3, 1, 2]])
expect(BackgroundMigrationWorker.jobs[0]['at']).to eq(20.minutes.from_now.to_f)
end
end
end
end
context "when the model doesn't have an ID column" do
it 'raises error (for now)' do
expect do
model.queue_background_migration_jobs_by_range_at_intervals(ProjectAuthorization, 'FooJob', 10.seconds)
end.to raise_error(StandardError, /does not have an ID/)
end
end
end
describe '#perform_background_migration_inline?' do
it 'returns true in a test environment' do
stub_rails_env('test')
expect(model.perform_background_migration_inline?).to eq(true)
end
it 'returns true in a development environment' do
stub_rails_env('development')
expect(model.perform_background_migration_inline?).to eq(true)
end
it 'returns false in a production environment' do
stub_rails_env('production')
expect(model.perform_background_migration_inline?).to eq(false)
end
end
describe '#migrate_async' do
it 'calls BackgroundMigrationWorker.perform_async' do
expect(BackgroundMigrationWorker).to receive(:perform_async).with("Class", "hello", "world")
model.migrate_async("Class", "hello", "world")
end
it 'pushes a context with the current class name as caller_id' do
expect(Gitlab::ApplicationContext).to receive(:with_context).with(caller_id: model.class.to_s)
model.migrate_async('Class', 'hello', 'world')
end
end
describe '#migrate_in' do
it 'calls BackgroundMigrationWorker.perform_in' do
expect(BackgroundMigrationWorker).to receive(:perform_in).with(10.minutes, 'Class', 'Hello', 'World')
model.migrate_in(10.minutes, 'Class', 'Hello', 'World')
end
it 'pushes a context with the current class name as caller_id' do
expect(Gitlab::ApplicationContext).to receive(:with_context).with(caller_id: model.class.to_s)
model.migrate_in(10.minutes, 'Class', 'Hello', 'World')
end
end
describe '#bulk_migrate_async' do
it 'calls BackgroundMigrationWorker.bulk_perform_async' do
expect(BackgroundMigrationWorker).to receive(:bulk_perform_async).with([%w(Class hello world)])
model.bulk_migrate_async([%w(Class hello world)])
end
it 'pushes a context with the current class name as caller_id' do
expect(Gitlab::ApplicationContext).to receive(:with_context).with(caller_id: model.class.to_s)
model.bulk_migrate_async([%w(Class hello world)])
end
end
describe '#bulk_migrate_in' do
it 'calls BackgroundMigrationWorker.bulk_perform_in' do
expect(BackgroundMigrationWorker).to receive(:bulk_perform_in).with(10.minutes, [%w(Class hello world)])
model.bulk_migrate_in(10.minutes, [%w(Class hello world)])
end
it 'pushes a context with the current class name as caller_id' do
expect(Gitlab::ApplicationContext).to receive(:with_context).with(caller_id: model.class.to_s)
model.bulk_migrate_in(10.minutes, [%w(Class hello world)])
end
end
end
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartitionedTable, '#perform' do
subject { described_class.new }
let(:source_table) { '_test_partitioning_backfills' }
let(:destination_table) { "#{source_table}_part" }
let(:unique_key) { 'id' }
before do
allow(subject).to receive(:transaction_open?).and_return(false)
end
context 'when the destination table exists' do
before do
connection.execute(<<~SQL)
CREATE TABLE #{source_table} (
id serial NOT NULL PRIMARY KEY,
col1 int NOT NULL,
col2 text NOT NULL,
created_at timestamptz NOT NULL
)
SQL
connection.execute(<<~SQL)
CREATE TABLE #{destination_table} (
id serial NOT NULL,
col1 int NOT NULL,
col2 text NOT NULL,
created_at timestamptz NOT NULL,
PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at)
SQL
connection.execute(<<~SQL)
CREATE TABLE #{destination_table}_202001 PARTITION OF #{destination_table}
FOR VALUES FROM ('2020-01-01') TO ('2020-02-01')
SQL
connection.execute(<<~SQL)
CREATE TABLE #{destination_table}_202002 PARTITION OF #{destination_table}
FOR VALUES FROM ('2020-02-01') TO ('2020-03-01')
SQL
source_model.table_name = source_table
destination_model.table_name = destination_table
stub_const("#{described_class}::SUB_BATCH_SIZE", 2)
stub_const("#{described_class}::PAUSE_SECONDS", pause_seconds)
allow(subject).to receive(:sleep)
end
let(:connection) { ActiveRecord::Base.connection }
let(:source_model) { Class.new(ActiveRecord::Base) }
let(:destination_model) { Class.new(ActiveRecord::Base) }
let(:timestamp) { Time.utc(2020, 1, 2).round }
let(:pause_seconds) { 1 }
let!(:source1) { create_source_record(timestamp) }
let!(:source2) { create_source_record(timestamp + 1.day) }
let!(:source3) { create_source_record(timestamp + 1.month) }
it 'copies data into the destination table idempotently' do
expect(destination_model.count).to eq(0)
subject.perform(source1.id, source3.id, source_table, destination_table, unique_key)
expect(destination_model.count).to eq(3)
source_model.find_each do |source_record|
destination_record = destination_model.find_by_id(source_record.id)
expect(destination_record.attributes).to eq(source_record.attributes)
end
subject.perform(source1.id, source3.id, source_table, destination_table, unique_key)
expect(destination_model.count).to eq(3)
end
it 'breaks the assigned batch into smaller batches' do
expect_next_instance_of(described_class::BulkCopy) do |bulk_copy|
expect(bulk_copy).to receive(:copy_between).with(source1.id, source2.id)
expect(bulk_copy).to receive(:copy_between).with(source3.id, source3.id)
end
subject.perform(source1.id, source3.id, source_table, destination_table, unique_key)
end
it 'pauses after copying each sub-batch' do
expect(subject).to receive(:sleep).with(pause_seconds).twice
subject.perform(source1.id, source3.id, source_table, destination_table, unique_key)
end
context 'when the feature flag is disabled' do
let(:mock_connection) { double('connection') }
before do
allow(subject).to receive(:connection).and_return(mock_connection)
stub_feature_flags(backfill_partitioned_audit_events: false)
end
it 'exits without attempting to copy data' do
expect(mock_connection).not_to receive(:execute)
subject.perform(1, 100, source_table, destination_table, unique_key)
expect(destination_model.count).to eq(0)
end
end
context 'when the job is run within an explicit transaction block' do
let(:mock_connection) { double('connection') }
before do
allow(subject).to receive(:connection).and_return(mock_connection)
allow(subject).to receive(:transaction_open?).and_return(true)
end
it 'raises an error before copying data' do
expect(mock_connection).not_to receive(:execute)
expect do
subject.perform(1, 100, source_table, destination_table, unique_key)
end.to raise_error(/Aborting job to backfill partitioned #{source_table}/)
expect(destination_model.count).to eq(0)
end
end
end
context 'when the destination table does not exist' do
let(:mock_connection) { double('connection') }
let(:mock_logger) { double('logger') }
before do
allow(subject).to receive(:connection).and_return(mock_connection)
allow(subject).to receive(:logger).and_return(mock_logger)
expect(mock_connection).to receive(:table_exists?).with(destination_table).and_return(false)
allow(mock_logger).to receive(:warn)
end
it 'exits without attempting to copy data' do
expect(mock_connection).not_to receive(:execute)
subject.perform(1, 100, source_table, destination_table, unique_key)
end
it 'logs a warning message that the job was skipped' do
expect(mock_logger).to receive(:warn).with(/#{destination_table} does not exist/)
subject.perform(1, 100, source_table, destination_table, unique_key)
end
end
def create_source_record(timestamp)
source_model.create!(col1: 123, col2: 'original value', created_at: timestamp)
end
end
@@ -25,7 +25,7 @@ describe Gitlab::Database::PartitioningMigrationHelpers::TableManagementHelpers
allow(migration).to receive(:partitioned_table_name).and_return(partitioned_table)
allow(migration).to receive(:sync_function_name).and_return(function_name)
allow(migration).to receive(:sync_trigger_name).and_return(trigger_name)
allow(migration).to receive(:assert_table_is_whitelisted)
allow(migration).to receive(:assert_table_is_allowed)
end
describe '#partition_table_by_date' do
@@ -33,15 +33,19 @@ describe Gitlab::Database::PartitioningMigrationHelpers::TableManagementHelpers
let(:old_primary_key) { 'id' }
let(:new_primary_key) { [old_primary_key, partition_column] }
context 'when the table is not whitelisted' do
let(:template_table) { :this_table_is_not_whitelisted }
before do
allow(migration).to receive(:queue_background_migration_jobs_by_range_at_intervals)
end
context 'when the table is not allowed' do
let(:template_table) { :this_table_is_not_allowed }
it 'raises an error' do
expect(migration).to receive(:assert_table_is_whitelisted).with(template_table).and_call_original
expect(migration).to receive(:assert_table_is_allowed).with(template_table).and_call_original
expect do
migration.partition_table_by_date template_table, partition_column, min_date: min_date, max_date: max_date
end.to raise_error(/#{template_table} is not whitelisted for use/)
end.to raise_error(/#{template_table} is not allowed for use/)
end
end
@@ -237,6 +241,36 @@ describe Gitlab::Database::PartitioningMigrationHelpers::TableManagementHelpers
expect(model.find(second_todo.id).attributes).to eq(second_todo.attributes)
end
end
describe 'copying historic data to the partitioned table' do
let(:template_table) { 'todos' }
let(:migration_class) { '::Gitlab::Database::PartitioningMigrationHelpers::BackfillPartitionedTable' }
let(:sub_batch_size) { described_class::SUB_BATCH_SIZE }
let(:pause_seconds) { described_class::PAUSE_SECONDS }
let!(:first_id) { create(:todo).id }
let!(:second_id) { create(:todo).id }
let!(:third_id) { create(:todo).id }
before do
stub_const("#{described_class.name}::BATCH_SIZE", 2)
expect(migration).to receive(:queue_background_migration_jobs_by_range_at_intervals).and_call_original
end
it 'enqueues jobs to copy each batch of data' do
Sidekiq::Testing.fake! do
migration.partition_table_by_date template_table, partition_column, min_date: min_date, max_date: max_date
expect(BackgroundMigrationWorker.jobs.size).to eq(2)
first_job_arguments = [first_id, second_id, template_table, partitioned_table, 'id']
expect(BackgroundMigrationWorker.jobs[0]['args']).to eq([migration_class, first_job_arguments])
second_job_arguments = [third_id, third_id, template_table, partitioned_table, 'id']
expect(BackgroundMigrationWorker.jobs[1]['args']).to eq([migration_class, second_job_arguments])
end
end
end
end
describe '#drop_partitioned_table_for' do
@@ -244,15 +278,15 @@ describe Gitlab::Database::PartitioningMigrationHelpers::TableManagementHelpers
%w[000000 201912 202001 202002].map { |suffix| "#{partitioned_table}_#{suffix}" }.unshift(partitioned_table)
end
context 'when the table is not whitelisted' do
let(:template_table) { :this_table_is_not_whitelisted }
context 'when the table is not allowed' do
let(:template_table) { :this_table_is_not_allowed }
it 'raises an error' do
expect(migration).to receive(:assert_table_is_whitelisted).with(template_table).and_call_original
expect(migration).to receive(:assert_table_is_allowed).with(template_table).and_call_original
expect do
migration.drop_partitioned_table_for template_table
end.to raise_error(/#{template_table} is not whitelisted for use/)
end.to raise_error(/#{template_table} is not allowed for use/)
end
end
......