Commit 7fecba3b authored by Patrick Bair, committed by Mayra Cabrera

Track execution of BackgroundMigration jobs

Add a new table to track execution of BackgroundMigration jobs. This
will give better insight into the execution of these jobs, as well as
make it easier for cleanup migrations to determine what data may have
been missed.
parent 1834d622
---
title: Add background_migration_jobs table to trace background migrations
merge_request: 35913
author:
type: added
# frozen_string_literal: true

class CreateBackgroundMigrationJobs < ActiveRecord::Migration[6.0]
  include Gitlab::Database::MigrationHelpers

  DOWNTIME = false

  disable_ddl_transaction!

  def up
    unless table_exists?(:background_migration_jobs)
      create_table :background_migration_jobs do |t|
        t.timestamps_with_timezone
        t.integer :status, null: false, limit: 2, default: 0
        t.text :class_name, null: false
        t.jsonb :arguments, null: false

        t.index [:class_name, :arguments]
        t.index [:class_name, :status, :id]
      end
    end

    add_text_limit :background_migration_jobs, :class_name, 200
  end

  def down
    drop_table :background_migration_jobs
  end
end
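For context, add_text_limit is a GitLab migration helper rather than a standard Rails method: it enforces the length with a CHECK constraint on char_length, which is why it runs outside the create_table block and why the migration disables the DDL transaction. A rough equivalent in raw SQL terms, shown only as a sketch (the real helper also generates the constraint name seen as check_b0de0a5852 in the structure.sql hunk below and validates it separately to avoid long locks):

    # Approximate effect of add_text_limit, for illustration only.
    execute(<<~SQL)
      ALTER TABLE background_migration_jobs
        ADD CONSTRAINT check_b0de0a5852 CHECK (char_length(class_name) <= 200)
    SQL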
...@@ -9413,6 +9413,25 @@ CREATE TABLE public.aws_roles (
    role_external_id character varying(64) NOT NULL
);

CREATE TABLE public.background_migration_jobs (
    id bigint NOT NULL,
    created_at timestamp with time zone NOT NULL,
    updated_at timestamp with time zone NOT NULL,
    status smallint DEFAULT 0 NOT NULL,
    class_name text NOT NULL,
    arguments jsonb NOT NULL,
    CONSTRAINT check_b0de0a5852 CHECK ((char_length(class_name) <= 200))
);

CREATE SEQUENCE public.background_migration_jobs_id_seq
    START WITH 1
    INCREMENT BY 1
    NO MINVALUE
    NO MAXVALUE
    CACHE 1;

ALTER SEQUENCE public.background_migration_jobs_id_seq OWNED BY public.background_migration_jobs.id;
CREATE TABLE public.backup_labels (
    id integer NOT NULL,
    title character varying,
...@@ -16370,6 +16389,8 @@ ALTER TABLE ONLY public.audit_events ALTER COLUMN id SET DEFAULT nextval('public
ALTER TABLE ONLY public.award_emoji ALTER COLUMN id SET DEFAULT nextval('public.award_emoji_id_seq'::regclass);

ALTER TABLE ONLY public.background_migration_jobs ALTER COLUMN id SET DEFAULT nextval('public.background_migration_jobs_id_seq'::regclass);

ALTER TABLE ONLY public.badges ALTER COLUMN id SET DEFAULT nextval('public.badges_id_seq'::regclass);
ALTER TABLE ONLY public.board_assignees ALTER COLUMN id SET DEFAULT nextval('public.board_assignees_id_seq'::regclass);
...@@ -17255,6 +17276,9 @@ ALTER TABLE ONLY public.award_emoji
ALTER TABLE ONLY public.aws_roles
    ADD CONSTRAINT aws_roles_pkey PRIMARY KEY (user_id);

ALTER TABLE ONLY public.background_migration_jobs
    ADD CONSTRAINT background_migration_jobs_pkey PRIMARY KEY (id);

ALTER TABLE ONLY public.backup_labels
    ADD CONSTRAINT backup_labels_pkey PRIMARY KEY (id);
...@@ -18620,6 +18644,10 @@ CREATE UNIQUE INDEX index_aws_roles_on_role_external_id ON public.aws_roles USIN
CREATE UNIQUE INDEX index_aws_roles_on_user_id ON public.aws_roles USING btree (user_id);

CREATE INDEX index_background_migration_jobs_on_class_name_and_arguments ON public.background_migration_jobs USING btree (class_name, arguments);

CREATE INDEX index_background_migration_jobs_on_class_name_and_status_and_id ON public.background_migration_jobs USING btree (class_name, status, id);

CREATE INDEX index_badges_on_group_id ON public.badges USING btree (group_id);
CREATE INDEX index_badges_on_project_id ON public.badges USING btree (project_id);
...@@ -23641,6 +23669,7 @@ COPY "schema_migrations" (version) FROM STDIN;
20200630091656
20200630110826
20200701093859
20200701205710
20200702123805
20200703154822
20200704143633
...
# frozen_string_literal: true

module Gitlab
  module Database
    class BackgroundMigrationJob < ActiveRecord::Base # rubocop:disable Rails/ApplicationRecord
      self.table_name = :background_migration_jobs

      scope :for_migration_execution, -> (class_name, arguments) do
        where('class_name = ? AND arguments = ?', class_name, arguments.to_json)
      end

      enum status: {
        pending: 0,
        succeeded: 1
      }

      def self.mark_all_as_succeeded(class_name, arguments)
        self.pending.for_migration_execution(class_name, arguments)
          .update_all("status = #{statuses[:succeeded]}, updated_at = NOW()")
      end
    end
  end
end
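To illustrate what these records enable, a later cleanup migration could query for jobs that were never marked as succeeded and re-run those ranges before removing any leftover data. This is only a sketch: the 'CopyColumnData' job class and the [start_id, end_id, table_name] argument shape are hypothetical and not part of this commit.

    # Hypothetical cleanup sketch; job class name and argument shape are assumptions.
    Gitlab::Database::BackgroundMigrationJob
      .pending
      .where(class_name: 'CopyColumnData')
      .find_each do |job|
        start_id, end_id, table_name = job.arguments

        # Re-run the missed range inline; the job itself records success afterwards.
        Gitlab::BackgroundMigration::CopyColumnData.new.perform(start_id, end_id, table_name)
      end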
...@@ -64,6 +64,10 @@ module Gitlab
        # delay_interval - The duration between each job's scheduled time (must respond to `to_f`)
        # batch_size - The maximum number of rows per job
        # other_arguments - Other arguments to send to the job
        # track_jobs - When this flag is set, creates a record in the background_migration_jobs table for each job
        #              that is scheduled to be run. These records can be used to trace execution of the background
        #              job, but there is no built-in support to manage that automatically at this time. You should
        #              only set this flag if you are aware of how it works, and intend to manually clean up the
        #              database records in your background job.
        #
        # *Returns the final migration delay*
        #
...@@ -83,7 +87,7 @@ module Gitlab
        #     # do something
        #   end
        # end
        def queue_background_migration_jobs_by_range_at_intervals(model_class, job_class_name, delay_interval, batch_size: BACKGROUND_MIGRATION_BATCH_SIZE, other_job_arguments: [], initial_delay: 0, track_jobs: false)
          raise "#{model_class} does not have an ID to use for batch ranges" unless model_class.column_names.include?('id')

          # To not overload the worker too much we enforce a minimum interval both
...@@ -101,7 +105,10 @@ module Gitlab
            # the same time, which is not helpful in most cases where we wish to
            # spread the work over time.
            final_delay = initial_delay + delay_interval * index
            full_job_arguments = [start_id, end_id] + other_job_arguments

            track_in_database(job_class_name, full_job_arguments) if track_jobs
            migrate_in(final_delay, job_class_name, full_job_arguments)
          end

          final_delay
...@@ -138,6 +145,12 @@ module Gitlab
        def with_migration_context(&block)
          Gitlab::ApplicationContext.with_context(caller_id: self.class.to_s, &block)
        end

        private

        def track_in_database(class_name, arguments)
          Gitlab::Database::BackgroundMigrationJob.create!(class_name: class_name, arguments: arguments)
        end
      end
    end
  end
...
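As a concrete illustration of the new flag, a post-deployment migration could schedule a tracked background migration like the sketch below. The migration class, job name, arguments, and timings are assumptions for illustration; only the track_jobs: true keyword comes from this change.

    # Illustrative scheduling migration; 'CopyColumnData' and the timings are assumptions.
    class ScheduleCopyColumnData < ActiveRecord::Migration[6.0]
      include Gitlab::Database::MigrationHelpers

      DOWNTIME = false

      def up
        queue_background_migration_jobs_by_range_at_intervals(
          User,               # any model class with an `id` column
          'CopyColumnData',   # background migration job class name
          2.minutes,          # delay_interval between scheduled jobs
          batch_size: 10_000,
          other_job_arguments: ['users'],
          track_jobs: true    # writes one background_migration_jobs row per scheduled job
        )
      end

      def down
        # no-op
      end
    end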
...@@ -27,11 +27,13 @@ module Gitlab
          parent_batch_relation = relation_scoped_to_range(source_table, source_column, start_id, stop_id)

          parent_batch_relation.each_batch(of: SUB_BATCH_SIZE) do |sub_batch|
            sub_start_id, sub_stop_id = sub_batch.pluck(Arel.sql("MIN(#{source_column}), MAX(#{source_column})")).first

            bulk_copy.copy_between(sub_start_id, sub_stop_id)
            sleep(PAUSE_SECONDS)
          end

          mark_jobs_as_succeeded(start_id, stop_id, source_table, partitioned_table, source_column)
        end

        private
...@@ -56,6 +58,10 @@ module Gitlab
          define_batchable_model(source_table).where(source_key_column => start_id..stop_id)
        end

        def mark_jobs_as_succeeded(*arguments)
          BackgroundMigrationJob.mark_all_as_succeeded(self.class.name, arguments)
        end

        # Helper class to copy data between two tables via upserts
        class BulkCopy
          DELIMITER = ', '
...
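The same convention could be reused by any tracked background migration: do the work for the given range, then mark the matching tracking record as succeeded using the same class name and arguments the job was enqueued with, so the for_migration_execution lookup matches. A minimal sketch with a hypothetical job class:

    # Hypothetical tracked background migration; 'CopyColumnData' is illustrative only.
    module Gitlab
      module BackgroundMigration
        class CopyColumnData
          def perform(start_id, end_id, table_name)
            # ... backfill or copy the rows in the [start_id, end_id] range ...

            # Record success with the exact arguments the job was scheduled with,
            # so cleanup migrations can tell this range was not missed.
            Gitlab::Database::BackgroundMigrationJob
              .mark_all_as_succeeded('CopyColumnData', [start_id, end_id, table_name])
          end
        end
      end
    end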
...@@ -258,7 +258,8 @@ module Gitlab
          MIGRATION_CLASS_NAME,
          BATCH_INTERVAL,
          batch_size: BATCH_SIZE,
          other_job_arguments: [source_table_name.to_s, partitioned_table_name, source_key],
          track_jobs: true)
      end
    end
  end
...
# frozen_string_literal: true

FactoryBot.define do
  factory :background_migration_job, class: '::Gitlab::Database::BackgroundMigrationJob' do
    class_name { 'TestJob' }
    status { :pending }
    arguments { [] }

    trait :succeeded do
      status { :succeeded }
    end
  end
end
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe Gitlab::Database::BackgroundMigrationJob do
  it_behaves_like 'having unique enum values'

  describe '.for_migration_execution' do
    let!(:job1) { create(:background_migration_job) }
    let!(:job2) { create(:background_migration_job, arguments: ['hi', 2]) }
    let!(:job3) { create(:background_migration_job, class_name: 'OtherJob', arguments: ['hi', 2]) }

    it 'returns jobs matching class_name and arguments' do
      relation = described_class.for_migration_execution('TestJob', ['hi', 2])

      expect(relation.count).to eq(1)
      expect(relation.first).to have_attributes(class_name: 'TestJob', arguments: ['hi', 2])
    end
  end

  describe '.mark_all_as_succeeded' do
    let!(:job1) { create(:background_migration_job, arguments: [1, 100]) }
    let!(:job2) { create(:background_migration_job, arguments: [1, 100]) }
    let!(:job3) { create(:background_migration_job, arguments: [101, 200]) }
    let!(:job4) { create(:background_migration_job, class_name: 'OtherJob', arguments: [1, 100]) }

    it 'marks all matching jobs as succeeded' do
      expect { described_class.mark_all_as_succeeded('TestJob', [1, 100]) }
        .to change { described_class.succeeded.count }.from(0).to(2)

      expect(job1.reload).to be_succeeded
      expect(job2.reload).to be_succeeded
      expect(job3.reload).to be_pending
      expect(job4.reload).to be_pending
    end

    context 'when previous matching jobs have already succeeded' do
      let(:initial_time) { Time.now.round }
      let!(:job1) { create(:background_migration_job, :succeeded, created_at: initial_time, updated_at: initial_time) }

      it 'does not update non-pending jobs' do
        Timecop.freeze(initial_time + 1.day) do
          expect { described_class.mark_all_as_succeeded('TestJob', [1, 100]) }
            .to change { described_class.succeeded.count }.from(1).to(2)
        end

        expect(job1.reload.updated_at).to eq(initial_time)
        expect(job2.reload).to be_succeeded
        expect(job3.reload).to be_pending
        expect(job4.reload).to be_pending
      end
    end
  end
end
...@@ -156,6 +156,37 @@ RSpec.describe Gitlab::Database::Migrations::BackgroundMigrationHelpers do
          end
        end
      end

      context 'with track_jobs option' do
        it 'creates a record for each job in the database' do
          Sidekiq::Testing.fake! do
            expect do
              model.queue_background_migration_jobs_by_range_at_intervals(User, 'FooJob', 10.minutes,
                other_job_arguments: [1, 2], track_jobs: true)
            end.to change { Gitlab::Database::BackgroundMigrationJob.count }.from(0).to(1)

            expect(BackgroundMigrationWorker.jobs.size).to eq(1)

            tracked_job = Gitlab::Database::BackgroundMigrationJob.first

            expect(tracked_job.class_name).to eq('FooJob')
            expect(tracked_job.arguments).to eq([id1, id3, 1, 2])
            expect(tracked_job).to be_pending
          end
        end
      end

      context 'without track_jobs option' do
        it 'does not create records in the database' do
          Sidekiq::Testing.fake! do
            expect do
              model.queue_background_migration_jobs_by_range_at_intervals(User, 'FooJob', 10.minutes, other_job_arguments: [1, 2])
            end.not_to change { Gitlab::Database::BackgroundMigrationJob.count }

            expect(BackgroundMigrationWorker.jobs.size).to eq(1)
          end
        end
      end
    end

    context "when the model doesn't have an ID column" do
...
...@@ -96,6 +96,17 @@ RSpec.describe Gitlab::Database::PartitioningMigrationHelpers::BackfillPartition
      subject.perform(source1.id, source3.id, source_table, destination_table, unique_key)
    end

    it 'marks each job record as succeeded after processing' do
      create(:background_migration_job, class_name: described_class.name,
        arguments: [source1.id, source3.id, source_table, destination_table, unique_key])

      expect(::Gitlab::Database::BackgroundMigrationJob).to receive(:mark_all_as_succeeded).and_call_original

      expect do
        subject.perform(source1.id, source3.id, source_table, destination_table, unique_key)
      end.to change { ::Gitlab::Database::BackgroundMigrationJob.succeeded.count }.from(0).to(1)
    end

    context 'when the feature flag is disabled' do
      let(:mock_connection) { double('connection') }
...