Commit 760de419 authored by Gabriel Mazetto's avatar Gabriel Mazetto

Changed ProjectRegistry workers to use a scheduler / batch model

Instead of a single class iterating in batches and executing the
operation there, we iterate in bigger batches to get the ranges and
delegate to a separate worker that will operate in the ranges.
parent fe5f3896
......@@ -161,6 +161,9 @@
- cronjob:update_all_mirrors
- cronjob:pseudonymizer
- geo:geo_batch_project_registry
- geo:geo_batch_project_registry_scheduler
- geo:geo_scheduler_scheduler
- geo:geo_scheduler_primary_scheduler
- geo:geo_scheduler_secondary_scheduler
......@@ -169,7 +172,6 @@
- geo:geo_file_registry_removal
- geo:geo_hashed_storage_attachments_migration
- geo:geo_hashed_storage_migration
- geo:geo_project_registry_batch
- geo:geo_project_sync
- geo:geo_rename_repository
- geo:geo_repositories_clean_up
......
......@@ -56,13 +56,13 @@ class Admin::Geo::ProjectsController < Admin::ApplicationController
end
def recheck_all
Geo::ProjectRegistryBatchWorker.perform_async(:recheck_repositories)
Geo::Batch::ProjectRegistrySchedulerWorker.perform_async(:recheck_repositories)
redirect_back_or_admin_geo_projects(notice: s_('Geo|All projects are being scheduled for re-check'))
end
def resync_all
Geo::ProjectRegistryBatchWorker.perform_async(:resync_repositories)
Geo::Batch::ProjectRegistrySchedulerWorker.perform_async(:resync_repositories)
redirect_back_or_admin_geo_projects(notice: s_('Geo|All projects are being scheduled for re-sync'))
end
......
......@@ -109,6 +109,21 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
)
end
# Retrieve the range of IDs in a relation
#
# @return [Array] with minimum ID and max ID
def self.range
pluck('MIN(id)', 'MAX(id)').first
end
# Search for IDs in the range
#
# @param [Integer] start initial ID
# @param [Integer] finish final ID
def self.with_range(start, finish)
where(id: start..finish)
end
# Must be run before fetching the repository to avoid a race condition
#
# @param [String] type must be one of the values in TYPES
......
# frozen_string_literal: true
module Geo
module Batch
# Responsible for scheduling multiple jobs to mark Project Registries as requiring syncing or verification.
#
# This class includes an Exclusive Lease guard and only one can be executed at the same time
# If multiple jobs are scheduled, only one will run and the others will drop forever.
class ProjectRegistrySchedulerWorker
include ApplicationWorker
include GeoQueue
include ExclusiveLeaseGuard
include ::Gitlab::Geo::LogHelpers
BATCH_SIZE = 10000
LEASE_TIMEOUT = 2.minutes # TTL for X amount of loops to happen until it is renewed
RENEW_AFTER_LOOPS = 20 # renew lease at every 20 loops has finished
OPERATIONS = [:resync_repositories, :recheck_repositories].freeze
DELAY_INTERVAL = 10.seconds.to_i # base delay for scheduling batch execution
def perform(operation)
return fail_invalid_operation!(operation) unless OPERATIONS.include?(operation.to_sym)
try_obtain_lease do
perform_in_batches_with_range(operation.to_sym)
end
end
private
def perform_in_batches_with_range(operation)
Geo::ProjectRegistry.each_batch(of: BATCH_SIZE) do |batch, index|
delay = index * DELAY_INTERVAL
::Geo::Batch::ProjectRegistryWorker.perform_in(delay, operation, batch.range)
renew_lease! if index % RENEW_AFTER_LOOPS == 0 # we renew after X amount of loops to not add much delay here
end
end
def lease_timeout
LEASE_TIMEOUT
end
def fail_invalid_operation!(operation)
raise ArgumentError, "Invalid operation: '#{operation.inspect}' informed. Must be one of the following: #{OPERATIONS.map { |valid_op| "'#{valid_op}'" }.join(', ')}"
end
end
end
end
# frozen_string_literal: true
module Geo
module Batch
# Responsible for scheduling multiple jobs to mark Project Registries as requiring syncing or verification.
#
# This class includes an Exclusive Lease guard and only one can be executed at the same time
# If multiple jobs are scheduled, only one will run and the others will drop forever.
class ProjectRegistryWorker
include ApplicationWorker
include GeoQueue
include ::Gitlab::Geo::LogHelpers
BATCH_SIZE = 250
OPERATIONS = [:resync_repositories, :recheck_repositories].freeze
def perform(operation, range)
case operation.to_sym
when :resync_repositories
resync_repositories(range)
when :recheck_repositories
recheck_repositories(range)
else
fail_invalid_operation!(operation)
end
end
private
def resync_repositories(range)
Geo::ProjectRegistry.with_range(range[0], range[1]).each_batch(of: BATCH_SIZE) do |batch|
batch.flag_repositories_for_resync!
end
end
def recheck_repositories(range)
Geo::ProjectRegistry.with_range(range[0], range[1]).each_batch(of: BATCH_SIZE) do |batch|
batch.flag_repositories_for_recheck!
end
end
def fail_invalid_operation!(operation)
raise ArgumentError, "Invalid operation: '#{operation.inspect}' informed. Must be one of the following: #{OPERATIONS.map { |valid_op| "'#{valid_op}'" }.join(', ')}"
end
end
end
end
# frozen_string_literal: true
module Geo
# Responsible for scheduling multiple jobs to mark Project Registries as requiring syncing or verification.
#
# This class includes an Exclusive Lease guard and only one can be executed at the same time
# If multiple jobs are scheduled, only one will run and the others will drop forever.
class ProjectRegistryBatchWorker
include ApplicationWorker
include GeoQueue
include ExclusiveLeaseGuard
include ::Gitlab::Geo::LogHelpers
BATCH_SIZE = 1000
LEASE_TIMEOUT = 8.hours
OPERATIONS = [:resync_repositories, :recheck_repositories].freeze
def perform(operation)
try_obtain_lease do
case operation.to_sym
when :resync_repositories
resync_repositories
when :recheck_repositories
recheck_repositories
else
fail_invalid_operation!(operation)
end
end
end
private
def resync_repositories
Geo::ProjectRegistry.each_batch(of: BATCH_SIZE) do |batch|
batch.flag_repositories_for_resync!
end
end
def recheck_repositories
Geo::ProjectRegistry.each_batch(of: BATCH_SIZE) do |batch|
batch.flag_repositories_for_recheck!
end
end
def lease_timeout
LEASE_TIMEOUT
end
def fail_invalid_operation!(operation)
raise ArgumentError, "Invalid operation: '#{operation.inspect}' informed. Must be one of the following: #{OPERATIONS.map { |valid_op| "'#{valid_op}'" }.join(', ')}"
end
end
end
......@@ -158,8 +158,8 @@ describe Admin::Geo::ProjectsController, :geo do
it 'schedules a batch job' do
Sidekiq::Testing.fake! do
expect { subject }.to change(Geo::ProjectRegistryBatchWorker.jobs, :size).by(1)
expect(Geo::ProjectRegistryBatchWorker.jobs.last['args']).to include('recheck_repositories')
expect { subject }.to change(Geo::Batch::ProjectRegistrySchedulerWorker.jobs, :size).by(1)
expect(Geo::Batch::ProjectRegistrySchedulerWorker.jobs.last['args']).to include('recheck_repositories')
end
end
......@@ -184,8 +184,8 @@ describe Admin::Geo::ProjectsController, :geo do
it 'schedules a batch job' do
Sidekiq::Testing.fake! do
expect { subject }.to change(Geo::ProjectRegistryBatchWorker.jobs, :size).by(1)
expect(Geo::ProjectRegistryBatchWorker.jobs.last['args']).to include('resync_repositories')
expect { subject }.to change(Geo::Batch::ProjectRegistrySchedulerWorker.jobs, :size).by(1)
expect(Geo::Batch::ProjectRegistrySchedulerWorker.jobs.last['args']).to include('resync_repositories')
end
end
......
# frozen_string_literal: true
require 'rails_helper'
RSpec.describe Geo::Batch::ProjectRegistrySchedulerWorker do
include ExclusiveLeaseHelpers
include ::EE::GeoHelpers
set(:secondary) { create(:geo_node) }
let(:lease_key) { subject.lease_key }
let(:lease_timeout) { 2.minutes }
before do
stub_current_geo_node(secondary)
stub_exclusive_lease(renew: true)
end
describe '#perform' do
context 'when operation is :recheck_repositories' do
let!(:registry) { create(:geo_project_registry, :repository_verified) }
it 'schedules batches of repositories for recheck' do
Sidekiq::Testing.fake! do
expect { subject.perform(:recheck_repositories) }.to change(Geo::Batch::ProjectRegistryWorker.jobs, :size).by(1)
expect(Geo::Batch::ProjectRegistryWorker.jobs.last['args']).to include('recheck_repositories')
end
end
it 'does nothing if exclusive lease is already acquired' do
stub_exclusive_lease_taken(lease_key, timeout: lease_timeout)
Sidekiq::Testing.fake! do
expect { subject.perform(:recheck_repositories) }.not_to change(Geo::Batch::ProjectRegistryWorker.jobs, :size)
end
end
end
context 'when operation is :resync_repositories' do
let!(:registry) { create(:geo_project_registry, :synced) }
it 'schedules batches of repositories for resync' do
Sidekiq::Testing.fake! do
expect { subject.perform(:resync_repositories) }.to change(Geo::Batch::ProjectRegistryWorker.jobs, :size).by(1)
expect(Geo::Batch::ProjectRegistryWorker.jobs.last['args']).to include('resync_repositories')
end
end
it 'does nothing if exclusive lease is already acquired' do
stub_exclusive_lease_taken(lease_key, timeout: lease_timeout)
Sidekiq::Testing.fake! do
expect { subject.perform(:resync_repositories) }.not_to change(Geo::Batch::ProjectRegistryWorker.jobs, :size)
end
end
end
context 'when informed operation is unknown/invalid' do
it 'fails with ArgumentError' do
expect { subject.perform(:unknown_operation) }.to raise_error(ArgumentError)
end
end
end
end
......@@ -2,38 +2,28 @@
require 'rails_helper'
RSpec.describe Geo::ProjectRegistryBatchWorker do
include ExclusiveLeaseHelpers
RSpec.describe Geo::Batch::ProjectRegistryWorker do
include ::EE::GeoHelpers
set(:secondary) { create(:geo_node) }
before do
stub_current_geo_node(secondary)
stub_exclusive_lease(renew: true)
end
describe '#perform' do
let(:range) { [0, registry.id] }
context 'when operation is :recheck_repositories' do
let!(:registry) { create(:geo_project_registry, :repository_verified) }
it 'flags repositories for recheck' do
Sidekiq::Testing.inline! do
subject.perform(:recheck_repositories)
subject.perform(:recheck_repositories, range)
end
expect(registry.reload.repository_verification_pending?).to be_truthy
end
it 'does nothing if exclusive lease is already acquired' do
stub_exclusive_lease_taken('geo/project_registry_batch_worker', timeout: 20)
Sidekiq::Testing.inline! do
subject.perform(:recheck_repositories)
end
expect(registry).to have_attributes(registry.reload.attributes)
end
end
context 'when operation is :resync_repositories' do
......@@ -41,26 +31,18 @@ RSpec.describe Geo::ProjectRegistryBatchWorker do
it 'flags repositories for resync' do
Sidekiq::Testing.inline! do
subject.perform(:resync_repositories)
subject.perform(:resync_repositories, range)
end
expect(registry.reload.resync_repository?).to be_truthy
end
it 'does nothing if exclusive lease is already acquired' do
stub_exclusive_lease_taken('geo/project_registry_batch_worker', timeout: 20)
Sidekiq::Testing.inline! do
subject.perform(:recheck_repositories)
end
expect(registry).to have_attributes(registry.reload.attributes)
end
end
context 'when informed operation is unknown/invalid' do
let(:range) { [1, 10] }
it 'fails with ArgumentError' do
expect { subject.perform(:unknown_operation) }.to raise_error(ArgumentError)
expect { subject.perform(:unknown_operation, range) }.to raise_error(ArgumentError)
end
end
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment