Commit eeb94aef authored by Patrick Bajao's avatar Patrick Bajao

Merge branch '296874-improve-merge-requests-cleanup-refs-scheduling' into 'master'

Schedule MergeRequestCleanupRefsWorker more efficiently

See merge request gitlab-org/gitlab!65647
parents 3c1f7a6f 0080186a
# frozen_string_literal: true
class MergeRequest::CleanupSchedule < ApplicationRecord
STATUSES = {
unstarted: 0,
running: 1,
completed: 2,
failed: 3
}.freeze
belongs_to :merge_request, inverse_of: :cleanup_schedule
validates :scheduled_at, presence: true
def self.scheduled_merge_request_ids(limit)
where('completed_at IS NULL AND scheduled_at <= NOW()')
state_machine :status, initial: :unstarted do
state :unstarted, value: STATUSES[:unstarted]
state :running, value: STATUSES[:running]
state :completed, value: STATUSES[:completed]
state :failed, value: STATUSES[:failed]
event :run do
transition unstarted: :running
end
event :retry do
transition running: :unstarted
end
event :complete do
transition running: :completed
end
event :mark_as_failed do
transition running: :failed
end
before_transition to: [:completed] do |cleanup_schedule, _transition|
cleanup_schedule.completed_at = Time.current
end
before_transition from: :running, to: [:unstarted, :failed] do |cleanup_schedule, _transition|
cleanup_schedule.failed_count += 1
end
end
scope :scheduled_and_unstarted, -> {
where('completed_at IS NULL AND scheduled_at <= NOW() AND status = ?', STATUSES[:unstarted])
.order('scheduled_at DESC')
.limit(limit)
.pluck(:merge_request_id)
}
def self.start_next
MergeRequest::CleanupSchedule.transaction do
cleanup_schedule = scheduled_and_unstarted.lock('FOR UPDATE SKIP LOCKED').first
next if cleanup_schedule.blank?
cleanup_schedule.run!
cleanup_schedule
end
end
end
......@@ -2,6 +2,8 @@
class MergeRequestCleanupRefsWorker
include ApplicationWorker
include LimitedCapacity::Worker
include Gitlab::Utils::StrongMemoize
sidekiq_options retry: 3
......@@ -9,20 +11,60 @@ class MergeRequestCleanupRefsWorker
tags :exclude_from_kubernetes
idempotent!
def perform(merge_request_id)
return unless Feature.enabled?(:merge_request_refs_cleanup, default_enabled: false)
# Hard-coded to 4 for now. Will be configurable later on via application settings.
# This means, there can only be 4 jobs running at the same time at maximum.
MAX_RUNNING_JOBS = 4
FAILURE_THRESHOLD = 3
merge_request = MergeRequest.find_by_id(merge_request_id)
def perform_work
return unless Feature.enabled?(:merge_request_refs_cleanup, default_enabled: false)
unless merge_request
logger.error("Failed to find merge request with ID: #{merge_request_id}")
logger.error('No existing merge request to be cleaned up.')
return
end
result = ::MergeRequests::CleanupRefsService.new(merge_request).execute
log_extra_metadata_on_done(:merge_request_id, merge_request.id)
return if result[:status] == :success
result = MergeRequests::CleanupRefsService.new(merge_request).execute
logger.error("Failed cleanup refs of merge request (#{merge_request_id}): #{result[:message]}")
if result[:status] == :success
merge_request_cleanup_schedule.complete!
else
if merge_request_cleanup_schedule.failed_count < FAILURE_THRESHOLD
merge_request_cleanup_schedule.retry!
else
merge_request_cleanup_schedule.mark_as_failed!
end
log_extra_metadata_on_done(:message, result[:message])
end
log_extra_metadata_on_done(:status, merge_request_cleanup_schedule.status)
end
def remaining_work_count
MergeRequest::CleanupSchedule
.scheduled_and_unstarted
.limit(max_running_jobs)
.count
end
def max_running_jobs
MAX_RUNNING_JOBS
end
private
def merge_request
strong_memoize(:merge_request) do
merge_request_cleanup_schedule&.merge_request
end
end
def merge_request_cleanup_schedule
strong_memoize(:merge_request_cleanup_schedule) do
MergeRequest::CleanupSchedule.start_next
end
end
end
......@@ -10,21 +10,10 @@ class ScheduleMergeRequestCleanupRefsWorker
tags :exclude_from_kubernetes
idempotent!
# Based on existing data, MergeRequestCleanupRefsWorker can run 3 jobs per
# second. This means that 180 jobs can be performed but since there are some
# spikes from time time, it's better to give it some allowance.
LIMIT = 180
DELAY = 10.seconds
BATCH_SIZE = 30
def perform
return if Gitlab::Database.read_only?
return unless Feature.enabled?(:merge_request_refs_cleanup, default_enabled: false)
ids = MergeRequest::CleanupSchedule.scheduled_merge_request_ids(LIMIT).map { |id| [id] }
MergeRequestCleanupRefsWorker.bulk_perform_in(DELAY, ids, batch_size: BATCH_SIZE) # rubocop:disable Scalability/BulkPerformWithContext
log_extra_metadata_on_done(:merge_requests_count, ids.size)
MergeRequestCleanupRefsWorker.perform_with_capacity
end
end
# frozen_string_literal: true
class AddStatusToMergeRequestCleanupSchedules < ActiveRecord::Migration[6.1]
include Gitlab::Database::MigrationHelpers
INDEX_NAME = 'index_merge_request_cleanup_schedules_on_status'
disable_ddl_transaction!
def up
unless column_exists?(:merge_request_cleanup_schedules, :status)
add_column(:merge_request_cleanup_schedules, :status, :integer, limit: 2, default: 0, null: false)
end
add_concurrent_index(:merge_request_cleanup_schedules, :status, name: INDEX_NAME)
end
def down
remove_concurrent_index_by_name(:merge_request_cleanup_schedules, INDEX_NAME)
if column_exists?(:merge_request_cleanup_schedules, :status)
remove_column(:merge_request_cleanup_schedules, :status)
end
end
end
# frozen_string_literal: true
class AddFailedCountToMergeRequestCleanupSchedules < ActiveRecord::Migration[6.1]
include Gitlab::Database::MigrationHelpers
def change
add_column :merge_request_cleanup_schedules, :failed_count, :integer, default: 0, null: false
end
end
# frozen_string_literal: true
class UpdateMergeRequestCleanupSchedulesScheduledAtIndex < ActiveRecord::Migration[6.1]
include Gitlab::Database::MigrationHelpers
INDEX_NAME = 'index_mr_cleanup_schedules_timestamps_status'
OLD_INDEX_NAME = 'index_mr_cleanup_schedules_timestamps'
disable_ddl_transaction!
def up
add_concurrent_index(:merge_request_cleanup_schedules, :scheduled_at, where: 'completed_at IS NULL AND status = 0', name: INDEX_NAME)
remove_concurrent_index_by_name(:merge_request_cleanup_schedules, OLD_INDEX_NAME)
end
def down
remove_concurrent_index_by_name(:merge_request_cleanup_schedules, INDEX_NAME)
add_concurrent_index(:merge_request_cleanup_schedules, :scheduled_at, where: 'completed_at IS NULL', name: OLD_INDEX_NAME)
end
end
98d4deaf0564119c1ee44d76d3a30bff1a0fceb7cab67c5dbef576faef62ddf5
\ No newline at end of file
77f6db1d2aeebdefd76c96966da6c9e4ce5da2c92a42f6ac2398b35fa21c680f
\ No newline at end of file
2899d954a199fa52bf6ab4beca5f22dcb9f9f0312e658f1307d1a7355394f1bb
\ No newline at end of file
......@@ -14711,7 +14711,9 @@ CREATE TABLE merge_request_cleanup_schedules (
scheduled_at timestamp with time zone NOT NULL,
completed_at timestamp with time zone,
created_at timestamp with time zone NOT NULL,
updated_at timestamp with time zone NOT NULL
updated_at timestamp with time zone NOT NULL,
status smallint DEFAULT 0 NOT NULL,
failed_count integer DEFAULT 0 NOT NULL
);
CREATE SEQUENCE merge_request_cleanup_schedules_merge_request_id_seq
......@@ -23988,6 +23990,8 @@ CREATE INDEX index_merge_request_blocks_on_blocked_merge_request_id ON merge_req
CREATE UNIQUE INDEX index_merge_request_cleanup_schedules_on_merge_request_id ON merge_request_cleanup_schedules USING btree (merge_request_id);
CREATE INDEX index_merge_request_cleanup_schedules_on_status ON merge_request_cleanup_schedules USING btree (status);
CREATE UNIQUE INDEX index_merge_request_diff_commit_users_on_name_and_email ON merge_request_diff_commit_users USING btree (name, email);
CREATE INDEX index_merge_request_diff_commits_on_sha ON merge_request_diff_commits USING btree (sha);
......@@ -24120,7 +24124,7 @@ CREATE INDEX index_mirror_data_non_scheduled_or_started ON project_mirror_data U
CREATE UNIQUE INDEX index_mr_blocks_on_blocking_and_blocked_mr_ids ON merge_request_blocks USING btree (blocking_merge_request_id, blocked_merge_request_id);
CREATE INDEX index_mr_cleanup_schedules_timestamps ON merge_request_cleanup_schedules USING btree (scheduled_at) WHERE (completed_at IS NULL);
CREATE INDEX index_mr_cleanup_schedules_timestamps_status ON merge_request_cleanup_schedules USING btree (scheduled_at) WHERE ((completed_at IS NULL) AND (status = 0));
CREATE UNIQUE INDEX index_mr_context_commits_on_merge_request_id_and_sha ON merge_request_context_commits USING btree (merge_request_id, sha);
......@@ -3,6 +3,19 @@
FactoryBot.define do
factory :merge_request_cleanup_schedule, class: 'MergeRequest::CleanupSchedule' do
merge_request
scheduled_at { Time.current }
scheduled_at { 1.day.ago }
trait :running do
status { MergeRequest::CleanupSchedule::STATUSES[:running] }
end
trait :completed do
status { MergeRequest::CleanupSchedule::STATUSES[:completed] }
completed_at { Time.current }
end
trait :failed do
status { MergeRequest::CleanupSchedule::STATUSES[:failed] }
end
end
end
......@@ -11,22 +11,125 @@ RSpec.describe MergeRequest::CleanupSchedule do
it { is_expected.to validate_presence_of(:scheduled_at) }
end
describe '.scheduled_merge_request_ids' do
let_it_be(:mr_cleanup_schedule_1) { create(:merge_request_cleanup_schedule, scheduled_at: 2.days.ago) }
let_it_be(:mr_cleanup_schedule_2) { create(:merge_request_cleanup_schedule, scheduled_at: 1.day.ago) }
let_it_be(:mr_cleanup_schedule_3) { create(:merge_request_cleanup_schedule, scheduled_at: 1.day.ago, completed_at: Time.current) }
let_it_be(:mr_cleanup_schedule_4) { create(:merge_request_cleanup_schedule, scheduled_at: 4.days.ago) }
let_it_be(:mr_cleanup_schedule_5) { create(:merge_request_cleanup_schedule, scheduled_at: 3.days.ago) }
let_it_be(:mr_cleanup_schedule_6) { create(:merge_request_cleanup_schedule, scheduled_at: 1.day.from_now) }
let_it_be(:mr_cleanup_schedule_7) { create(:merge_request_cleanup_schedule, scheduled_at: 5.days.ago) }
it 'only includes incomplete schedule within the specified limit' do
expect(described_class.scheduled_merge_request_ids(4)).to eq([
mr_cleanup_schedule_2.merge_request_id,
mr_cleanup_schedule_1.merge_request_id,
mr_cleanup_schedule_5.merge_request_id,
mr_cleanup_schedule_4.merge_request_id
describe 'state machine transitions' do
let(:cleanup_schedule) { create(:merge_request_cleanup_schedule) }
it 'sets status to unstarted by default' do
expect(cleanup_schedule).to be_unstarted
end
describe '#run' do
it 'sets the status to running' do
cleanup_schedule.run
expect(cleanup_schedule.reload).to be_running
end
context 'when previous status is not unstarted' do
let(:cleanup_schedule) { create(:merge_request_cleanup_schedule, :running) }
it 'does not change status' do
expect { cleanup_schedule.run }.not_to change(cleanup_schedule, :status)
end
end
end
describe '#retry' do
let(:cleanup_schedule) { create(:merge_request_cleanup_schedule, :running) }
it 'sets the status to unstarted' do
cleanup_schedule.retry
expect(cleanup_schedule.reload).to be_unstarted
end
it 'increments failed_count' do
expect { cleanup_schedule.retry }.to change(cleanup_schedule, :failed_count).by(1)
end
context 'when previous status is not running' do
let(:cleanup_schedule) { create(:merge_request_cleanup_schedule) }
it 'does not change status' do
expect { cleanup_schedule.retry }.not_to change(cleanup_schedule, :status)
end
end
end
describe '#complete' do
let(:cleanup_schedule) { create(:merge_request_cleanup_schedule, :running) }
it 'sets the status to completed' do
cleanup_schedule.complete
expect(cleanup_schedule.reload).to be_completed
end
it 'sets the completed_at' do
expect { cleanup_schedule.complete }.to change(cleanup_schedule, :completed_at)
end
context 'when previous status is not running' do
let(:cleanup_schedule) { create(:merge_request_cleanup_schedule, :completed) }
it 'does not change status' do
expect { cleanup_schedule.complete }.not_to change(cleanup_schedule, :status)
end
end
end
describe '#mark_as_failed' do
let(:cleanup_schedule) { create(:merge_request_cleanup_schedule, :running) }
it 'sets the status to failed' do
cleanup_schedule.mark_as_failed
expect(cleanup_schedule.reload).to be_failed
end
it 'increments failed_count' do
expect { cleanup_schedule.mark_as_failed }.to change(cleanup_schedule, :failed_count).by(1)
end
context 'when previous status is not running' do
let(:cleanup_schedule) { create(:merge_request_cleanup_schedule, :failed) }
it 'does not change status' do
expect { cleanup_schedule.mark_as_failed }.not_to change(cleanup_schedule, :status)
end
end
end
end
describe '.scheduled_and_unstarted' do
let!(:cleanup_schedule_1) { create(:merge_request_cleanup_schedule, scheduled_at: 2.days.ago) }
let!(:cleanup_schedule_2) { create(:merge_request_cleanup_schedule, scheduled_at: 1.day.ago) }
let!(:cleanup_schedule_3) { create(:merge_request_cleanup_schedule, :completed, scheduled_at: 1.day.ago) }
let!(:cleanup_schedule_4) { create(:merge_request_cleanup_schedule, scheduled_at: 4.days.ago) }
let!(:cleanup_schedule_5) { create(:merge_request_cleanup_schedule, scheduled_at: 3.days.ago) }
let!(:cleanup_schedule_6) { create(:merge_request_cleanup_schedule, scheduled_at: 1.day.from_now) }
let!(:cleanup_schedule_7) { create(:merge_request_cleanup_schedule, :failed, scheduled_at: 5.days.ago) }
it 'returns records that are scheduled before or on current time and unstarted (ordered by scheduled first)' do
expect(described_class.scheduled_and_unstarted).to eq([
cleanup_schedule_2,
cleanup_schedule_1,
cleanup_schedule_5,
cleanup_schedule_4
])
end
end
describe '.start_next' do
let!(:cleanup_schedule_1) { create(:merge_request_cleanup_schedule, :completed, scheduled_at: 1.day.ago) }
let!(:cleanup_schedule_2) { create(:merge_request_cleanup_schedule, scheduled_at: 2.days.ago) }
let!(:cleanup_schedule_3) { create(:merge_request_cleanup_schedule, :running, scheduled_at: 1.day.ago) }
let!(:cleanup_schedule_4) { create(:merge_request_cleanup_schedule, scheduled_at: 3.days.ago) }
let!(:cleanup_schedule_5) { create(:merge_request_cleanup_schedule, :failed, scheduled_at: 3.days.ago) }
it 'finds the next scheduled and unstarted then marked it as running' do
expect(described_class.start_next).to eq(cleanup_schedule_2)
expect(cleanup_schedule_2.reload).to be_running
end
end
end
......@@ -3,18 +3,41 @@
require 'spec_helper'
RSpec.describe MergeRequestCleanupRefsWorker do
describe '#perform' do
context 'when merge request exists' do
let(:merge_request) { create(:merge_request) }
let(:job_args) { merge_request.id }
let(:worker) { described_class.new }
include_examples 'an idempotent worker' do
it 'calls MergeRequests::CleanupRefsService#execute' do
expect_next_instance_of(MergeRequests::CleanupRefsService, merge_request) do |svc|
expect(svc).to receive(:execute).and_call_original
end.twice
describe '#perform_work' do
context 'when next cleanup schedule is found' do
let(:failed_count) { 0 }
let!(:cleanup_schedule) { create(:merge_request_cleanup_schedule, failed_count: failed_count) }
subject
it 'marks the cleanup schedule as completed on success' do
stub_cleanup_service(status: :success)
worker.perform_work
expect(cleanup_schedule.reload).to be_completed
expect(cleanup_schedule.completed_at).to be_present
end
context 'when service fails' do
before do
stub_cleanup_service(status: :error)
worker.perform_work
end
it 'marks the cleanup schedule as unstarted and track the failure' do
expect(cleanup_schedule.reload).to be_unstarted
expect(cleanup_schedule.failed_count).to eq(1)
expect(cleanup_schedule.completed_at).to be_nil
end
context "and cleanup schedule has already failed #{described_class::FAILURE_THRESHOLD} times" do
let(:failed_count) { described_class::FAILURE_THRESHOLD }
it 'marks the cleanup schedule as failed and track the failure' do
expect(cleanup_schedule.reload).to be_failed
expect(cleanup_schedule.failed_count).to eq(described_class::FAILURE_THRESHOLD + 1)
expect(cleanup_schedule.completed_at).to be_nil
end
end
end
......@@ -23,20 +46,52 @@ RSpec.describe MergeRequestCleanupRefsWorker do
stub_feature_flags(merge_request_refs_cleanup: false)
end
it 'does not clean up the merge request' do
it 'does nothing' do
expect(MergeRequests::CleanupRefsService).not_to receive(:new)
perform_multiple(1)
worker.perform_work
end
end
end
context 'when merge request does not exist' do
it 'does not call MergeRequests::CleanupRefsService' do
context 'when there is no next cleanup schedule found' do
it 'does nothing' do
expect(MergeRequests::CleanupRefsService).not_to receive(:new)
perform_multiple(1)
worker.perform_work
end
end
end
describe '#remaining_work_count' do
let_it_be(:unstarted) { create_list(:merge_request_cleanup_schedule, 2) }
let_it_be(:running) { create_list(:merge_request_cleanup_schedule, 2, :running) }
let_it_be(:completed) { create_list(:merge_request_cleanup_schedule, 2, :completed) }
it 'returns number of scheduled and unstarted cleanup schedule records' do
expect(worker.remaining_work_count).to eq(unstarted.count)
end
context 'when count exceeds max_running_jobs' do
before do
create_list(:merge_request_cleanup_schedule, worker.max_running_jobs)
end
it 'gets capped at max_running_jobs' do
expect(worker.remaining_work_count).to eq(worker.max_running_jobs)
end
end
end
describe '#max_running_jobs' do
it 'returns the value of MAX_RUNNING_JOBS' do
expect(worker.max_running_jobs).to eq(described_class::MAX_RUNNING_JOBS)
end
end
def stub_cleanup_service(result)
expect_next_instance_of(MergeRequests::CleanupRefsService, cleanup_schedule.merge_request) do |svc|
expect(svc).to receive(:execute).and_return(result)
end
end
end
......@@ -6,16 +6,9 @@ RSpec.describe ScheduleMergeRequestCleanupRefsWorker do
subject(:worker) { described_class.new }
describe '#perform' do
before do
allow(MergeRequest::CleanupSchedule)
.to receive(:scheduled_merge_request_ids)
.with(described_class::LIMIT)
.and_return([1, 2, 3, 4])
end
it 'does nothing if the database is read-only' do
allow(Gitlab::Database).to receive(:read_only?).and_return(true)
expect(MergeRequestCleanupRefsWorker).not_to receive(:bulk_perform_in)
expect(MergeRequestCleanupRefsWorker).not_to receive(:perform_with_capacity)
worker.perform
end
......@@ -26,25 +19,17 @@ RSpec.describe ScheduleMergeRequestCleanupRefsWorker do
end
it 'does not schedule any merge request clean ups' do
expect(MergeRequestCleanupRefsWorker).not_to receive(:bulk_perform_in)
expect(MergeRequestCleanupRefsWorker).not_to receive(:perform_with_capacity)
worker.perform
end
end
include_examples 'an idempotent worker' do
it 'schedules MergeRequestCleanupRefsWorker to be performed by batch' do
expect(MergeRequestCleanupRefsWorker)
.to receive(:bulk_perform_in)
.with(
described_class::DELAY,
[[1], [2], [3], [4]],
batch_size: described_class::BATCH_SIZE
)
it 'schedules MergeRequestCleanupRefsWorker to be performed with capacity' do
expect(MergeRequestCleanupRefsWorker).to receive(:perform_with_capacity).twice
expect(worker).to receive(:log_extra_metadata_on_done).with(:merge_requests_count, 4)
worker.perform
subject
end
end
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment