Commit 03d04754 authored by Heinrich Lee Yu's avatar Heinrich Lee Yu Committed by Matthias Käppler

Retry failed or stuck batched migration jobs

parent 3b316758
---
title: Add index to batched migration jobs status
merge_request: 60248
author:
type: other
# frozen_string_literal: true
class AddIndexToBatchedMigrationJobsStatus < ActiveRecord::Migration[6.0]
include Gitlab::Database::MigrationHelpers
disable_ddl_transaction!
INDEX_NAME = 'index_batched_jobs_on_batched_migration_id_and_status'
def up
add_concurrent_index :batched_background_migration_jobs, [:batched_background_migration_id, :status], name: INDEX_NAME
end
def down
remove_concurrent_index_by_name :batched_background_migration_jobs, INDEX_NAME
end
end
306bb2bc3bfd20a57f1ac473e32596e7b7e7b6c2ae41c3fe5a7f45c551ce9207
\ No newline at end of file
...@@ -22115,6 +22115,8 @@ CREATE INDEX index_badges_on_project_id ON badges USING btree (project_id); ...@@ -22115,6 +22115,8 @@ CREATE INDEX index_badges_on_project_id ON badges USING btree (project_id);
CREATE INDEX index_batched_jobs_by_batched_migration_id_and_id ON batched_background_migration_jobs USING btree (batched_background_migration_id, id); CREATE INDEX index_batched_jobs_by_batched_migration_id_and_id ON batched_background_migration_jobs USING btree (batched_background_migration_id, id);
CREATE INDEX index_batched_jobs_on_batched_migration_id_and_status ON batched_background_migration_jobs USING btree (batched_background_migration_id, status);
CREATE INDEX index_batched_migrations_on_job_table_and_column_name ON batched_background_migrations USING btree (job_class_name, table_name, column_name); CREATE INDEX index_batched_migrations_on_job_table_and_column_name ON batched_background_migrations USING btree (job_class_name, table_name, column_name);
CREATE INDEX index_board_assignees_on_assignee_id ON board_assignees USING btree (assignee_id); CREATE INDEX index_board_assignees_on_assignee_id ON board_assignees USING btree (assignee_id);
...@@ -4,10 +4,23 @@ module Gitlab ...@@ -4,10 +4,23 @@ module Gitlab
module Database module Database
module BackgroundMigration module BackgroundMigration
class BatchedJob < ActiveRecord::Base # rubocop:disable Rails/ApplicationRecord class BatchedJob < ActiveRecord::Base # rubocop:disable Rails/ApplicationRecord
include FromUnion
self.table_name = :batched_background_migration_jobs self.table_name = :batched_background_migration_jobs
MAX_ATTEMPTS = 3
STUCK_JOBS_TIMEOUT = 1.hour.freeze
belongs_to :batched_migration, foreign_key: :batched_background_migration_id belongs_to :batched_migration, foreign_key: :batched_background_migration_id
scope :active, -> { where(status: [:pending, :running]) }
scope :stuck, -> { active.where('updated_at <= ?', STUCK_JOBS_TIMEOUT.ago) }
scope :retriable, -> {
failed_jobs = where(status: :failed).where('attempts < ?', MAX_ATTEMPTS)
from_union([failed_jobs, self.stuck])
}
enum status: { enum status: {
pending: 0, pending: 0,
running: 1, running: 1,
......
...@@ -20,7 +20,8 @@ module Gitlab ...@@ -20,7 +20,8 @@ module Gitlab
paused: 0, paused: 0,
active: 1, active: 1,
aborted: 2, aborted: 2,
finished: 3 finished: 3,
failed: 4
} }
attribute :pause_ms, :integer, default: 100 attribute :pause_ms, :integer, default: 100
......
...@@ -19,7 +19,7 @@ module Gitlab ...@@ -19,7 +19,7 @@ module Gitlab
# #
# Note that this method is primarily intended to called by a scheduled worker. # Note that this method is primarily intended to called by a scheduled worker.
def run_migration_job(active_migration) def run_migration_job(active_migration)
if next_batched_job = create_next_batched_job!(active_migration) if next_batched_job = find_or_create_next_batched_job(active_migration)
migration_wrapper.perform(next_batched_job) migration_wrapper.perform(next_batched_job)
active_migration.optimize! active_migration.optimize!
...@@ -48,12 +48,12 @@ module Gitlab ...@@ -48,12 +48,12 @@ module Gitlab
attr_reader :migration_wrapper attr_reader :migration_wrapper
def create_next_batched_job!(active_migration) def find_or_create_next_batched_job(active_migration)
next_batch_range = find_next_batch_range(active_migration) if next_batch_range = find_next_batch_range(active_migration)
return if next_batch_range.nil?
active_migration.create_batched_job!(next_batch_range.min, next_batch_range.max) active_migration.create_batched_job!(next_batch_range.min, next_batch_range.max)
else
active_migration.batched_jobs.retriable.first
end
end end
def find_next_batch_range(active_migration) def find_next_batch_range(active_migration)
...@@ -82,9 +82,15 @@ module Gitlab ...@@ -82,9 +82,15 @@ module Gitlab
end end
def finish_active_migration(active_migration) def finish_active_migration(active_migration)
return if active_migration.batched_jobs.active.exists?
if active_migration.batched_jobs.failed.exists?
active_migration.failed!
else
active_migration.finished! active_migration.finished!
end end
end end
end end
end end
end
end end
...@@ -31,7 +31,7 @@ module Gitlab ...@@ -31,7 +31,7 @@ module Gitlab
private private
def start_tracking_execution(tracking_record) def start_tracking_execution(tracking_record)
tracking_record.update!(attempts: tracking_record.attempts + 1, status: :running, started_at: Time.current) tracking_record.update!(attempts: tracking_record.attempts + 1, status: :running, started_at: Time.current, finished_at: nil, metrics: {})
end end
def execute_batch(tracking_record) def execute_batch(tracking_record)
......
...@@ -9,6 +9,42 @@ RSpec.describe Gitlab::Database::BackgroundMigration::BatchedJob, type: :model d ...@@ -9,6 +9,42 @@ RSpec.describe Gitlab::Database::BackgroundMigration::BatchedJob, type: :model d
it { is_expected.to belong_to(:batched_migration).with_foreign_key(:batched_background_migration_id) } it { is_expected.to belong_to(:batched_migration).with_foreign_key(:batched_background_migration_id) }
end end
describe 'scopes' do
let_it_be(:fixed_time) { Time.new(2021, 04, 27, 10, 00, 00, 00) }
let_it_be(:pending_job) { create(:batched_background_migration_job, status: :pending, updated_at: fixed_time) }
let_it_be(:running_job) { create(:batched_background_migration_job, status: :running, updated_at: fixed_time) }
let_it_be(:stuck_job) { create(:batched_background_migration_job, status: :pending, updated_at: fixed_time - described_class::STUCK_JOBS_TIMEOUT) }
let_it_be(:failed_job) { create(:batched_background_migration_job, status: :failed, attempts: 1) }
before_all do
create(:batched_background_migration_job, status: :failed, attempts: described_class::MAX_ATTEMPTS)
create(:batched_background_migration_job, status: :succeeded)
end
before do
travel_to fixed_time
end
describe '.active' do
it 'returns active jobs' do
expect(described_class.active).to contain_exactly(pending_job, running_job, stuck_job)
end
end
describe '.stuck' do
it 'returns stuck jobs' do
expect(described_class.stuck).to contain_exactly(stuck_job)
end
end
describe '.retriable' do
it 'returns retriable jobs' do
expect(described_class.retriable).to contain_exactly(failed_job, stuck_job)
end
end
end
describe 'delegated batched_migration attributes' do describe 'delegated batched_migration attributes' do
let(:batched_job) { build(:batched_background_migration_job) } let(:batched_job) { build(:batched_background_migration_job) }
let(:batched_migration) { batched_job.batched_migration } let(:batched_migration) { batched_job.batched_migration }
......
...@@ -17,9 +17,9 @@ RSpec.describe Gitlab::Database::BackgroundMigration::BatchedMigrationRunner do ...@@ -17,9 +17,9 @@ RSpec.describe Gitlab::Database::BackgroundMigration::BatchedMigrationRunner do
end end
it 'marks the migration as finished' do it 'marks the migration as finished' do
relation = Gitlab::Database::BackgroundMigration::BatchedMigration.finished.where(id: migration.id) runner.run_migration_job(migration)
expect { runner.run_migration_job(migration) }.to change { relation.count }.by(1) expect(migration.reload).to be_finished
end end
end end
...@@ -92,7 +92,7 @@ RSpec.describe Gitlab::Database::BackgroundMigration::BatchedMigrationRunner do ...@@ -92,7 +92,7 @@ RSpec.describe Gitlab::Database::BackgroundMigration::BatchedMigrationRunner do
let!(:event3) { create(:event) } let!(:event3) { create(:event) }
let!(:migration) do let!(:migration) do
create(:batched_background_migration, :active, batch_size: 2, min_value: event1.id, max_value: event3.id) create(:batched_background_migration, :active, batch_size: 2, min_value: event1.id, max_value: event2.id)
end end
let!(:previous_job) do let!(:previous_job) do
...@@ -101,14 +101,24 @@ RSpec.describe Gitlab::Database::BackgroundMigration::BatchedMigrationRunner do ...@@ -101,14 +101,24 @@ RSpec.describe Gitlab::Database::BackgroundMigration::BatchedMigrationRunner do
min_value: event1.id, min_value: event1.id,
max_value: event2.id, max_value: event2.id,
batch_size: 2, batch_size: 2,
sub_batch_size: 1) sub_batch_size: 1,
status: :succeeded
)
end end
let(:job_relation) do let(:job_relation) do
Gitlab::Database::BackgroundMigration::BatchedJob.where(batched_background_migration_id: migration.id) Gitlab::Database::BackgroundMigration::BatchedJob.where(batched_background_migration_id: migration.id)
end end
context 'when the migration has no batches remaining' do
it_behaves_like 'it has completed the migration'
end
context 'when the migration has batches to process' do context 'when the migration has batches to process' do
before do
migration.update!(max_value: event3.id)
end
it 'runs the migration job for the next batch' do it 'runs the migration job for the next batch' do
expect(migration_wrapper).to receive(:perform) do |job_record| expect(migration_wrapper).to receive(:perform) do |job_record|
expect(job_record).to eq(job_relation.last) expect(job_record).to eq(job_relation.last)
...@@ -132,17 +142,82 @@ RSpec.describe Gitlab::Database::BackgroundMigration::BatchedMigrationRunner do ...@@ -132,17 +142,82 @@ RSpec.describe Gitlab::Database::BackgroundMigration::BatchedMigrationRunner do
end end
end end
context 'when the migration has no batches remaining' do context 'when migration has failed jobs' do
before do before do
create(:batched_background_migration_job, previous_job.update!(status: :failed)
batched_migration: migration,
min_value: event3.id,
max_value: event3.id,
batch_size: 2,
sub_batch_size: 1)
end end
it_behaves_like 'it has completed the migration' it 'retries the failed job' do
expect(migration_wrapper).to receive(:perform) do |job_record|
expect(job_record).to eq(previous_job)
end
expect { runner.run_migration_job(migration) }.to change { job_relation.count }.by(0)
end
context 'when failed job has reached the maximum number of attempts' do
before do
previous_job.update!(attempts: Gitlab::Database::BackgroundMigration::BatchedJob::MAX_ATTEMPTS)
end
it 'marks the migration as failed' do
expect(migration_wrapper).not_to receive(:perform)
expect { runner.run_migration_job(migration) }.to change { job_relation.count }.by(0)
expect(migration).to be_failed
end
end
end
context 'when migration has stuck jobs' do
before do
previous_job.update!(status: :running, updated_at: 1.hour.ago - Gitlab::Database::BackgroundMigration::BatchedJob::STUCK_JOBS_TIMEOUT)
end
it 'retries the stuck job' do
expect(migration_wrapper).to receive(:perform) do |job_record|
expect(job_record).to eq(previous_job)
end
expect { runner.run_migration_job(migration.reload) }.to change { job_relation.count }.by(0)
end
end
context 'when migration has possible stuck jobs' do
before do
previous_job.update!(status: :running, updated_at: 1.hour.from_now - Gitlab::Database::BackgroundMigration::BatchedJob::STUCK_JOBS_TIMEOUT)
end
it 'keeps the migration active' do
expect(migration_wrapper).not_to receive(:perform)
expect { runner.run_migration_job(migration) }.to change { job_relation.count }.by(0)
expect(migration.reload).to be_active
end
end
context 'when the migration has batches to process and failed jobs' do
before do
migration.update!(max_value: event3.id)
previous_job.update!(status: :failed)
end
it 'runs next batch then retries the failed job' do
expect(migration_wrapper).to receive(:perform) do |job_record|
expect(job_record).to eq(job_relation.last)
job_record.update!(status: :succeeded)
end
expect { runner.run_migration_job(migration) }.to change { job_relation.count }.by(1)
expect(migration_wrapper).to receive(:perform) do |job_record|
expect(job_record).to eq(previous_job)
end
expect { runner.run_migration_job(migration.reload) }.to change { job_relation.count }.by(0)
end
end end
end end
end end
...@@ -189,10 +264,12 @@ RSpec.describe Gitlab::Database::BackgroundMigration::BatchedMigrationRunner do ...@@ -189,10 +264,12 @@ RSpec.describe Gitlab::Database::BackgroundMigration::BatchedMigrationRunner do
it 'runs all jobs inline until finishing the migration' do it 'runs all jobs inline until finishing the migration' do
expect(migration_wrapper).to receive(:perform) do |job_record| expect(migration_wrapper).to receive(:perform) do |job_record|
expect(job_record).to eq(job_relation.first) expect(job_record).to eq(job_relation.first)
job_record.update!(status: :succeeded)
end end
expect(migration_wrapper).to receive(:perform) do |job_record| expect(migration_wrapper).to receive(:perform) do |job_record|
expect(job_record).to eq(job_relation.last) expect(job_record).to eq(job_relation.last)
job_record.update!(status: :succeeded)
end end
expect { runner.run_entire_migration(migration) }.to change { job_relation.count }.by(2) expect { runner.run_entire_migration(migration) }.to change { job_relation.count }.by(2)
......
...@@ -49,6 +49,42 @@ RSpec.describe Gitlab::Database::BackgroundMigration::BatchedMigrationWrapper, ' ...@@ -49,6 +49,42 @@ RSpec.describe Gitlab::Database::BackgroundMigration::BatchedMigrationWrapper, '
end end
end end
context 'when running a job that failed previously' do
let!(:job_record) do
create(:batched_background_migration_job,
batched_migration: active_migration,
pause_ms: pause_ms,
attempts: 1,
status: :failed,
finished_at: 1.hour.ago,
metrics: { 'my_metrics' => 'some_value' }
)
end
it 'increments attempts and updates other fields' do
updated_metrics = { 'updated_metrics' => 'some_value' }
expect(job_instance).to receive(:perform)
expect(job_instance).to receive(:batch_metrics).and_return(updated_metrics)
expect(job_record).to receive(:update!).with(
hash_including(attempts: 2, status: :running, finished_at: nil, metrics: {})
).and_call_original
freeze_time do
subject
job_record.reload
expect(job_record).not_to be_failed
expect(job_record.attempts).to eq(2)
expect(job_record.started_at).to eq(Time.current)
expect(job_record.finished_at).to eq(Time.current)
expect(job_record.metrics).to eq(updated_metrics)
end
end
end
context 'reporting prometheus metrics' do context 'reporting prometheus metrics' do
let(:labels) { job_record.batched_migration.prometheus_labels } let(:labels) { job_record.batched_migration.prometheus_labels }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment