Remove the Geo::Secondary::RepositoryBackfillWorker

It was only an experiment and never have
been enabled in production.
parent bcc3ed8a
......@@ -475,14 +475,6 @@
:weight: 1
:idempotent:
:tags: []
- :name: geo:geo_secondary_repository_backfill
:feature_category: :geo_replication
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent:
:tags: []
- :name: jira_connect:jira_connect_sync_branch
:feature_category: :integrations
:has_external_dependencies:
......
# frozen_string_literal: true
module Geo
module Secondary
module BackfillWorker
extend ActiveSupport::Concern
LEASE_TIMEOUT = 60.minutes
RUN_TIME = 60.minutes.to_i
included do
prepend ShadowMethods
include ApplicationWorker
include ExclusiveLeaseGuard
include GeoQueue
include ::Gitlab::Geo::LogHelpers
include ::Gitlab::Utils::StrongMemoize
sidekiq_options retry: false
loggable_arguments 0
attr_reader :shard_name, :start_time, :loops
end
module ShadowMethods
def lease_key
@lease_key ||= "#{self.class.name.underscore}:shard:#{shard_name}"
end
def lease_timeout
LEASE_TIMEOUT
end
end
def initialize
@scheduled_jobs = []
@loops = 0
end
# rubocop:disable Gitlab/ModuleWithInstanceVariables
def perform(shard_name)
@shard_name = shard_name
@start_time = Time.now.utc
return unless healthy_node?
try_obtain_lease do
schedule_jobs
end
end
# rubocop:enable Gitlab/ModuleWithInstanceVariables
private
def base_log_data(message)
super(message).merge(worker_metadata)
end
def healthy_node?
unless Gitlab::Geo.geo_database_configured?
log_info('Geo database not configured')
return false
end
unless Gitlab::Geo.secondary?
log_info('Current node not a secondary')
return false
end
unless Gitlab::ShardHealthCache.healthy_shard?(shard_name)
log_info("Shard (#{shard_name}) is not healthy")
return false
end
true
end
# rubocop:disable Gitlab/ModuleWithInstanceVariables
def node_enabled?
# Only check every minute to avoid polling the DB excessively
unless @last_enabled_check.present? && @last_enabled_check > 1.minute.ago
@last_enabled_check = Time.now.utc
clear_memoization(:current_node_enabled)
end
strong_memoize(:current_node_enabled) do
Gitlab::Geo.current_node_enabled?
end
end
# rubocop:enable Gitlab/ModuleWithInstanceVariables
def run_time
RUN_TIME
end
def over_capacity?
false
end
def over_time?
(Time.now.utc - start_time) >= run_time
end
def worker_metadata
{ shard: shard_name }
end
end
end
end
......@@ -3,12 +3,7 @@
module Geo
class RepositorySyncWorker < Geo::Scheduler::Secondary::PerShardSchedulerWorker # rubocop:disable Scalability/IdempotentWorker
def schedule_job(shard_name)
if ::Feature.enabled?(:geo_streaming_results_repository_sync)
Geo::Secondary::RepositoryBackfillWorker.perform_async(shard_name)
else
Geo::RepositoryShardSyncWorker.perform_async(shard_name)
end
Geo::RepositoryShardSyncWorker.perform_async(shard_name)
Geo::DesignRepositoryShardSyncWorker.perform_async(shard_name)
end
end
......
# frozen_string_literal: true
module Geo
module Secondary
class RepositoryBackfillWorker # rubocop:disable Scalability/IdempotentWorker
include Geo::Secondary::BackfillWorker
private
attr_reader :scheduled_jobs
def connection
strong_memoize(:connection) { Geo::TrackingBase.connection.raw_connection }
end
def max_capacity
# If we don't have a count, that means that for some reason
# RepositorySyncWorker stopped running/updating the cache. We might
# be trying to shut down Geo while this job may still be running.
healthy_count = healthy_shard_count
return 0 unless healthy_count > 0
capacity_per_shard = Gitlab::Geo.current_node.repos_max_capacity / healthy_count
[1, capacity_per_shard.to_i].max
end
def healthy_shard_count
Gitlab::ShardHealthCache.healthy_shard_count.to_i
end
def over_capacity?
scheduled_jobs.size >= max_capacity
end
def schedule_jobs
log_info('Repository backfilling started')
reason = :unknown
begin
connection.send_query("#{projects_ids_unsynced.to_sql};#{project_ids_updated_recently.to_sql}")
connection.set_single_row_mode
reason = loop do
break :node_disabled unless node_enabled?
break :over_time if over_time?
break :lease_lost unless renew_lease!
update_jobs_in_progress
if over_capacity?
sleep(1)
else
# This will stream the results one by one
# until there are no more results to fetch.
result = connection.get_result
break :complete if result.nil?
result.check
result.each do |row|
schedule_job(row['id'])
end
end
end
rescue => error
reason = :error
log_error('Repository backfilling error', error)
raise error
ensure
log_info('Repository backfilling finished', total_loops: loops, duration: Time.now.utc - start_time, reason: reason)
end
end
def schedule_job(project_id)
job_id = Geo::ProjectSyncWorker.perform_async(project_id, sync_repository: true, sync_wiki: true)
if job_id
@scheduled_jobs << { job_id: job_id }
log_info("Repository scheduled for backfilling", project_id: project_id, job_id: job_id)
else
log_info("Repository could not be scheduled for backfilling", project_id: project_id)
end
end
def scheduled_job_ids
scheduled_jobs.map { |data| data[:job_id] }
end
def update_jobs_in_progress
job_ids = scheduled_job_ids
return if job_ids.empty?
# SidekiqStatus returns an array of booleans: true if the job is still running, false otherwise.
# For each entry, first use `zip` to make { job_id: 123 } -> [ { job_id: 123 }, bool ]
# Next, filter out the jobs that have completed.
@scheduled_jobs = Gitlab::SidekiqStatus.job_status(scheduled_job_ids).then do |status|
@scheduled_jobs.zip(status).map { |(job, running)| job if running }.compact
end
end
# rubocop: disable CodeReuse/ActiveRecord
def projects_ids_unsynced
Geo::ProjectUnsyncedFinder
.new(current_node: Gitlab::Geo.current_node, shard_name: shard_name)
.execute
.reorder(last_repository_updated_at: :desc)
.select(:id)
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def project_ids_updated_recently
Geo::ProjectUpdatedRecentlyFinder
.new(current_node: Gitlab::Geo.current_node, shard_name: shard_name)
.execute
.order('project_registry.last_repository_synced_at ASC NULLS FIRST, projects.last_repository_updated_at ASC')
.select(:id)
end
# rubocop: enable CodeReuse/ActiveRecord
end
end
end
......@@ -12,6 +12,7 @@ RSpec.describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do
let!(:unsynced_project) { create(:project) }
let(:healthy_shard_name) { project_in_synced_group.repository.storage }
let(:design_worker) { Geo::DesignRepositoryShardSyncWorker }
let(:repository_worker) { Geo::RepositoryShardSyncWorker }
before do
stub_current_geo_node(secondary)
......@@ -21,91 +22,73 @@ RSpec.describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do
Sidekiq::Testing.inline! { example.run }
end
shared_examples '#perform' do |worker|
context 'additional shards' do
it 'skips backfill for repositories on other shards' do
create(:project, :broken_storage, group: synced_group)
unhealthy_dirty = create(:project, :broken_storage, group: synced_group)
create(:geo_project_registry, :synced, :repository_dirty, project: unhealthy_dirty)
context 'additional shards' do
it 'skips backfill for repositories on other shards' do
create(:project, :broken_storage, group: synced_group)
unhealthy_dirty = create(:project, :broken_storage, group: synced_group)
create(:geo_project_registry, :synced, :repository_dirty, project: unhealthy_dirty)
allow(Gitlab::GitalyClient).to receive(:call) do
raise GRPC::Unavailable.new('No Gitaly available')
end
expect(worker).not_to receive(:perform_async).with('broken')
expect(design_worker).not_to receive(:perform_async).with('broken')
subject.perform
allow(Gitlab::GitalyClient).to receive(:call) do
raise GRPC::Unavailable.new('No Gitaly available')
end
it 'skips backfill for projects on shards excluded by selective sync' do
secondary.update!(selective_sync_type: 'shards', selective_sync_shards: [healthy_shard_name])
expect(repository_worker).not_to receive(:perform_async).with('broken')
expect(design_worker).not_to receive(:perform_async).with('broken')
# Report both shards as healthy
expect(Gitlab::HealthChecks::GitalyCheck).to receive(:readiness)
.and_return([result(true, healthy_shard_name), result(true, 'broken')])
subject.perform
end
expect(worker).to receive(:perform_async).with('default')
expect(design_worker).to receive(:perform_async).with('default')
expect(worker).not_to receive(:perform_async).with('broken')
expect(design_worker).not_to receive(:perform_async).with('broken')
it 'skips backfill for projects on shards excluded by selective sync' do
secondary.update!(selective_sync_type: 'shards', selective_sync_shards: [healthy_shard_name])
subject.perform
end
# Report both shards as healthy
expect(Gitlab::HealthChecks::GitalyCheck).to receive(:readiness)
.and_return([result(true, healthy_shard_name), result(true, 'broken')])
it 'skips backfill for projects on missing shards' do
missing_not_synced = create(:project, group: synced_group)
missing_not_synced.update_column(:repository_storage, 'unknown')
missing_dirty = create(:project, group: synced_group)
missing_dirty.update_column(:repository_storage, 'unknown')
expect(repository_worker).to receive(:perform_async).with('default')
expect(design_worker).to receive(:perform_async).with('default')
expect(repository_worker).not_to receive(:perform_async).with('broken')
expect(design_worker).not_to receive(:perform_async).with('broken')
create(:geo_project_registry, :synced, :repository_dirty, project: missing_dirty)
# hide the 'broken' storage for this spec
stub_storage_settings({})
subject.perform
end
expect(worker).to receive(:perform_async).with(project_in_synced_group.repository.storage)
expect(design_worker).to receive(:perform_async).with(project_in_synced_group.repository.storage)
expect(worker).not_to receive(:perform_async).with('unknown')
expect(design_worker).not_to receive(:perform_async).with('unknown')
it 'skips backfill for projects on missing shards' do
missing_not_synced = create(:project, group: synced_group)
missing_not_synced.update_column(:repository_storage, 'unknown')
missing_dirty = create(:project, group: synced_group)
missing_dirty.update_column(:repository_storage, 'unknown')
subject.perform
end
create(:geo_project_registry, :synced, :repository_dirty, project: missing_dirty)
it 'skips backfill for projects with downed Gitaly server' do
create(:project, :broken_storage, group: synced_group)
unhealthy_dirty = create(:project, :broken_storage, group: synced_group)
# hide the 'broken' storage for this spec
stub_storage_settings({})
create(:geo_project_registry, :synced, :repository_dirty, project: unhealthy_dirty)
expect(repository_worker).to receive(:perform_async).with(project_in_synced_group.repository.storage)
expect(design_worker).to receive(:perform_async).with(project_in_synced_group.repository.storage)
expect(repository_worker).not_to receive(:perform_async).with('unknown')
expect(design_worker).not_to receive(:perform_async).with('unknown')
# Report only one healthy shard
expect(Gitlab::HealthChecks::GitalyCheck).to receive(:readiness)
.and_return([result(true, healthy_shard_name), result(false, 'broken')])
subject.perform
end
expect(worker).to receive(:perform_async).with(healthy_shard_name)
expect(design_worker).to receive(:perform_async).with(healthy_shard_name)
expect(worker).not_to receive(:perform_async).with('broken')
expect(design_worker).not_to receive(:perform_async).with('broken')
it 'skips backfill for projects with downed Gitaly server' do
create(:project, :broken_storage, group: synced_group)
unhealthy_dirty = create(:project, :broken_storage, group: synced_group)
subject.perform
end
end
end
create(:geo_project_registry, :synced, :repository_dirty, project: unhealthy_dirty)
context 'when geo_streaming_results_repository_sync flag is enabled', :geo_fdw do
before do
stub_feature_flags(geo_streaming_results_repository_sync: true)
end
# Report only one healthy shard
expect(Gitlab::HealthChecks::GitalyCheck).to receive(:readiness)
.and_return([result(true, healthy_shard_name), result(false, 'broken')])
include_examples '#perform', Geo::Secondary::RepositoryBackfillWorker
end
expect(repository_worker).to receive(:perform_async).with(healthy_shard_name)
expect(design_worker).to receive(:perform_async).with(healthy_shard_name)
expect(repository_worker).not_to receive(:perform_async).with('broken')
expect(design_worker).not_to receive(:perform_async).with('broken')
context 'when geo_streaming_results_repository_sync flag is disabled' do
before do
stub_feature_flags(geo_streaming_results_repository_sync: false)
subject.perform
end
include_examples '#perform', Geo::RepositoryShardSyncWorker
end
def result(success, shard)
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Geo::Secondary::RepositoryBackfillWorker, :geo, :geo_fdw, :clean_gitlab_redis_cache do
include EE::GeoHelpers
let(:primary) { create(:geo_node, :primary) }
let(:secondary) { create(:geo_node, repos_max_capacity: 5) }
let(:shard_name) { Gitlab.config.repositories.storages.each_key.first }
before do
stub_current_geo_node(secondary)
stub_healthy_shards(shard_name)
end
it 'disables Sidekiq retries' do
expect(subject.sidekiq_options_hash).to eq(
'retry' => false,
'queue' => 'geo:geo_secondary_repository_backfill',
'queue_namespace' => :geo
)
end
describe '#perform' do
it 'does not schedule jobs when Geo database is not configured' do
create(:project)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)
with_no_geo_database_configured do
subject.perform(shard_name)
end
end
it 'does not schedule jobs when not running on a Geo secondary node' do
stub_current_geo_node(primary)
create(:project)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)
subject.perform(shard_name)
end
it 'does not schedule jobs when shard is not healthy' do
stub_healthy_shards([])
create(:project)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)
subject.perform(shard_name)
end
it 'does not schedule jobs when the Geo secondary node is disabled' do
stub_node_disabled(secondary)
create(:project)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)
subject.perform(shard_name)
end
it 'does not schedule jobs for projects on other shards' do
project = create(:project)
project.update_column(:repository_storage, 'other')
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)
subject.perform(shard_name)
end
it 'schedules a job for each unsynced project' do
create_list(:project, 2)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).twice.and_return(true)
subject.perform(shard_name)
end
it 'schedules a job for each project where last attempt to sync failed' do
create(:geo_project_registry, :sync_failed)
create(:geo_project_registry, :synced)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).once.and_return(true)
subject.perform(shard_name)
end
it 'schedules a job for each synced project updated recently' do
create(:geo_project_registry, :synced, :repository_dirty)
create(:geo_project_registry, :synced)
create(:geo_project_registry, :synced, :wiki_dirty)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).twice.and_return(true)
subject.perform(shard_name)
end
it 'respects Geo secondary node max capacity per shard' do
stub_healthy_shards([shard_name, 'shard2', 'shard3', 'shard4', 'shard5'])
project_1 = create(:project)
project_2 = create(:project)
allow(Geo::ProjectSyncWorker).to receive(:perform_async).with(project_1.id, anything).and_return('jid-1')
allow(Geo::ProjectSyncWorker).to receive(:perform_async).with(project_2.id, anything).and_return('jid-2')
allow(Gitlab::SidekiqStatus).to receive(:job_status).with(['jid-2']).and_return([true], [false])
allow(Gitlab::SidekiqStatus).to receive(:job_status).with(['jid-1']).and_return([false])
expect(subject).to receive(:sleep).once.and_call_original
subject.perform(shard_name)
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment