Commit ef61b0f0 authored by Quang-Minh Nguyen's avatar Quang-Minh Nguyen

Add feature flags to UpdateAllMirrorsWorker

parent 7a6aab54
---
name: project_import_schedule_worker_job_tracker
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/79097
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/351408
milestone: '14.8'
type: development
group: group::scalability
default_enabled: false
---
name: update_all_mirrors_job_tracker
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/79097
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/351420
milestone: '14.8'
type: development
group: group::scalability
default_enabled: false
......@@ -25,7 +25,7 @@ class ProjectImportScheduleWorker
tags :needs_own_queue
def perform(project_id)
job_tracker.register(jid, capacity)
job_tracker.register(jid, capacity) if job_tracking?
return if Gitlab::Database.read_only?
......@@ -36,7 +36,7 @@ class ProjectImportScheduleWorker
project.import_state.schedule
end
ensure
job_tracker.remove(jid)
job_tracker.remove(jid) if job_tracking?
end
private
......@@ -45,7 +45,7 @@ class ProjectImportScheduleWorker
Gitlab::Mirror.available_capacity
end
def job_tracker
@job_tracker ||= LimitedCapacity::JobTracker.new(self.class.name)
def job_tracking?
Feature.enabled?(:project_import_schedule_worker_job_tracker, default_enabled: :yaml)
end
end
......@@ -18,15 +18,14 @@ class UpdateAllMirrorsWorker # rubocop:disable Scalability/IdempotentWorker
scheduled = 0
with_lease do
# Clean-up completed jobs with stale status
job_tracker.clean_up
clean_project_import_jobs_tracking
scheduled = schedule_mirrors!
if scheduled > 0
# Wait for all ProjectImportScheduleWorker jobs to complete
deadline = Time.current + SCHEDULE_WAIT_TIMEOUT
sleep 1 while job_tracker.count > 0 && Time.current < deadline
sleep 1 while pending_project_import_jobs? && Time.current < deadline
end
end
......@@ -34,7 +33,7 @@ class UpdateAllMirrorsWorker # rubocop:disable Scalability/IdempotentWorker
return unless scheduled > 0
# Wait to give some jobs a chance to complete
Kernel.sleep(RESCHEDULE_WAIT)
sleep(RESCHEDULE_WAIT)
# If there's capacity left now (some jobs completed),
# reschedule this job to enqueue more work.
......@@ -163,4 +162,22 @@ class UpdateAllMirrorsWorker # rubocop:disable Scalability/IdempotentWorker
def job_tracker
@job_tracker ||= LimitedCapacity::JobTracker.new(ProjectImportScheduleWorker.name)
end
def pending_project_import_jobs?
if job_tracker_enabled?
job_tracker.count > 0
else
ProjectImportScheduleWorker.queue_size > 0
end
end
def clean_project_import_jobs_tracking
# Clean-up completed jobs with stale status
job_tracker.clean_up if job_tracker_enabled?
end
def job_tracker_enabled?
Feature.enabled?(:project_import_schedule_worker_job_tracker, default_enabled: :yaml) &&
Feature.enabled?(:update_all_mirrors_job_tracker, default_enabled: :yaml)
end
end
......@@ -39,6 +39,11 @@ RSpec.describe ProjectImportScheduleWorker do
expect(import_state).to be_scheduled
end
context 'project_import_schedule_worker_job_tracker flag is enabled' do
before do
stub_feature_flags(project_import_schedule_worker_job_tracker: true)
end
it 'tracks the status of the worker' do
subject
......@@ -47,6 +52,20 @@ RSpec.describe ProjectImportScheduleWorker do
end
end
context 'project_import_schedule_worker_job_tracker flag is disabled' do
before do
stub_feature_flags(project_import_schedule_worker_job_tracker: false)
end
it 'does not track the status of the worker' do
subject
expect(job_tracker_instance).not_to have_received(:register)
expect(job_tracker_instance).not_to have_received(:remove)
end
end
end
context 'project is not found' do
it 'raises ImportStateNotFound' do
expect { subject.perform(-1) }.to raise_error(described_class::ImportStateNotFound)
......
......@@ -60,6 +60,14 @@ RSpec.describe UpdateAllMirrorsWorker do
before do
allow(worker).to receive(:schedule_mirrors!).and_return(1)
end
context 'job tracker flags are on' do
before do
stub_feature_flags(
project_import_schedule_worker_job_tracker: true,
update_all_mirrors_job_tracker: true
)
allow(LimitedCapacity::JobTracker).to receive(:new).with('ProjectImportScheduleWorker').and_return(job_tracker_instance)
count = 3
......@@ -69,10 +77,10 @@ RSpec.describe UpdateAllMirrorsWorker do
allow(job_tracker_instance).to receive(:count) { |_| count -= 1 }
end
it 'sleeps a bit after scheduling mirrors' do
expect(Kernel).to receive(:sleep).with(described_class::RESCHEDULE_WAIT)
it 'waits until ProjectImportScheduleWorker job tracker returns 0' do
worker.perform
expect(job_tracker_instance).to have_received(:count).exactly(3).times
end
it 'cleans up finished ProjectImportScheduleWorker jobs' do
......@@ -81,10 +89,34 @@ RSpec.describe UpdateAllMirrorsWorker do
expect(job_tracker_instance).to have_received(:clean_up).once
end
it 'waits until all ProjectImportScheduleWorker jobs to complete' do
it 'sleeps a bit after scheduling mirrors' do
expect(worker).to receive(:sleep).with(described_class::RESCHEDULE_WAIT).exactly(3).times
worker.perform
end
end
expect(job_tracker_instance).to have_received(:count).exactly(3).times
context 'any of job tracker flags is off' do
before do
stub_feature_flags(
project_import_schedule_worker_job_tracker: true,
update_all_mirrors_job_tracker: false
)
count = 3
allow(ProjectImportScheduleWorker).to receive(:queue_size) { |_| count -= 1 }
end
it 'waits until ProjectImportScheduleWorker jobs to complete' do
worker.perform
expect(ProjectImportScheduleWorker).to have_received(:queue_size).exactly(3).times
end
it 'sleeps a bit after scheduling mirrors' do
expect(worker).to receive(:sleep).with(described_class::RESCHEDULE_WAIT).exactly(3).times
worker.perform
end
end
context 'if capacity is available' do
......@@ -133,7 +165,7 @@ RSpec.describe UpdateAllMirrorsWorker do
end
it 'does not wait' do
expect(Kernel).not_to receive(:sleep)
expect(worker).not_to receive(:sleep)
worker.perform
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment