Commit 5b44921b authored by Douglas Barbosa Alexandre's avatar Douglas Barbosa Alexandre Committed by Mayra Cabrera

Add a method to retrieve registries pending verification

Returns registries where repository and/or
wiki is pending verification.
parent 17a5d870
...@@ -60,8 +60,6 @@ module Reenqueuer ...@@ -60,8 +60,6 @@ module Reenqueuer
5.seconds 5.seconds
end end
# We intend to get rid of sleep:
# https://gitlab.com/gitlab-org/gitlab/issues/121697
module ReenqueuerSleeper module ReenqueuerSleeper
# The block will run, and then sleep until the minimum duration. Returns the # The block will run, and then sleep until the minimum duration. Returns the
# block's return value. # block's return value.
......
...@@ -37,5 +37,36 @@ module Geo ...@@ -37,5 +37,36 @@ module Geo
.limit(batch_size) .limit(batch_size)
end end
# rubocop:enable CodeReuse/ActiveRecord # rubocop:enable CodeReuse/ActiveRecord
# rubocop:disable CodeReuse/ActiveRecord
def find_project_ids_pending_verification(batch_size:, except_ids: [])
Geo::ProjectRegistry
.from_union([
repositories_checksummed_pending_verification,
wikis_checksummed_pending_verification
])
.model_id_not_in(except_ids)
.limit(batch_size)
.pluck_model_foreign_key
end
# rubocop:enable CodeReuse/ActiveRecord
private
# rubocop:disable CodeReuse/ActiveRecord
def repositories_checksummed_pending_verification
Geo::ProjectRegistry
.repositories_checksummed_pending_verification
.select(Geo::ProjectRegistry.arel_table[:project_id])
end
# rubocop:enable CodeReuse/ActiveRecord
# rubocop:disable CodeReuse/ActiveRecord
def wikis_checksummed_pending_verification
Geo::ProjectRegistry
.wikis_checksummed_pending_verification
.select(Geo::ProjectRegistry.arel_table[:project_id])
end
# rubocop:enable CodeReuse/ActiveRecord
end end
end end
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
class Geo::ProjectRegistry < Geo::BaseRegistry class Geo::ProjectRegistry < Geo::BaseRegistry
include ::Delay include ::Delay
include ::EachBatch include ::EachBatch
include ::FromUnion
include ::ShaAttribute include ::ShaAttribute
MODEL_CLASS = ::Project MODEL_CLASS = ::Project
...@@ -176,8 +177,12 @@ class Geo::ProjectRegistry < Geo::BaseRegistry ...@@ -176,8 +177,12 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
end end
end end
def self.registries_pending_verification def self.repositories_checksummed_pending_verification
repositories_pending_verification.or(wikis_pending_verification) where(repositories_pending_verification.and(arel_table[:primary_repository_checksummed].eq(true)))
end
def self.wikis_checksummed_pending_verification
where(wikis_pending_verification.and(arel_table[:primary_wiki_checksummed].eq(true)))
end end
def self.repositories_pending_verification def self.repositories_pending_verification
...@@ -258,8 +263,9 @@ class Geo::ProjectRegistry < Geo::BaseRegistry ...@@ -258,8 +263,9 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
# #
# @param [String] type must be one of the values in TYPES # @param [String] type must be one of the values in TYPES
# @see REGISTRY_TYPES # @see REGISTRY_TYPES
def finish_sync!(type, missing_on_primary = false) def finish_sync!(type, missing_on_primary = false, primary_checksummed = false)
ensure_valid_type!(type) ensure_valid_type!(type)
update!( update!(
# Indicate that the sync succeeded (but separately mark as synced atomically) # Indicate that the sync succeeded (but separately mark as synced atomically)
"last_#{type}_successful_sync_at" => Time.current, "last_#{type}_successful_sync_at" => Time.current,
...@@ -270,6 +276,7 @@ class Geo::ProjectRegistry < Geo::BaseRegistry ...@@ -270,6 +276,7 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
"#{type}_missing_on_primary" => missing_on_primary, "#{type}_missing_on_primary" => missing_on_primary,
# Indicate that repository verification needs to be done again # Indicate that repository verification needs to be done again
"primary_#{type}_checksummed" => primary_checksummed,
"#{type}_verification_checksum_sha" => nil, "#{type}_verification_checksum_sha" => nil,
"#{type}_checksum_mismatch" => false, "#{type}_checksum_mismatch" => false,
"last_#{type}_verification_failure" => nil) "last_#{type}_verification_failure" => nil)
...@@ -326,9 +333,11 @@ class Geo::ProjectRegistry < Geo::BaseRegistry ...@@ -326,9 +333,11 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
end end
# Resets repository/wiki verification state. Is called when a Geo # Resets repository/wiki verification state. Is called when a Geo
# secondary node process a Geo::ResetChecksymEvent. # secondary node process a Geo::ResetChecksumEvent.
def reset_checksum! def reset_checksum!
update!( update!(
primary_repository_checksummed: true,
primary_wiki_checksummed: true,
repository_verification_checksum_sha: nil, repository_verification_checksum_sha: nil,
wiki_verification_checksum_sha: nil, wiki_verification_checksum_sha: nil,
repository_checksum_mismatch: false, repository_checksum_mismatch: false,
......
...@@ -135,7 +135,7 @@ module Geo ...@@ -135,7 +135,7 @@ module Geo
def mark_sync_as_successful(missing_on_primary: false) def mark_sync_as_successful(missing_on_primary: false)
log_info("Marking #{type} sync as successful") log_info("Marking #{type} sync as successful")
persisted = registry.finish_sync!(type, missing_on_primary) persisted = registry.finish_sync!(type, missing_on_primary, primary_checksummed?)
reschedule_sync unless persisted reschedule_sync unless persisted
...@@ -144,6 +144,14 @@ module Geo ...@@ -144,6 +144,14 @@ module Geo
download_time_s: download_time_in_seconds) download_time_s: download_time_in_seconds)
end end
def primary_checksummed?
primary_checksum.present?
end
def primary_checksum
project.repository_state&.public_send("#{type}_verification_checksum") # rubocop:disable GitlabSecurity/PublicSend
end
def reschedule_sync def reschedule_sync
log_info("Reschedule #{type} sync because a RepositoryUpdateEvent was processed during the sync") log_info("Reschedule #{type} sync because a RepositoryUpdateEvent was processed during the sync")
......
...@@ -23,7 +23,7 @@ module Geo ...@@ -23,7 +23,7 @@ module Geo
def should_verify_checksum? def should_verify_checksum?
return false if resync? return false if resync?
return false unless primary_checksum.present? return false unless primary_checksummed?
mismatch?(secondary_checksum) mismatch?(secondary_checksum)
end end
...@@ -32,6 +32,10 @@ module Geo ...@@ -32,6 +32,10 @@ module Geo
registry.public_send("resync_#{type}") # rubocop:disable GitlabSecurity/PublicSend registry.public_send("resync_#{type}") # rubocop:disable GitlabSecurity/PublicSend
end end
def primary_checksummed?
primary_checksum.present?
end
def primary_checksum def primary_checksum
project.repository_state.public_send("#{type}_verification_checksum") # rubocop:disable GitlabSecurity/PublicSend project.repository_state.public_send("#{type}_verification_checksum") # rubocop:disable GitlabSecurity/PublicSend
end end
...@@ -71,6 +75,7 @@ module Geo ...@@ -71,6 +75,7 @@ module Geo
end end
registry.update!( registry.update!(
"primary_#{type}_checksummed" => primary_checksummed?,
"#{type}_verification_checksum_sha" => checksum, "#{type}_verification_checksum_sha" => checksum,
"#{type}_verification_checksum_mismatched" => mismatch, "#{type}_verification_checksum_mismatched" => mismatch,
"#{type}_checksum_mismatch" => mismatch.present?, "#{type}_checksum_mismatch" => mismatch.present?,
......
...@@ -8,6 +8,7 @@ module Geo ...@@ -8,6 +8,7 @@ module Geo
# This worker does not perform work scoped to a context # This worker does not perform work scoped to a context
include CronjobQueue include CronjobQueue
# rubocop:enable Scalability/CronWorkerContext # rubocop:enable Scalability/CronWorkerContext
attr_accessor :shard_name attr_accessor :shard_name
loggable_arguments 0 loggable_arguments 0
...@@ -38,17 +39,50 @@ module Geo ...@@ -38,17 +39,50 @@ module Geo
current_node.verification_max_capacity current_node.verification_max_capacity
end end
# rubocop:disable CodeReuse/ActiveRecord
def load_pending_resources def load_pending_resources
Geo::ProjectRegistryPendingVerificationFinder return [] unless valid_shard?
.new(current_node: current_node, shard_name: shard_name, batch_size: db_retrieve_batch_size)
.execute if Geo::ProjectRegistry.registry_consistency_worker_enabled?
.pluck_primary_key project_ids =
registry_finder
.find_project_ids_pending_verification(batch_size: db_retrieve_batch_size, except_ids: scheduled_project_ids)
Project
.id_in(project_ids)
.within_shards(shard_name)
.pluck_primary_key
else
Geo::ProjectRegistryPendingVerificationFinder
.new(current_node: current_node, shard_name: shard_name, batch_size: db_retrieve_batch_size)
.execute
.merge(Geo::ProjectRegistry.model_id_not_in(scheduled_project_ids))
.pluck(Geo::Fdw::Project.arel_table[:id])
end
end
# rubocop:enable CodeReuse/ActiveRecord
def scheduled_project_ids
scheduled_jobs.map { |data| data[:project_id] }
end end
def schedule_job(registry_id) # rubocop:disable CodeReuse/ActiveRecord
def schedule_job(project_id)
registry_id = Geo::ProjectRegistry.where(project_id: project_id).pick(:id)
job_id = Geo::RepositoryVerification::Secondary::SingleWorker.perform_async(registry_id) job_id = Geo::RepositoryVerification::Secondary::SingleWorker.perform_async(registry_id)
{ id: registry_id, job_id: job_id } if job_id { project_id: project_id, job_id: job_id } if job_id
end
# rubocop:enable CodeReuse/ActiveRecord
def registry_finder
@registry_finder ||= Geo::ProjectRegistryFinder.new
end
def valid_shard?
return true unless current_node.selective_sync_by_shards?
current_node.selective_sync_shards.include?(shard_name)
end end
end end
end end
......
---
title: Make registry table as SSOT to verify Projects and Wikis
merge_request: 35095
author:
type: changed
# frozen_string_literal: true
class AddVerifiedOnPrimaryColumnsToProjectRegistry < ActiveRecord::Migration[6.0]
DOWNTIME = false
def change
add_column :project_registry, :primary_repository_checksummed, :boolean, default: false, null: false
add_column :project_registry, :primary_wiki_checksummed, :boolean, default: false, null: false
end
end
...@@ -10,7 +10,7 @@ ...@@ -10,7 +10,7 @@
# #
# It's strongly recommended that you check this file into your version control system. # It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema.define(version: 2020_04_07_120740) do ActiveRecord::Schema.define(version: 2020_07_07_011052) do
# These are extensions that must be enabled in order to support this database # These are extensions that must be enabled in order to support this database
enable_extension "plpgsql" enable_extension "plpgsql"
...@@ -146,6 +146,8 @@ ActiveRecord::Schema.define(version: 2020_04_07_120740) do ...@@ -146,6 +146,8 @@ ActiveRecord::Schema.define(version: 2020_04_07_120740) do
t.datetime_with_timezone "last_wiki_verification_ran_at" t.datetime_with_timezone "last_wiki_verification_ran_at"
t.binary "repository_verification_checksum_mismatched" t.binary "repository_verification_checksum_mismatched"
t.binary "wiki_verification_checksum_mismatched" t.binary "wiki_verification_checksum_mismatched"
t.boolean "primary_repository_checksummed", default: false, null: false
t.boolean "primary_wiki_checksummed", default: false, null: false
t.index ["last_repository_successful_sync_at"], name: "idx_project_registry_synced_repositories_partial", where: "((resync_repository = false) AND (repository_retry_count IS NULL) AND (repository_verification_checksum_sha IS NOT NULL))" t.index ["last_repository_successful_sync_at"], name: "idx_project_registry_synced_repositories_partial", where: "((resync_repository = false) AND (repository_retry_count IS NULL) AND (repository_verification_checksum_sha IS NOT NULL))"
t.index ["last_repository_successful_sync_at"], name: "index_project_registry_on_last_repository_successful_sync_at" t.index ["last_repository_successful_sync_at"], name: "index_project_registry_on_last_repository_successful_sync_at"
t.index ["last_repository_synced_at"], name: "index_project_registry_on_last_repository_synced_at" t.index ["last_repository_synced_at"], name: "index_project_registry_on_last_repository_synced_at"
......
...@@ -9,6 +9,8 @@ FactoryBot.define do ...@@ -9,6 +9,8 @@ FactoryBot.define do
last_wiki_successful_sync_at { nil } last_wiki_successful_sync_at { nil }
resync_repository { true } resync_repository { true }
resync_wiki { true } resync_wiki { true }
primary_repository_checksummed { true }
primary_wiki_checksummed { true }
trait :dirty do trait :dirty do
resync_repository { true } resync_repository { true }
......
...@@ -44,4 +44,38 @@ RSpec.describe Geo::ProjectRegistryFinder, :geo do ...@@ -44,4 +44,38 @@ RSpec.describe Geo::ProjectRegistryFinder, :geo do
expect(registries).to match_ids(registry_project_2, registry_project_3) expect(registries).to match_ids(registry_project_2, registry_project_3)
end end
end end
describe '#find_project_ids_pending_verification' do
it 'returns project IDs where repository and/or wiki is pending verification' do
project_ids = subject.find_project_ids_pending_verification(batch_size: 10)
expect(project_ids).to match_array([project_1.id, project_4.id, project_5.id])
end
it 'excludes registries where repository and wiki is missing on primary' do
registry_project_7 = create(:geo_project_registry, :synced, repository_missing_on_primary: true)
registry_project_8 = create(:geo_project_registry, :synced, wiki_missing_on_primary: true)
create(:geo_project_registry, :synced, repository_missing_on_primary: true, wiki_missing_on_primary: true)
project_ids = subject.find_project_ids_pending_verification(batch_size: 10)
expect(project_ids).to match_array([project_1.id, project_4.id, project_5.id, registry_project_7.project_id, registry_project_8.project_id])
end
it 'excludes registries where repository and wiki has not been verified on primary' do
registry_project_7 = create(:geo_project_registry, :synced, primary_repository_checksummed: false)
registry_project_8 = create(:geo_project_registry, :synced, primary_wiki_checksummed: false)
create(:geo_project_registry, :synced, primary_repository_checksummed: false, primary_wiki_checksummed: false)
project_ids = subject.find_project_ids_pending_verification(batch_size: 10)
expect(project_ids).to match_array([project_1.id, project_4.id, project_5.id, registry_project_7.project_id, registry_project_8.project_id])
end
it 'excludes except_ids' do
project_ids = subject.find_project_ids_pending_verification(batch_size: 10, except_ids: [project_5.id])
expect(project_ids).to match_array([project_1.id, project_4.id])
end
end
end end
...@@ -80,9 +80,11 @@ RSpec.describe Gitlab::Geo::LogCursor::Events::RepositoryUpdatedEvent, :clean_gi ...@@ -80,9 +80,11 @@ RSpec.describe Gitlab::Geo::LogCursor::Events::RepositoryUpdatedEvent, :clean_gi
subject.process subject.process
reloaded_registry = registry.reload reloaded_registry = registry.reload
expect(reloaded_registry.wiki_verification_checksum_sha).to be_nil expect(reloaded_registry).to have_attributes(
expect(reloaded_registry.wiki_checksum_mismatch).to be false wiki_verification_checksum_sha: nil,
expect(reloaded_registry.last_wiki_verification_failure).to be_nil wiki_checksum_mismatch: false,
last_wiki_verification_failure: nil
)
end end
it 'sets resync_wiki_was_scheduled_at to the scheduled time' do it 'sets resync_wiki_was_scheduled_at to the scheduled time' do
......
...@@ -44,6 +44,8 @@ RSpec.describe Gitlab::Geo::LogCursor::Events::ResetChecksumEvent, :clean_gitlab ...@@ -44,6 +44,8 @@ RSpec.describe Gitlab::Geo::LogCursor::Events::ResetChecksumEvent, :clean_gitlab
subject.process subject.process
expect(registry.reload).to have_attributes( expect(registry.reload).to have_attributes(
primary_repository_checksummed: true,
primary_wiki_checksummed: true,
repository_verification_checksum_sha: nil, repository_verification_checksum_sha: nil,
wiki_verification_checksum_sha: nil wiki_verification_checksum_sha: nil
) )
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe Geo::RepositorySyncService do RSpec.describe Geo::RepositorySyncService, :geo do
include ::EE::GeoHelpers include ::EE::GeoHelpers
include ExclusiveLeaseHelpers include ExclusiveLeaseHelpers
...@@ -151,6 +151,20 @@ RSpec.describe Geo::RepositorySyncService do ...@@ -151,6 +151,20 @@ RSpec.describe Geo::RepositorySyncService do
end end
end end
it 'marks primary_repository_checksummed as true when repository has been verified on primary' do
create(:repository_state, :repository_verified, project: project)
registry = create(:geo_project_registry, project: project, primary_repository_checksummed: false)
expect { subject.execute }.to change { registry.reload.primary_repository_checksummed}.from(false).to(true)
end
it 'marks primary_repository_checksummed as false when repository has not been verified on primary' do
create(:repository_state, :repository_failed, project: project)
registry = create(:geo_project_registry, project: project, primary_repository_checksummed: true)
expect { subject.execute }.to change { registry.reload.primary_repository_checksummed}.from(true).to(false)
end
context 'tracking database' do context 'tracking database' do
context 'temporary repositories' do context 'temporary repositories' do
include_examples 'cleans temporary repositories' do include_examples 'cleans temporary repositories' do
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe Geo::WikiSyncService do RSpec.describe Geo::WikiSyncService, :geo do
include ::EE::GeoHelpers include ::EE::GeoHelpers
include ExclusiveLeaseHelpers include ExclusiveLeaseHelpers
...@@ -126,6 +126,20 @@ RSpec.describe Geo::WikiSyncService do ...@@ -126,6 +126,20 @@ RSpec.describe Geo::WikiSyncService do
end end
end end
it 'marks primary_wiki_checksummed as true when wiki has been verified on primary' do
create(:repository_state, :wiki_verified, project: project)
registry = create(:geo_project_registry, project: project, primary_wiki_checksummed: false)
expect { subject.execute }.to change { registry.reload.primary_wiki_checksummed}.from(false).to(true)
end
it 'marks primary_wiki_checksummed as false when wiki has not been verified on primary' do
create(:repository_state, :wiki_failed, project: project)
registry = create(:geo_project_registry, project: project, primary_wiki_checksummed: true)
expect { subject.execute }.to change { registry.reload.primary_wiki_checksummed}.from(true).to(false)
end
context 'tracking database' do context 'tracking database' do
context 'temporary repositories' do context 'temporary repositories' do
include_examples 'cleans temporary repositories' do include_examples 'cleans temporary repositories' do
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe Geo::RepositoryVerification::Secondary::SingleWorker, :clean_gitlab_redis_cache do RSpec.describe Geo::RepositoryVerification::Secondary::SingleWorker, :geo, :clean_gitlab_redis_cache do
include ::EE::GeoHelpers include ::EE::GeoHelpers
include ExclusiveLeaseHelpers include ExclusiveLeaseHelpers
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment