Commit b1194aa8 authored by Quang-Minh Nguyen's avatar Quang-Minh Nguyen Committed by Vitali Tatarintev

Implement Gitlab mirror scheduling tracker

This tracker is used to track the number of projects are being
scheduled. This number is used in UpdateAllMirrorsWorker to control its
rescheduling frequency. This number is also a replacement for
UpdateAllMirrorsWorker#queue_size, in an initiative to remove the
dependency to Sidekiq queues.

We made two attempts to resolve this issue, but not successfully:
- https://gitlab.com/gitlab-org/gitlab/-/merge_requests/80711
- https://gitlab.com/gitlab-org/gitlab/-/merge_requests/79097

Changelog: other
parent 778297d8
---
name: update_all_mirrors_job_tracker
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/79097
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/351420
milestone: '14.8'
name: mirror_scheduling_tracking
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/81249
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/353440
milestone: '14.9'
type: development
group: group::scalability
default_enabled: false
---
name: project_import_schedule_worker_job_tracker
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/79097
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/351408
milestone: '14.8'
type: development
group: group::scalability
default_enabled: false
......@@ -4,10 +4,6 @@ class ProjectImportScheduleWorker
ImportStateNotFound = Class.new(StandardError)
include ApplicationWorker
# At the moment, this inclusion is to enable job tracking ability. In the
# future, the capacity management should be moved to this worker instead of
# UpdateAllMirrorsWorker
include LimitedCapacity::Worker
data_consistency :always
prepend WaitableWorker
......@@ -25,7 +21,7 @@ class ProjectImportScheduleWorker
tags :needs_own_queue
def perform(project_id)
job_tracker.register(jid, capacity) if job_tracking?
::Gitlab::Mirror.untrack_scheduling(project_id) if scheduling_tracking_enabled?
return if Gitlab::Database.read_only?
......@@ -35,17 +31,11 @@ class ProjectImportScheduleWorker
with_context(project: project) do
project.import_state.schedule
end
ensure
job_tracker.remove(jid) if job_tracking?
end
private
def capacity
Gitlab::Mirror.available_capacity
end
def job_tracking?
Feature.enabled?(:project_import_schedule_worker_job_tracker, default_enabled: :yaml)
def scheduling_tracking_enabled?
Feature.enabled?(:mirror_scheduling_tracking, default_enabled: :yaml)
end
end
......@@ -9,7 +9,7 @@ class UpdateAllMirrorsWorker # rubocop:disable Scalability/IdempotentWorker
data_consistency :sticky
LEASE_TIMEOUT = 5.minutes
SCHEDULE_WAIT_TIMEOUT = 4.minutes
SCHEDULE_WAIT_TIMEOUT = 2.minutes
LEASE_KEY = 'update_all_mirrors'
RESCHEDULE_WAIT = 1.second
......@@ -18,14 +18,12 @@ class UpdateAllMirrorsWorker # rubocop:disable Scalability/IdempotentWorker
scheduled = 0
with_lease do
clean_project_import_jobs_tracking
scheduled = schedule_mirrors!
if scheduled > 0
# Wait for all ProjectImportScheduleWorker jobs to complete
# Wait for all ProjectImportScheduleWorker jobs to be picked up
deadline = Time.current + SCHEDULE_WAIT_TIMEOUT
sleep 1 while pending_project_import_jobs? && Time.current < deadline
sleep 1 while pending_project_import_scheduling? && Time.current < deadline
end
end
......@@ -44,6 +42,16 @@ class UpdateAllMirrorsWorker # rubocop:disable Scalability/IdempotentWorker
# rubocop: disable CodeReuse/ActiveRecord
def schedule_mirrors!
# Clean up mirror scheduling counter before schedule mirrors. After this job is executed, there are some cases:
# - There are no projects to be scheduled, the job exits early, the counter is not used.
# - All projects transition to scheduled states. The counter must be equal to 0.
# - The timeout of 4 minutes is exceeded. In this case, another job will be
# rescheduled, regardless of the value of the counter.
# Therefore, the scheduling counter should reset the counter before entering
# the scheduling phase. In addition, this clean-up task prevents a project
# id from being stuck in the list forever.
::Gitlab::Mirror.reset_scheduling if scheduling_tracking_enabled?
capacity = Gitlab::Mirror.available_capacity
# Ignore mirrors that become due for scheduling once work begins, so we
......@@ -133,6 +141,11 @@ class UpdateAllMirrorsWorker # rubocop:disable Scalability/IdempotentWorker
# rubocop: enable CodeReuse/ActiveRecord
def schedule_projects_in_batch(projects)
return if projects.empty?
# projects were materialized at this stage
::Gitlab::Mirror.track_scheduling(projects.map(&:id)) if scheduling_tracking_enabled?
ProjectImportScheduleWorker.bulk_perform_async_with_contexts(
projects,
arguments_proc: -> (project) { project.id },
......@@ -154,25 +167,15 @@ class UpdateAllMirrorsWorker # rubocop:disable Scalability/IdempotentWorker
end
# rubocop: enable CodeReuse/ActiveRecord
def job_tracker
@job_tracker ||= LimitedCapacity::JobTracker.new(ProjectImportScheduleWorker.name)
end
def pending_project_import_jobs?
if job_tracker_enabled?
job_tracker.count > 0
def pending_project_import_scheduling?
if scheduling_tracking_enabled?
::Gitlab::Mirror.current_scheduling > 0
else
ProjectImportScheduleWorker.queue_size > 0
end
end
def clean_project_import_jobs_tracking
# Clean-up completed jobs with stale status
job_tracker.clean_up if job_tracker_enabled?
end
def job_tracker_enabled?
Feature.enabled?(:project_import_schedule_worker_job_tracker, default_enabled: :yaml) &&
Feature.enabled?(:update_all_mirrors_job_tracker, default_enabled: :yaml)
def scheduling_tracking_enabled?
Feature.enabled?(:mirror_scheduling_tracking, default_enabled: :yaml)
end
end
......@@ -5,6 +5,7 @@ module Gitlab
# Runs scheduler every minute
SCHEDULER_CRON = '* * * * *'
PULL_CAPACITY_KEY = 'MIRROR_PULL_CAPACITY'
SCHEDULING_TRACKING_KEY = 'MIRROR_SCHEDULING_TRACKING'
JITTER = 1.minute
# TODO: Dynamically determine mirror update interval based on total number
......@@ -77,6 +78,22 @@ module Gitlab
Gitlab::CurrentSettings.mirror_capacity_threshold
end
def track_scheduling(project_ids)
Gitlab::Redis::SharedState.with { |redis| redis.sadd(SCHEDULING_TRACKING_KEY, project_ids) }
end
def untrack_scheduling(project_id)
Gitlab::Redis::SharedState.with { |redis| redis.srem(SCHEDULING_TRACKING_KEY, project_id) }
end
def reset_scheduling
Gitlab::Redis::SharedState.with { |redis| redis.del(SCHEDULING_TRACKING_KEY) }
end
def current_scheduling
Gitlab::Redis::SharedState.with { |redis| redis.scard(SCHEDULING_TRACKING_KEY) }.to_i
end
private
def update_all_mirrors_cron_job
......
......@@ -52,7 +52,7 @@ RSpec.describe Gitlab::Mirror do
end
end
describe '#max_mirror_capacity_reached?' do
describe '#max_mirror_capacity_reached?', :clean_gitlab_redis_shared_state do
it 'returns true if available capacity is 0' do
expect(described_class).to receive(:available_capacity).and_return(0)
......@@ -64,10 +64,6 @@ RSpec.describe Gitlab::Mirror do
expect(described_class.max_mirror_capacity_reached?).to eq(false)
end
after do
Gitlab::Redis::SharedState.with { |redis| redis.del(Gitlab::Mirror::PULL_CAPACITY_KEY) }
end
end
describe '#reschedule_immediately?' do
......@@ -94,7 +90,7 @@ RSpec.describe Gitlab::Mirror do
end
end
describe '#available_capacity' do
describe '#available_capacity', :clean_gitlab_redis_shared_state do
context 'when redis key does not exist' do
it 'returns mirror_max_capacity' do
expect(described_class.available_capacity).to eq(Gitlab::CurrentSettings.mirror_max_capacity)
......@@ -114,25 +110,17 @@ RSpec.describe Gitlab::Mirror do
expect(described_class.available_capacity).to eq(Gitlab::CurrentSettings.mirror_max_capacity - current_capacity)
end
end
after do
Gitlab::Redis::SharedState.with { |redis| redis.del(Gitlab::Mirror::PULL_CAPACITY_KEY) }
end
end
describe '#increment_capacity' do
describe '#increment_capacity', :clean_gitlab_redis_shared_state do
it 'increments capacity' do
max_capacity = Gitlab::CurrentSettings.mirror_max_capacity
expect { described_class.increment_capacity(1) }.to change { described_class.available_capacity }.from(max_capacity).to(max_capacity - 1)
end
after do
Gitlab::Redis::SharedState.with { |redis| redis.del(Gitlab::Mirror::PULL_CAPACITY_KEY) }
end
end
describe '#decrement_capacity' do
describe '#decrement_capacity', :clean_gitlab_redis_shared_state do
let!(:id) { 1 }
context 'with capacity above 0' do
......@@ -150,9 +138,57 @@ RSpec.describe Gitlab::Mirror do
expect { described_class.decrement_capacity(id) }.not_to change { described_class.available_capacity }
end
end
end
describe '#track_scheduling', :clean_gitlab_redis_shared_state do
it 'increments current scheduling counter' do
expect { described_class.track_scheduling([1, 2, 3, 4]) }.to change { described_class.current_scheduling }.from(0).to(4)
end
it 'excludes existing ids from existing counter' do
described_class.track_scheduling([1, 2, 3])
expect { described_class.track_scheduling([1, 2, 3, 4]) }.to change { described_class.current_scheduling }.from(3).to(4)
end
end
after do
Gitlab::Redis::SharedState.with { |redis| redis.del(Gitlab::Mirror::PULL_CAPACITY_KEY) }
describe '#untrack_scheduling', :clean_gitlab_redis_shared_state do
context 'with scheduling counter above 0' do
before do
described_class.track_scheduling([1, 2, 3])
end
it 'decrements scheduling counter' do
expect { described_class.untrack_scheduling(1) }.to change { described_class.current_scheduling }.from(3).to(2)
end
it 'does not decrement scheduling counter for non-existant id' do
expect { described_class.untrack_scheduling(5) }.not_to change { described_class.current_scheduling }
end
end
context 'with scheduling counter equal to 0' do
it 'does not decrement scheduling counter' do
expect { described_class.untrack_scheduling(1) }.not_to change { described_class.current_scheduling }
end
end
end
describe '#reset_scheduling', :clean_gitlab_redis_shared_state do
context 'with scheduling counter above 0' do
before do
described_class.track_scheduling([1, 2, 3])
end
it 'decrements scheduling counter to 0' do
expect { described_class.reset_scheduling }.to change { described_class.current_scheduling }.from(3).to(0)
end
end
context 'with scheduling counter equal to 0' do
it 'decrements scheduling counter to 0' do
expect { described_class.reset_scheduling }.not_to change { described_class.current_scheduling }
expect(described_class.current_scheduling).to eq(0)
end
end
end
......
......@@ -11,13 +11,9 @@ RSpec.describe ProjectImportScheduleWorker do
let(:job_args) { [project.id] }
let(:job_tracker_instance) { double(LimitedCapacity::JobTracker) }
before do
allow(Gitlab::Mirror).to receive(:available_capacity).and_return(5)
allow(LimitedCapacity::JobTracker).to receive(:new).with('ProjectImportScheduleWorker').and_return(job_tracker_instance)
allow(job_tracker_instance).to receive(:register)
allow(job_tracker_instance).to receive(:remove)
allow(Gitlab::Mirror).to receive(:untrack_scheduling).and_call_original
allow(Project).to receive(:find_by_id).with(project.id).and_return(project)
allow(project).to receive(:add_import_job)
......@@ -39,29 +35,27 @@ RSpec.describe ProjectImportScheduleWorker do
expect(import_state).to be_scheduled
end
context 'project_import_schedule_worker_job_tracker flag is enabled' do
context 'mirror_scheduling_tracking flag is enabled' do
before do
stub_feature_flags(project_import_schedule_worker_job_tracker: true)
stub_feature_flags(mirror_scheduling_tracking: true)
end
it 'tracks the status of the worker' do
subject
expect(job_tracker_instance).to have_received(:register).with(any_args, 5).at_least(:once)
expect(job_tracker_instance).to have_received(:remove).with(any_args).at_least(:once)
expect(Gitlab::Mirror).to have_received(:untrack_scheduling).with(project.id).at_least(:once)
end
end
context 'project_import_schedule_worker_job_tracker flag is disabled' do
context 'mirror_scheduling_tracking flag is disabled' do
before do
stub_feature_flags(project_import_schedule_worker_job_tracker: false)
stub_feature_flags(mirror_scheduling_tracking: false)
end
it 'does not track the status of the worker' do
subject
expect(job_tracker_instance).not_to have_received(:register)
expect(job_tracker_instance).not_to have_received(:remove)
expect(Gitlab::Mirror).not_to have_received(:untrack_scheduling)
end
end
end
......
......@@ -56,37 +56,21 @@ RSpec.describe UpdateAllMirrorsWorker do
end
context 'when updates were scheduled' do
let(:job_tracker_instance) { double(LimitedCapacity::JobTracker) }
before do
allow(worker).to receive(:schedule_mirrors!).and_return(1)
count = 3
allow(Gitlab::Mirror).to receive(:current_scheduling) { |_| count -= 1 }
end
context 'job tracker flags are on' do
context 'mirror_scheduling_tracking flags is on' do
before do
stub_feature_flags(
project_import_schedule_worker_job_tracker: true,
update_all_mirrors_job_tracker: true
)
allow(LimitedCapacity::JobTracker).to receive(:new).with('ProjectImportScheduleWorker').and_return(job_tracker_instance)
count = 3
allow(job_tracker_instance).to receive(:clean_up)
allow(job_tracker_instance).to receive(:register)
allow(job_tracker_instance).to receive(:remove)
allow(job_tracker_instance).to receive(:count) { |_| count -= 1 }
stub_feature_flags(mirror_scheduling_tracking: true)
end
it 'waits until ProjectImportScheduleWorker job tracker returns 0' do
worker.perform
expect(job_tracker_instance).to have_received(:count).exactly(3).times
end
it 'cleans up finished ProjectImportScheduleWorker jobs' do
worker.perform
expect(job_tracker_instance).to have_received(:clean_up).once
expect(Gitlab::Mirror).to have_received(:current_scheduling).exactly(3).times
end
it 'sleeps a bit after scheduling mirrors' do
......@@ -96,12 +80,9 @@ RSpec.describe UpdateAllMirrorsWorker do
end
end
context 'any of job tracker flags is off' do
context 'mirror_scheduling_tracking flags is off' do
before do
stub_feature_flags(
project_import_schedule_worker_job_tracker: true,
update_all_mirrors_job_tracker: false
)
stub_feature_flags(mirror_scheduling_tracking: false)
count = 3
allow(ProjectImportScheduleWorker).to receive(:queue_size) { |_| count -= 1 }
end
......@@ -156,14 +137,6 @@ RSpec.describe UpdateAllMirrorsWorker do
worker.perform
end
it 'does not poll for ProjectImportScheduleWorker jobs to complete' do
expect_next_instance_of(LimitedCapacity::JobTracker) do |instance|
expect(instance).not_to receive(:count)
end
worker.perform
end
it 'does not wait' do
expect(worker).not_to receive(:sleep)
......@@ -172,7 +145,18 @@ RSpec.describe UpdateAllMirrorsWorker do
end
end
describe '#schedule_mirrors!' do
describe '#schedule_mirrors!', :clean_gitlab_redis_shared_state do
before do
# This tests the ability of this worker to clean the state before
# scheduling mirrors
Gitlab::Redis::SharedState.with do |redis|
redis.sadd(Gitlab::Mirror::SCHEDULING_TRACKING_KEY, [1, 2, 3])
end
allow(Gitlab::Mirror).to receive(:track_scheduling).and_call_original
allow(Gitlab::Mirror).to receive(:untrack_scheduling).and_call_original
end
def schedule_mirrors!(capacity:)
allow(Gitlab::Mirror).to receive_messages(available_capacity: capacity)
......@@ -195,6 +179,28 @@ RSpec.describe UpdateAllMirrorsWorker do
projects.each { |project| expect_import_status(project, 'none') }
end
def expect_mirror_scheduling_tracked(*project_batches)
# Expect that Gitlab::Mirror tracks the project IDs
project_batches.each do |project_batch|
expect(Gitlab::Mirror).to have_received(:track_scheduling).ordered.with(
match(project_batch.map(&:id))
)
end
# rubocop:disable Style/CombinableLoops
# And then each project is untracked individually when the status switched
# to scheduled. We need to loop these batches twice to ensure the
# ordering of the `track_scheduling` invocations don't mingle with the
# `untrack_scheduling` invocation.
project_batches.each do |project_batch|
project_batch.each do |project|
expect(Gitlab::Mirror).to have_received(:untrack_scheduling).with(project.id).at_least(:once)
end
end
# rubocop:enable Style/CombinableLoops
expect(::Gitlab::Mirror.current_scheduling).to eq(0)
end
context 'when the instance is unlicensed' do
it 'does not schedule when project does not have repository mirrors available' do
project = create(:project, :mirror)
......@@ -222,6 +228,8 @@ RSpec.describe UpdateAllMirrorsWorker do
schedule_mirrors!(capacity: 3)
expect_import_scheduled(project1, project2)
expect_mirror_scheduling_tracked([project1, project2])
end
end
end
......@@ -262,8 +270,10 @@ RSpec.describe UpdateAllMirrorsWorker do
it 'schedules all available mirrors' do
schedule_mirrors!(capacity: 4)
expect_import_scheduled(licensed_project1, licensed_project2, public_project)
expect_import_not_scheduled(*unlicensed_projects)
expect_import_scheduled(licensed_project1, licensed_project2, public_project)
expect_mirror_scheduling_tracked([licensed_project1, licensed_project2, public_project])
end
end
......@@ -290,6 +300,8 @@ RSpec.describe UpdateAllMirrorsWorker do
expect_import_scheduled(licensed_project1, licensed_project2, public_project)
expect_import_not_scheduled(*unlicensed_projects)
expect_mirror_scheduling_tracked([licensed_project1, licensed_project2, public_project])
end
it 'requests as many batches as necessary' do
......@@ -308,6 +320,8 @@ RSpec.describe UpdateAllMirrorsWorker do
expect_import_scheduled(licensed_project2, public_project)
expect_import_not_scheduled(licensed_project1)
expect_import_not_scheduled(*unlicensed_projects)
expect_mirror_scheduling_tracked([licensed_project2, public_project])
end
it "does not schedule a mirror of an pending_delete project" do
......@@ -318,6 +332,8 @@ RSpec.describe UpdateAllMirrorsWorker do
expect_import_scheduled(licensed_project2, public_project)
expect_import_not_scheduled(licensed_project1)
expect_import_not_scheduled(*unlicensed_projects)
expect_mirror_scheduling_tracked([licensed_project2, public_project])
end
end
......@@ -327,6 +343,8 @@ RSpec.describe UpdateAllMirrorsWorker do
expect_import_scheduled(licensed_project1, licensed_project2, public_project)
expect_import_not_scheduled(*unlicensed_projects)
expect_mirror_scheduling_tracked([licensed_project1, licensed_project2], [public_project])
end
it 'requests as many batches as necessary' do
......@@ -344,6 +362,8 @@ RSpec.describe UpdateAllMirrorsWorker do
expect_import_scheduled(licensed_project1, licensed_project2)
expect_import_not_scheduled(*unlicensed_projects, public_project)
expect_mirror_scheduling_tracked([licensed_project1], [licensed_project2])
end
it 'requests as many batches as necessary' do
......@@ -361,6 +381,8 @@ RSpec.describe UpdateAllMirrorsWorker do
expect_import_scheduled(licensed_project1)
expect_import_not_scheduled(*unlicensed_projects, licensed_project2, public_project)
expect_mirror_scheduling_tracked([licensed_project1])
end
it 'requests as many batches as necessary' do
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment