Commit 5ba1af9f authored by Mayra Cabrera

Merge branch '223250-geo-make-registry-tables-as-ssot-to-verify-projects-and-wikis' into 'master'

Make registry table as SSOT to verify Projects and Wikis

Closes #223250

See merge request gitlab-org/gitlab!35095
parents 2ff70c48 5b44921b
...@@ -60,8 +60,6 @@ module Reenqueuer ...@@ -60,8 +60,6 @@ module Reenqueuer
5.seconds 5.seconds
end end
# We intend to get rid of sleep:
# https://gitlab.com/gitlab-org/gitlab/issues/121697
module ReenqueuerSleeper module ReenqueuerSleeper
# The block will run, and then sleep until the minimum duration. Returns the # The block will run, and then sleep until the minimum duration. Returns the
# block's return value. # block's return value.
......
...@@ -37,5 +37,36 @@ module Geo ...@@ -37,5 +37,36 @@ module Geo
.limit(batch_size) .limit(batch_size)
end end
# rubocop:enable CodeReuse/ActiveRecord # rubocop:enable CodeReuse/ActiveRecord
# rubocop:disable CodeReuse/ActiveRecord
# Returns project IDs whose repository and/or wiki is pending verification.
#
# The two "checksummed pending verification" scopes are combined with a
# union, the given IDs are filtered out, and the result is capped.
#
# @param batch_size [Integer] maximum number of IDs to return
# @param except_ids [Array<Integer>] project IDs to leave out of the result
# @return [Array<Integer>] project IDs plucked from the registry table
def find_project_ids_pending_verification(batch_size:, except_ids: [])
  pending_scopes = [
    repositories_checksummed_pending_verification,
    wikis_checksummed_pending_verification
  ]

  Geo::ProjectRegistry
    .from_union(pending_scopes)
    .model_id_not_in(except_ids)
    .limit(batch_size)
    .pluck_model_foreign_key
end
# rubocop:enable CodeReuse/ActiveRecord
private
# rubocop:disable CodeReuse/ActiveRecord
# Registry scope for repositories that are checksummed and pending
# verification, narrowed to the project_id column only.
def repositories_checksummed_pending_verification
  pending = Geo::ProjectRegistry.repositories_checksummed_pending_verification
  pending.select(Geo::ProjectRegistry.arel_table[:project_id])
end
# rubocop:enable CodeReuse/ActiveRecord
# rubocop:disable CodeReuse/ActiveRecord
# Registry scope for wikis that are checksummed and pending verification,
# narrowed to the project_id column only.
def wikis_checksummed_pending_verification
  pending = Geo::ProjectRegistry.wikis_checksummed_pending_verification
  pending.select(Geo::ProjectRegistry.arel_table[:project_id])
end
# rubocop:enable CodeReuse/ActiveRecord
end end
end end
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
class Geo::ProjectRegistry < Geo::BaseRegistry class Geo::ProjectRegistry < Geo::BaseRegistry
include ::Delay include ::Delay
include ::EachBatch include ::EachBatch
include ::FromUnion
include ::ShaAttribute include ::ShaAttribute
MODEL_CLASS = ::Project MODEL_CLASS = ::Project
...@@ -176,8 +177,12 @@ class Geo::ProjectRegistry < Geo::BaseRegistry ...@@ -176,8 +177,12 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
end end
end end
def self.registries_pending_verification def self.repositories_checksummed_pending_verification
repositories_pending_verification.or(wikis_pending_verification) where(repositories_pending_verification.and(arel_table[:primary_repository_checksummed].eq(true)))
end
def self.wikis_checksummed_pending_verification
where(wikis_pending_verification.and(arel_table[:primary_wiki_checksummed].eq(true)))
end end
def self.repositories_pending_verification def self.repositories_pending_verification
...@@ -258,8 +263,9 @@ class Geo::ProjectRegistry < Geo::BaseRegistry ...@@ -258,8 +263,9 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
# #
# @param [String] type must be one of the values in TYPES # @param [String] type must be one of the values in TYPES
# @see REGISTRY_TYPES # @see REGISTRY_TYPES
def finish_sync!(type, missing_on_primary = false) def finish_sync!(type, missing_on_primary = false, primary_checksummed = false)
ensure_valid_type!(type) ensure_valid_type!(type)
update!( update!(
# Indicate that the sync succeeded (but separately mark as synced atomically) # Indicate that the sync succeeded (but separately mark as synced atomically)
"last_#{type}_successful_sync_at" => Time.current, "last_#{type}_successful_sync_at" => Time.current,
...@@ -270,6 +276,7 @@ class Geo::ProjectRegistry < Geo::BaseRegistry ...@@ -270,6 +276,7 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
"#{type}_missing_on_primary" => missing_on_primary, "#{type}_missing_on_primary" => missing_on_primary,
# Indicate that repository verification needs to be done again # Indicate that repository verification needs to be done again
"primary_#{type}_checksummed" => primary_checksummed,
"#{type}_verification_checksum_sha" => nil, "#{type}_verification_checksum_sha" => nil,
"#{type}_checksum_mismatch" => false, "#{type}_checksum_mismatch" => false,
"last_#{type}_verification_failure" => nil) "last_#{type}_verification_failure" => nil)
...@@ -326,9 +333,11 @@ class Geo::ProjectRegistry < Geo::BaseRegistry ...@@ -326,9 +333,11 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
end end
# Resets repository/wiki verification state. Is called when a Geo # Resets repository/wiki verification state. Is called when a Geo
# secondary node process a Geo::ResetChecksymEvent. # secondary node process a Geo::ResetChecksumEvent.
def reset_checksum! def reset_checksum!
update!( update!(
primary_repository_checksummed: true,
primary_wiki_checksummed: true,
repository_verification_checksum_sha: nil, repository_verification_checksum_sha: nil,
wiki_verification_checksum_sha: nil, wiki_verification_checksum_sha: nil,
repository_checksum_mismatch: false, repository_checksum_mismatch: false,
......
...@@ -135,7 +135,7 @@ module Geo ...@@ -135,7 +135,7 @@ module Geo
def mark_sync_as_successful(missing_on_primary: false) def mark_sync_as_successful(missing_on_primary: false)
log_info("Marking #{type} sync as successful") log_info("Marking #{type} sync as successful")
persisted = registry.finish_sync!(type, missing_on_primary) persisted = registry.finish_sync!(type, missing_on_primary, primary_checksummed?)
reschedule_sync unless persisted reschedule_sync unless persisted
...@@ -144,6 +144,14 @@ module Geo ...@@ -144,6 +144,14 @@ module Geo
download_time_s: download_time_in_seconds) download_time_s: download_time_in_seconds)
end end
# Whether a non-blank verification checksum is available from the primary
# for the current type.
#
# @return [Boolean]
def primary_checksummed?
  checksum = primary_checksum
  checksum.present?
end
# Reads "<type>_verification_checksum" from the project's repository state.
#
# @return [Object, nil] the checksum, or nil when the project has no
#   repository state
def primary_checksum
  state = project.repository_state
  return if state.nil?

  state.public_send("#{type}_verification_checksum") # rubocop:disable GitlabSecurity/PublicSend
end
def reschedule_sync def reschedule_sync
log_info("Reschedule #{type} sync because a RepositoryUpdateEvent was processed during the sync") log_info("Reschedule #{type} sync because a RepositoryUpdateEvent was processed during the sync")
......
...@@ -23,7 +23,7 @@ module Geo ...@@ -23,7 +23,7 @@ module Geo
def should_verify_checksum? def should_verify_checksum?
return false if resync? return false if resync?
return false unless primary_checksum.present? return false unless primary_checksummed?
mismatch?(secondary_checksum) mismatch?(secondary_checksum)
end end
...@@ -32,6 +32,10 @@ module Geo ...@@ -32,6 +32,10 @@ module Geo
registry.public_send("resync_#{type}") # rubocop:disable GitlabSecurity/PublicSend registry.public_send("resync_#{type}") # rubocop:disable GitlabSecurity/PublicSend
end end
def primary_checksummed?
primary_checksum.present?
end
def primary_checksum def primary_checksum
project.repository_state.public_send("#{type}_verification_checksum") # rubocop:disable GitlabSecurity/PublicSend project.repository_state.public_send("#{type}_verification_checksum") # rubocop:disable GitlabSecurity/PublicSend
end end
...@@ -71,6 +75,7 @@ module Geo ...@@ -71,6 +75,7 @@ module Geo
end end
registry.update!( registry.update!(
"primary_#{type}_checksummed" => primary_checksummed?,
"#{type}_verification_checksum_sha" => checksum, "#{type}_verification_checksum_sha" => checksum,
"#{type}_verification_checksum_mismatched" => mismatch, "#{type}_verification_checksum_mismatched" => mismatch,
"#{type}_checksum_mismatch" => mismatch.present?, "#{type}_checksum_mismatch" => mismatch.present?,
......
...@@ -8,6 +8,7 @@ module Geo ...@@ -8,6 +8,7 @@ module Geo
# This worker does not perform work scoped to a context # This worker does not perform work scoped to a context
include CronjobQueue include CronjobQueue
# rubocop:enable Scalability/CronWorkerContext # rubocop:enable Scalability/CronWorkerContext
attr_accessor :shard_name attr_accessor :shard_name
loggable_arguments 0 loggable_arguments 0
...@@ -38,17 +39,50 @@ module Geo ...@@ -38,17 +39,50 @@ module Geo
current_node.verification_max_capacity current_node.verification_max_capacity
end end
# rubocop:disable CodeReuse/ActiveRecord
def load_pending_resources def load_pending_resources
return [] unless valid_shard?
if Geo::ProjectRegistry.registry_consistency_worker_enabled?
project_ids =
registry_finder
.find_project_ids_pending_verification(batch_size: db_retrieve_batch_size, except_ids: scheduled_project_ids)
Project
.id_in(project_ids)
.within_shards(shard_name)
.pluck_primary_key
else
Geo::ProjectRegistryPendingVerificationFinder Geo::ProjectRegistryPendingVerificationFinder
.new(current_node: current_node, shard_name: shard_name, batch_size: db_retrieve_batch_size) .new(current_node: current_node, shard_name: shard_name, batch_size: db_retrieve_batch_size)
.execute .execute
.pluck_primary_key .merge(Geo::ProjectRegistry.model_id_not_in(scheduled_project_ids))
.pluck(Geo::Fdw::Project.arel_table[:id])
end
end end
# rubocop:enable CodeReuse/ActiveRecord
def schedule_job(registry_id) def scheduled_project_ids
scheduled_jobs.map { |data| data[:project_id] }
end
# rubocop:disable CodeReuse/ActiveRecord
def schedule_job(project_id)
registry_id = Geo::ProjectRegistry.where(project_id: project_id).pick(:id)
job_id = Geo::RepositoryVerification::Secondary::SingleWorker.perform_async(registry_id) job_id = Geo::RepositoryVerification::Secondary::SingleWorker.perform_async(registry_id)
{ id: registry_id, job_id: job_id } if job_id { project_id: project_id, job_id: job_id } if job_id
end
# rubocop:enable CodeReuse/ActiveRecord
# Memoized Geo::ProjectRegistryFinder; built on first use and reused after.
def registry_finder
  return @registry_finder if @registry_finder

  @registry_finder = Geo::ProjectRegistryFinder.new
end
def valid_shard?
return true unless current_node.selective_sync_by_shards?
current_node.selective_sync_shards.include?(shard_name)
end end
end end
end end
......
---
title: Make registry table as SSOT to verify Projects and Wikis
merge_request: 35095
author:
type: changed
# frozen_string_literal: true
# Adds two boolean flags to project_registry that record whether the
# repository and the wiki have been checksummed on the primary.
class AddVerifiedOnPrimaryColumnsToProjectRegistry < ActiveRecord::Migration[6.0]
  DOWNTIME = false

  # Both columns are NOT NULL and default to false.
  def change
    %i[primary_repository_checksummed primary_wiki_checksummed].each do |column|
      add_column :project_registry, column, :boolean, default: false, null: false
    end
  end
end
...@@ -10,7 +10,7 @@ ...@@ -10,7 +10,7 @@
# #
# It's strongly recommended that you check this file into your version control system. # It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema.define(version: 2020_04_07_120740) do ActiveRecord::Schema.define(version: 2020_07_07_011052) do
# These are extensions that must be enabled in order to support this database # These are extensions that must be enabled in order to support this database
enable_extension "plpgsql" enable_extension "plpgsql"
...@@ -146,6 +146,8 @@ ActiveRecord::Schema.define(version: 2020_04_07_120740) do ...@@ -146,6 +146,8 @@ ActiveRecord::Schema.define(version: 2020_04_07_120740) do
t.datetime_with_timezone "last_wiki_verification_ran_at" t.datetime_with_timezone "last_wiki_verification_ran_at"
t.binary "repository_verification_checksum_mismatched" t.binary "repository_verification_checksum_mismatched"
t.binary "wiki_verification_checksum_mismatched" t.binary "wiki_verification_checksum_mismatched"
t.boolean "primary_repository_checksummed", default: false, null: false
t.boolean "primary_wiki_checksummed", default: false, null: false
t.index ["last_repository_successful_sync_at"], name: "idx_project_registry_synced_repositories_partial", where: "((resync_repository = false) AND (repository_retry_count IS NULL) AND (repository_verification_checksum_sha IS NOT NULL))" t.index ["last_repository_successful_sync_at"], name: "idx_project_registry_synced_repositories_partial", where: "((resync_repository = false) AND (repository_retry_count IS NULL) AND (repository_verification_checksum_sha IS NOT NULL))"
t.index ["last_repository_successful_sync_at"], name: "index_project_registry_on_last_repository_successful_sync_at" t.index ["last_repository_successful_sync_at"], name: "index_project_registry_on_last_repository_successful_sync_at"
t.index ["last_repository_synced_at"], name: "index_project_registry_on_last_repository_synced_at" t.index ["last_repository_synced_at"], name: "index_project_registry_on_last_repository_synced_at"
......
...@@ -9,6 +9,8 @@ FactoryBot.define do ...@@ -9,6 +9,8 @@ FactoryBot.define do
last_wiki_successful_sync_at { nil } last_wiki_successful_sync_at { nil }
resync_repository { true } resync_repository { true }
resync_wiki { true } resync_wiki { true }
primary_repository_checksummed { true }
primary_wiki_checksummed { true }
trait :dirty do trait :dirty do
resync_repository { true } resync_repository { true }
......
...@@ -44,4 +44,38 @@ RSpec.describe Geo::ProjectRegistryFinder, :geo do ...@@ -44,4 +44,38 @@ RSpec.describe Geo::ProjectRegistryFinder, :geo do
expect(registries).to match_ids(registry_project_2, registry_project_3) expect(registries).to match_ids(registry_project_2, registry_project_3)
end end
end end
# Exercises the finder with a batch size of 10; project_1, project_4 and
# project_5 are fixtures defined outside this describe block.
describe '#find_project_ids_pending_verification' do
it 'returns project IDs where repository and/or wiki is pending verification' do
project_ids = subject.find_project_ids_pending_verification(batch_size: 10)
expect(project_ids).to match_array([project_1.id, project_4.id, project_5.id])
end
it 'excludes registries where repository and wiki is missing on primary' do
# Registries with only one side missing on the primary are still expected
# in the result; only the registry missing both sides is left out.
registry_project_7 = create(:geo_project_registry, :synced, repository_missing_on_primary: true)
registry_project_8 = create(:geo_project_registry, :synced, wiki_missing_on_primary: true)
create(:geo_project_registry, :synced, repository_missing_on_primary: true, wiki_missing_on_primary: true)
project_ids = subject.find_project_ids_pending_verification(batch_size: 10)
expect(project_ids).to match_array([project_1.id, project_4.id, project_5.id, registry_project_7.project_id, registry_project_8.project_id])
end
it 'excludes registries where repository and wiki has not been verified on primary' do
# Same shape as above: a registry is only left out when neither the
# repository nor the wiki has been checksummed on the primary.
registry_project_7 = create(:geo_project_registry, :synced, primary_repository_checksummed: false)
registry_project_8 = create(:geo_project_registry, :synced, primary_wiki_checksummed: false)
create(:geo_project_registry, :synced, primary_repository_checksummed: false, primary_wiki_checksummed: false)
project_ids = subject.find_project_ids_pending_verification(batch_size: 10)
expect(project_ids).to match_array([project_1.id, project_4.id, project_5.id, registry_project_7.project_id, registry_project_8.project_id])
end
it 'excludes except_ids' do
# project_5 would otherwise be returned (see the first example).
project_ids = subject.find_project_ids_pending_verification(batch_size: 10, except_ids: [project_5.id])
expect(project_ids).to match_array([project_1.id, project_4.id])
end
end
end end
...@@ -80,9 +80,11 @@ RSpec.describe Gitlab::Geo::LogCursor::Events::RepositoryUpdatedEvent, :clean_gi ...@@ -80,9 +80,11 @@ RSpec.describe Gitlab::Geo::LogCursor::Events::RepositoryUpdatedEvent, :clean_gi
subject.process subject.process
reloaded_registry = registry.reload reloaded_registry = registry.reload
expect(reloaded_registry.wiki_verification_checksum_sha).to be_nil expect(reloaded_registry).to have_attributes(
expect(reloaded_registry.wiki_checksum_mismatch).to be false wiki_verification_checksum_sha: nil,
expect(reloaded_registry.last_wiki_verification_failure).to be_nil wiki_checksum_mismatch: false,
last_wiki_verification_failure: nil
)
end end
it 'sets resync_wiki_was_scheduled_at to the scheduled time' do it 'sets resync_wiki_was_scheduled_at to the scheduled time' do
......
...@@ -44,6 +44,8 @@ RSpec.describe Gitlab::Geo::LogCursor::Events::ResetChecksumEvent, :clean_gitlab ...@@ -44,6 +44,8 @@ RSpec.describe Gitlab::Geo::LogCursor::Events::ResetChecksumEvent, :clean_gitlab
subject.process subject.process
expect(registry.reload).to have_attributes( expect(registry.reload).to have_attributes(
primary_repository_checksummed: true,
primary_wiki_checksummed: true,
repository_verification_checksum_sha: nil, repository_verification_checksum_sha: nil,
wiki_verification_checksum_sha: nil wiki_verification_checksum_sha: nil
) )
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe Geo::RepositorySyncService do RSpec.describe Geo::RepositorySyncService, :geo do
include ::EE::GeoHelpers include ::EE::GeoHelpers
include ExclusiveLeaseHelpers include ExclusiveLeaseHelpers
...@@ -151,6 +151,20 @@ RSpec.describe Geo::RepositorySyncService do ...@@ -151,6 +151,20 @@ RSpec.describe Geo::RepositorySyncService do
end end
end end
# The registry starts with the flag off; a sync against a repository that is
# verified on the primary is expected to turn it on.
it 'marks primary_repository_checksummed as true when repository has been verified on primary' do
  create(:repository_state, :repository_verified, project: project)
  registry = create(:geo_project_registry, project: project, primary_repository_checksummed: false)

  expect { subject.execute }.to change { registry.reload.primary_repository_checksummed }.from(false).to(true)
end
# The registry starts with the flag on; a sync against a repository that is
# not verified on the primary is expected to turn it off.
it 'marks primary_repository_checksummed as false when repository has not been verified on primary' do
  create(:repository_state, :repository_failed, project: project)
  registry = create(:geo_project_registry, project: project, primary_repository_checksummed: true)

  expect { subject.execute }.to change { registry.reload.primary_repository_checksummed }.from(true).to(false)
end
context 'tracking database' do context 'tracking database' do
context 'temporary repositories' do context 'temporary repositories' do
include_examples 'cleans temporary repositories' do include_examples 'cleans temporary repositories' do
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe Geo::WikiSyncService do RSpec.describe Geo::WikiSyncService, :geo do
include ::EE::GeoHelpers include ::EE::GeoHelpers
include ExclusiveLeaseHelpers include ExclusiveLeaseHelpers
...@@ -126,6 +126,20 @@ RSpec.describe Geo::WikiSyncService do ...@@ -126,6 +126,20 @@ RSpec.describe Geo::WikiSyncService do
end end
end end
# The registry starts with the flag off; a sync against a wiki that is
# verified on the primary is expected to turn it on.
it 'marks primary_wiki_checksummed as true when wiki has been verified on primary' do
  create(:repository_state, :wiki_verified, project: project)
  registry = create(:geo_project_registry, project: project, primary_wiki_checksummed: false)

  expect { subject.execute }.to change { registry.reload.primary_wiki_checksummed }.from(false).to(true)
end
# The registry starts with the flag on; a sync against a wiki that is not
# verified on the primary is expected to turn it off.
it 'marks primary_wiki_checksummed as false when wiki has not been verified on primary' do
  create(:repository_state, :wiki_failed, project: project)
  registry = create(:geo_project_registry, project: project, primary_wiki_checksummed: true)

  expect { subject.execute }.to change { registry.reload.primary_wiki_checksummed }.from(true).to(false)
end
context 'tracking database' do context 'tracking database' do
context 'temporary repositories' do context 'temporary repositories' do
include_examples 'cleans temporary repositories' do include_examples 'cleans temporary repositories' do
......
...@@ -30,6 +30,76 @@ RSpec.describe Geo::RepositoryVerification::Secondary::ShardWorker, :geo, :geo_f ...@@ -30,6 +30,76 @@ RSpec.describe Geo::RepositoryVerification::Secondary::ShardWorker, :geo, :geo_f
end end
end end
# The worker must not enqueue any single-worker job for an unhealthy shard.
it 'does not schedule jobs when shard becomes unhealthy' do
create(:repository_state, project: project)
# Update the shard health cache with an empty list, so that (per the
# example name) the shard under test is no longer reported as healthy.
Gitlab::ShardHealthCache.update([])
expect(secondary_single_worker).not_to receive(:perform_async)
subject.perform(shard_name)
end
# The worker must not enqueue anything when Gitlab::Geo reports that no Geo
# database is configured.
it 'does not schedule jobs when no geo database is configured' do
allow(Gitlab::Geo).to receive(:geo_database_configured?) { false }
expect(secondary_single_worker).not_to receive(:perform_async)
subject.perform(shard_name)
# We need to unstub here or the DatabaseCleaner will have issues since it
# will appear as though the tracking DB were not available
allow(Gitlab::Geo).to receive(:geo_database_configured?).and_call_original
end
# The worker must not enqueue anything on a node that is not a secondary.
it 'does not schedule jobs when not running on a secondary' do
# Make Gitlab::Geo report that the current node is not a secondary.
allow(Gitlab::Geo).to receive(:secondary?) { false }
expect(secondary_single_worker).not_to receive(:perform_async)
subject.perform(shard_name)
end
# When the worker already tracks more scheduled jobs than its capacity, it
# must not call schedule_job at all.
it 'does not schedule jobs when number of scheduled jobs exceeds capacity' do
create(:project)
# 1..1000 stands in for a large set of already-scheduled job IDs; per the
# example name this is expected to be above the worker's capacity.
is_expected.to receive(:scheduled_job_ids).and_return(1..1000).at_least(:once)
is_expected.not_to receive(:schedule_job)
Sidekiq::Testing.inline! { subject.perform(shard_name) }
end
# Covers the per-shard skip flag that the worker reads from and writes to
# Rails.cache under cache_key.
context 'backoff time' do
let(:cache_key) { "#{described_class.name.underscore}:shard:#{shard_name}:skip" }
before do
# Let cache calls pass through by default; the examples below only
# intercept the specific read/write they assert on.
allow(Rails.cache).to receive(:write).and_call_original
allow(Rails.cache).to receive(:read).and_call_original
end
it 'sets the back off time when there are no pending items' do
# This example creates no registries, so the worker has nothing to schedule
# and is expected to write the skip flag with a 300-second expiry.
expect(Rails.cache).to receive(:write).with(cache_key, true, expires_in: 300.seconds).once
subject.perform(shard_name)
end
it 'does not perform Geo::RepositoryVerification::Secondary::SingleWorker when the backoff time is set' do
create(:repository_state, :repository_verified, project: project)
create(:geo_project_registry, :synced, :repository_verification_outdated, project: project)
# Simulate an active backoff: reading the skip flag returns true even
# though a registry with outdated verification exists.
expect(Rails.cache).to receive(:read).with(cache_key).and_return(true)
expect(Geo::RepositoryVerification::Secondary::SingleWorker).not_to receive(:perform_async)
subject.perform(shard_name)
end
end
context 'when geo_project_registry_ssot_sync is enabled' do
before do
stub_feature_flags(geo_project_registry_ssot_sync: true)
end
it 'schedule a job for each project' do it 'schedule a job for each project' do
other_project = create(:project) other_project = create(:project)
create(:repository_state, :repository_verified, project: project) create(:repository_state, :repository_verified, project: project)
...@@ -75,23 +145,23 @@ RSpec.describe Geo::RepositoryVerification::Secondary::ShardWorker, :geo, :geo_f ...@@ -75,23 +145,23 @@ RSpec.describe Geo::RepositoryVerification::Secondary::ShardWorker, :geo, :geo_f
other_project = create(:project) other_project = create(:project)
create(:repository_state, :repository_verified, project: project) create(:repository_state, :repository_verified, project: project)
create(:repository_state, :wiki_verified, project: other_project) create(:repository_state, :wiki_verified, project: other_project)
create(:geo_project_registry, :synced, project: project, repository_missing_on_primary: true) create(:geo_project_registry, :synced, :wiki_verified, project: project, repository_missing_on_primary: true)
create(:geo_project_registry, :synced, project: other_project, wiki_missing_on_primary: true) create(:geo_project_registry, :synced, :repository_verified, project: other_project, wiki_missing_on_primary: true)
expect(secondary_single_worker).not_to receive(:perform_async) expect(secondary_single_worker).not_to receive(:perform_async)
subject.perform(shard_name) subject.perform(shard_name)
end end
# test that when jobs are always moving forward and we're not querying the same things # Test that when jobs are always moving forward and we're not querying
# over and over # the same things over and over
describe 'resource loading' do context 'resource loading' do
before do before do
allow(subject).to receive(:db_retrieve_batch_size) { 1 } allow(subject).to receive(:db_retrieve_batch_size) { 1 }
end end
let(:project1_repo_verified) { create(:repository_state, :repository_verified).project } let(:project1_repo_verified) { create(:repository_state, :repository_verified, :wiki_verified).project }
let(:project2_repo_verified) { create(:repository_state, :repository_verified).project } let(:project2_repo_verified) { create(:repository_state, :repository_verified, :wiki_verified).project }
let(:project3_repo_failed) { create(:repository_state, :repository_failed).project } let(:project3_repo_failed) { create(:repository_state, :repository_failed).project }
let(:project4_wiki_verified) { create(:repository_state, :wiki_verified).project } let(:project4_wiki_verified) { create(:repository_state, :wiki_verified).project }
let(:project5_both_verified) { create(:repository_state, :repository_verified, :wiki_verified).project } let(:project5_both_verified) { create(:repository_state, :repository_verified, :wiki_verified).project }
...@@ -109,11 +179,11 @@ RSpec.describe Geo::RepositoryVerification::Secondary::ShardWorker, :geo, :geo_f ...@@ -109,11 +179,11 @@ RSpec.describe Geo::RepositoryVerification::Secondary::ShardWorker, :geo, :geo_f
end end
end end
it 'handles multiple batches of projects needing verification, skipping failed repos' do it 'handles multiple batches of projects needing verification, skipping repositories not verified on primary' do
reg1 = create(:geo_project_registry, :synced, :repository_verification_outdated, project: project1_repo_verified) reg1 = create(:geo_project_registry, :synced, :repository_verification_outdated, project: project1_repo_verified)
reg2 = create(:geo_project_registry, :synced, :repository_verification_outdated, project: project2_repo_verified) reg2 = create(:geo_project_registry, :synced, :repository_verification_outdated, project: project2_repo_verified)
create(:geo_project_registry, :synced, :repository_verification_outdated, project: project3_repo_failed) create(:geo_project_registry, :synced, :repository_verification_outdated, :wiki_verified, project: project3_repo_failed, primary_repository_checksummed: false)
reg4 = create(:geo_project_registry, :synced, :wiki_verification_outdated, project: project4_wiki_verified) reg4 = create(:geo_project_registry, :synced, :wiki_verification_outdated, project: project4_wiki_verified, primary_repository_checksummed: false)
create(:geo_project_registry, :synced, :repository_verification_failed, :wiki_verification_failed, project: project5_both_verified) create(:geo_project_registry, :synced, :repository_verification_failed, :wiki_verification_failed, project: project5_both_verified)
reg6 = create(:geo_project_registry, :synced, project: project6_both_verified) reg6 = create(:geo_project_registry, :synced, project: project6_both_verified)
...@@ -127,69 +197,109 @@ RSpec.describe Geo::RepositoryVerification::Secondary::ShardWorker, :geo, :geo_f ...@@ -127,69 +197,109 @@ RSpec.describe Geo::RepositoryVerification::Secondary::ShardWorker, :geo, :geo_f
end end
end end
end end
end
it 'does not schedule jobs when shard becomes unhealthy' do context 'when geo_project_registry_ssot_sync is disabled' do
create(:repository_state, project: project) before do
stub_feature_flags(geo_project_registry_ssot_sync: false)
end
Gitlab::ShardHealthCache.update([]) it 'schedule a job for each project' do
other_project = create(:project)
create(:repository_state, :repository_verified, project: project)
create(:repository_state, :repository_verified, project: other_project)
create(:geo_project_registry, :synced, :repository_verification_outdated, project: project)
create(:geo_project_registry, :synced, :repository_verification_outdated, project: other_project)
expect(secondary_single_worker).not_to receive(:perform_async) expect(secondary_single_worker).to receive(:perform_async).twice
subject.perform(shard_name) subject.perform(shard_name)
end end
it 'does not schedule jobs when no geo database is configured' do it 'schedule jobs for projects missing repository verification' do
allow(Gitlab::Geo).to receive(:geo_database_configured?) { false } create(:repository_state, :repository_verified, :wiki_verified, project: project)
missing_repository_verification = create(:geo_project_registry, :synced, :wiki_verified, project: project)
expect(secondary_single_worker).not_to receive(:perform_async) expect(secondary_single_worker).to receive(:perform_async).with(missing_repository_verification.id)
subject.perform(shard_name) subject.perform(shard_name)
# We need to unstub here or the DatabaseCleaner will have issues since it
# will appear as though the tracking DB were not available
allow(Gitlab::Geo).to receive(:geo_database_configured?).and_call_original
end end
it 'does not schedule jobs when not running on a secondary' do it 'schedule jobs for projects missing wiki verification' do
allow(Gitlab::Geo).to receive(:secondary?) { false } create(:repository_state, :repository_verified, :wiki_verified, project: project)
missing_wiki_verification = create(:geo_project_registry, :synced, :repository_verified, project: project)
expect(secondary_single_worker).not_to receive(:perform_async) expect(secondary_single_worker).to receive(:perform_async).with(missing_wiki_verification.id)
subject.perform(shard_name) subject.perform(shard_name)
end end
it 'does not schedule jobs when number of scheduled jobs exceeds capacity' do it 'does not schedule jobs for projects on other shards' do
create(:project) project_other_shard = create(:project)
project_other_shard.update_column(:repository_storage, 'other')
create(:repository_state, :repository_verified, :wiki_verified, project: project_other_shard)
registry_other_shard = create(:geo_project_registry, :synced, :wiki_verified, project: project_other_shard)
is_expected.to receive(:scheduled_job_ids).and_return(1..1000).at_least(:once) expect(secondary_single_worker).not_to receive(:perform_async).with(registry_other_shard.id)
is_expected.not_to receive(:schedule_job)
Sidekiq::Testing.inline! { subject.perform(shard_name) } subject.perform(shard_name)
end end
context 'backoff time' do it 'does not schedule jobs for projects missing repositories on primary' do
let(:cache_key) { "#{described_class.name.underscore}:shard:#{shard_name}:skip" } other_project = create(:project)
create(:repository_state, :repository_verified, project: project)
create(:repository_state, :wiki_verified, project: other_project)
create(:geo_project_registry, :synced, project: project, repository_missing_on_primary: true)
create(:geo_project_registry, :synced, project: other_project, wiki_missing_on_primary: true)
expect(secondary_single_worker).not_to receive(:perform_async)
subject.perform(shard_name)
end
# Test that when jobs are always moving forward and we're not querying
# the same things over and over
context 'resource loading' do
before do before do
allow(Rails.cache).to receive(:write).and_call_original allow(subject).to receive(:db_retrieve_batch_size) { 1 }
allow(Rails.cache).to receive(:read).and_call_original
end end
it 'sets the back off time when there are no pending items' do let(:project1_repo_verified) { create(:repository_state, :repository_verified).project }
expect(Rails.cache).to receive(:write).with(cache_key, true, expires_in: 300.seconds).once let(:project2_repo_verified) { create(:repository_state, :repository_verified).project }
let(:project3_repo_failed) { create(:repository_state, :repository_failed).project }
let(:project4_wiki_verified) { create(:repository_state, :wiki_verified).project }
let(:project5_both_verified) { create(:repository_state, :repository_verified, :wiki_verified).project }
let(:project6_both_verified) { create(:repository_state, :repository_verified, :wiki_verified).project }
subject.perform(shard_name) it 'handles multiple batches of projects needing verification' do
end reg1 = create(:geo_project_registry, :synced, :repository_verification_outdated, project: project1_repo_verified)
reg2 = create(:geo_project_registry, :synced, :repository_verification_outdated, project: project2_repo_verified)
it 'does not perform Geo::RepositoryVerification::Secondary::SingleWorker when the backoff time is set' do expect(secondary_single_worker).to receive(:perform_async).with(reg1.id).once.and_call_original
create(:repository_state, :repository_verified, project: project) expect(secondary_single_worker).to receive(:perform_async).with(reg2.id).once.and_call_original
create(:geo_project_registry, :synced, :repository_verification_outdated, project: project)
expect(Rails.cache).to receive(:read).with(cache_key).and_return(true) 3.times do
Sidekiq::Testing.inline! { subject.perform(shard_name) }
end
end
expect(Geo::RepositoryVerification::Secondary::SingleWorker).not_to receive(:perform_async) it 'handles multiple batches of projects needing verification, skipping failed repos' do
reg1 = create(:geo_project_registry, :synced, :repository_verification_outdated, project: project1_repo_verified)
reg2 = create(:geo_project_registry, :synced, :repository_verification_outdated, project: project2_repo_verified)
create(:geo_project_registry, :synced, :repository_verification_outdated, project: project3_repo_failed)
reg4 = create(:geo_project_registry, :synced, :wiki_verification_outdated, project: project4_wiki_verified)
create(:geo_project_registry, :synced, :repository_verification_failed, :wiki_verification_failed, project: project5_both_verified)
reg6 = create(:geo_project_registry, :synced, project: project6_both_verified)
subject.perform(shard_name) expect(secondary_single_worker).to receive(:perform_async).with(reg1.id).once.and_call_original
expect(secondary_single_worker).to receive(:perform_async).with(reg2.id).once.and_call_original
expect(secondary_single_worker).to receive(:perform_async).with(reg4.id).once.and_call_original
expect(secondary_single_worker).to receive(:perform_async).with(reg6.id).once.and_call_original
7.times do
Sidekiq::Testing.inline! { subject.perform(shard_name) }
end
end
end end
end end
end end
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe Geo::RepositoryVerification::Secondary::SingleWorker, :clean_gitlab_redis_cache do RSpec.describe Geo::RepositoryVerification::Secondary::SingleWorker, :geo, :clean_gitlab_redis_cache do
include ::EE::GeoHelpers include ::EE::GeoHelpers
include ExclusiveLeaseHelpers include ExclusiveLeaseHelpers
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment