Commit aee40a34 authored by Mike Kozono's avatar Mike Kozono Committed by Aakriti Gupta

Add worker to maintain Geo primary checksum tables

Geo replicates data from primary to secondaries. It verifies data
integrity by checksumming the data on the primary, then
checksumming the data on secondaries and comparing the result.

For performance reasons, on the primary, we need to store the
checksums and other metadata in a separate table from the
source table. For example, the primary checksum data for
`merge_request_diffs` is stored in
`merge_request_diff_details`. The verification logic already
mostly exists. But it depends on the checksum table having a
record when the source table has a record.

This commit adds a worker which inserts and deletes
checksum table records without requiring triggers or the
like on the source table. It is acceptable for this process to
be slow.

This worker is behind a feature flag verification_state_backfill_worker

EE: true
parent 369af359
......@@ -8,6 +8,7 @@ module Geo
DEFAULT_VERIFICATION_BATCH_SIZE = 10
DEFAULT_REVERIFICATION_BATCH_SIZE = 1000
DEFAULT_VERIFICATION_STATE_BACKFILL_BATCH_SIZE = 10000
included do
event :checksum_succeeded
......@@ -46,7 +47,13 @@ module Geo
# Secondaries don't need to run this since they will receive an event for each
# rechecksummed resource: https://gitlab.com/gitlab-org/gitlab/-/issues/13842
::Geo::ReverificationBatchWorker.perform_with_capacity(replicable_name) if ::Gitlab::Geo.primary?
return unless ::Gitlab::Geo.primary?
::Geo::ReverificationBatchWorker.perform_with_capacity(replicable_name)
if Feature.enabled?(:verification_state_backfill_worker, default_enabled: :yaml) && verification_query_class.separate_verification_state_table?
::Geo::VerificationStateBackfillWorker.perform_async(replicable_name)
end
end
# Called by VerificationBatchWorker.
......@@ -107,6 +114,20 @@ module Geo
reverify_batch(batch_size: reverification_batch_size)
end
# Gets the next batch of rows from the replicable table, and inserts and
# deletes corresponding rows in the verification state table.
#
# @return [Boolean] whether any rows needed to be inserted or deleted
def backfill_verification_state_table
return false unless Gitlab::Geo.primary?
Geo::VerificationStateBackfillService.new(model, batch_size: verification_state_backfill_batch_size).execute
rescue StandardError => e
log_error("Error while updating verifiables", e)
raise
end
# If primary, query the model table.
# If secondary, query the registry table.
def verification_query_class
......@@ -123,6 +144,11 @@ module Geo
DEFAULT_REVERIFICATION_BATCH_SIZE
end
# @return [Integer] number of records to check for backfill per batch job
def verification_state_backfill_batch_size
DEFAULT_VERIFICATION_STATE_BACKFILL_BATCH_SIZE
end
def checksummed_count
# When verification is disabled, this returns nil.
# Bonus: This causes the progress bar to be hidden.
......
......@@ -30,6 +30,10 @@ module EE
scope :with_verification_state, ->(state) { joins(:merge_request_diff_detail).where(merge_request_diff_details: { verification_state: verification_state_value(state) }) }
scope :checksummed, -> { joins(:merge_request_diff_detail).where.not(merge_request_diff_details: { verification_checksum: nil } ) }
scope :not_checksummed, -> { joins(:merge_request_diff_detail).where(merge_request_diff_details: { verification_checksum: nil } ) }
def create_verification_details
create_merge_request_diff_detail
end
end
class_methods do
......@@ -45,19 +49,9 @@ module EE
.merge(object_storage_scope(node))
end
override :verification_state_table_name
def verification_state_table_name
'merge_request_diff_details'
end
override :verification_state_model_key
def verification_state_model_key
'merge_request_diff_id'
end
override :verification_arel_table
def verification_arel_table
MergeRequestDiffDetail.arel_table
override :verification_state_table_class
def verification_state_table_class
MergeRequestDiffDetail
end
private
......
# frozen_string_literal: true
module Geo
class VerificationStateBackfillService
include ::Gitlab::Geo::LogHelpers
attr_reader :replicable_model, :batch_size
delegate :primary_key, :verification_state_table_name, :verification_state_model_key, :verification_arel_table, :verification_state_table_class, to: :replicable_model
def initialize(replicable_model, batch_size:)
@replicable_model = replicable_model
@batch_size = batch_size
end
# Gets the next batch of rows from the replicable table, and inserts and
# deletes corresponding rows in the verification state table.
#
# @return [Boolean] whether any rows needed to be inserted or deleted
def execute
range = next_range!
return unless range
handle_differences_in_verifiables(range)
rescue StandardError => e
log_error("Error while updating #{verification_state_table_name}", e)
raise
end
private
# @return [Range] the next range of a batch of records
def next_range!
Gitlab::Geo::BaseBatcher.new(replicable_model, verification_state_table_class, verification_state_model_key, key: batcher_key, batch_size: batch_size).next_range!
end
def batcher_key
"verification_backfill:#{replicable_model.name.parameterize}"
end
# This method creates or deletes verification details records.
# It creates for available verifiables where records don't exist yet.
# These would be replicable records that have recently become scoped as
# available_verifiables.
# New replicable records will automatically create child records in the
# verification details table, hence not created in this method.
# When a replicable record is no longer a part of the scope
# available_veriables, it is deleted.
# When a replicable record is deleted, the child record in the verification
# details table is automatically removed, hence not deleted in this method.
#
# @return [Boolean] whether any rows needed to be inserted or deleted
def handle_differences_in_verifiables(range)
verifiable_ids = replicable_model.pluck_verifiable_ids_in_range(range) || []
verification_details_ids = replicable_model.pluck_verification_details_ids_in_range(range) || []
for_creation_ids = verifiable_ids - verification_details_ids
for_deletion_ids = verification_details_ids - verifiable_ids
create_verification_details(range, for_creation_ids)
delete_verification_details(range, for_deletion_ids)
[for_creation_ids, for_deletion_ids].flatten.compact.any?
end
def create_verification_details(range, for_creation_ids)
replicable_model.find(for_creation_ids).map do |replicable|
replicable.create_verification_details
end
log_created(range, for_creation_ids)
end
def delete_verification_details(range, for_deletion_ids)
verification_state_table_class.delete(for_deletion_ids)
log_deleted(range, for_deletion_ids)
end
def log_created(range, for_creation_ids)
log_debug(
"Created verification details for ",
{
replicable_model: replicable_model.name,
start: range.first,
finish: range.last,
created: for_creation_ids
}
)
end
def log_deleted(range, for_deletion_ids)
log_debug(
"Deleted verification details for ",
{
replicable_model: replicable_model.name,
start: range.first,
finish: range.last,
deleted: for_deletion_ids
}
)
end
end
end
......@@ -705,6 +705,15 @@
:weight: 1
:idempotent: true
:tags: []
- :name: geo:geo_verification_state_backfill
:worker_name: Geo::VerificationStateBackfillWorker
:feature_category: :geo_replication
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent:
:tags: []
- :name: geo:geo_verification_timeout
:worker_name: Geo::VerificationTimeoutWorker
:feature_category: :geo_replication
......
# frozen_string_literal: true
module Geo
# Iterates over the table corresponding to the `replicable_class`
# to backfill the corresponding verification state table.
class VerificationStateBackfillWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
data_consistency :always
include ::Gitlab::Geo::LogHelpers
include GeoQueue
prepend Reenqueuer
LEASE_TIMEOUT = 1.minute
def perform(replicable_name)
replicator_class = ::Gitlab::Geo::Replicator.for_replicable_name(replicable_name)
replicator_class.backfill_verification_state_table
end
def lease_timeout
LEASE_TIMEOUT
end
end
end
---
name: verification_state_backfill_worker
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/69301
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/340648
milestone: '14.4'
type: development
group: group::geo
default_enabled: false
......@@ -18,6 +18,12 @@ module Gitlab
geo_logger.error(data)
end
def log_debug(message, details = {})
data = base_log_data(message)
data.merge!(details) if details
geo_logger.debug(data)
end
protected
def base_log_data(message)
......
......@@ -92,6 +92,10 @@ module Gitlab
end
end
def create_verification_details
raise NotImplementedError
end
private_class_method :start_verification_batch
private_class_method :start_verification_batch_query
private_class_method :start_verification_batch_subselect
......@@ -202,27 +206,24 @@ module Gitlab
.lock('FOR UPDATE SKIP LOCKED') # rubocop:disable CodeReuse/ActiveRecord
end
# Overridden in ReplicableRegistry
# This method can also be overriden in the replicable model class that
# includes this concern to specify the primary key of the database
# table that stores verification state
# Override this method in the class that includes this concern to specify
# a different ActiveRecord class to store verification state
# See module EE::MergeRequestDiff for example
def verification_state_table_class
self
end
# Overridden in ReplicableRegistry
def verification_state_model_key
self.primary_key
verification_state_table_class.primary_key
end
# Override this method in the class that includes this concern to specify
# a different database table to store verification state
# See module EE::MergeRequestDiff for example
def verification_state_table_name
table_name
verification_state_table_class.table_name
end
# Override this method in the class that includes this concern to specify
# a different arel table to store verification state
# See module EE::MergeRequestDiff for example
def verification_arel_table
arel_table
verification_state_table_class.arel_table
end
# Fail verification for records which started verification a long time ago
......@@ -289,6 +290,27 @@ module Gitlab
WHERE #{self.verification_state_model_key} IN (#{relation.select(self.verification_state_model_key).to_sql})
SQL
end
# rubocop:disable CodeReuse/ActiveRecord
def pluck_verification_details_ids_in_range(range)
verification_state_table_class
.where(self.verification_state_model_key => range)
.pluck(self.verification_state_model_key)
end
# rubocop:enable CodeReuse/ActiveRecord
def pluck_verifiable_ids_in_range(range)
self
.available_verifiables
.primary_key_in(range)
.pluck_primary_key
end
# @return whether primary checksum data is stored in a table separate
# from the model table
def separate_verification_state_table?
verification_state_table_name != table_name
end
end
# Overridden by Geo::VerifiableRegistry
......
......@@ -144,6 +144,19 @@ RSpec.shared_examples 'a verifiable replicator' do
described_class.trigger_background_verification
end
context 'when verification_state_backfill_worker feature flag is enabled' do
before do
stub_feature_flags(verification_state_backfill_worker: true)
expect(described_class.model).to receive(:separate_verification_state_table?).and_return(true)
end
it 'enqueues VerificationStateBackfillWorker' do
expect(::Geo::VerificationStateBackfillWorker).to receive(:perform_async).with(described_class.replicable_name)
described_class.trigger_background_verification
end
end
context 'for a Geo secondary' do
it 'does not enqueue ReverificationBatchWorker' do
stub_secondary_node
......@@ -184,6 +197,28 @@ RSpec.shared_examples 'a verifiable replicator' do
end
end
describe '.backfill_verification_state_table' do
context 'when on secondary' do
before do
stub_secondary_node
end
it 'returns false' do
expect(described_class.backfill_verification_state_table).to be_falsy
end
end
it 'calls VerificationStateBackfillService' do
stub_primary_node
expect_next_instance_of(Geo::VerificationStateBackfillService) do |service|
expect(service).to receive(:execute).and_return(true)
end
described_class.backfill_verification_state_table
end
end
describe '.verify_batch' do
context 'when there are records needing verification' do
let(:another_replicator) { double('another_replicator', verify: true) }
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Geo::VerificationStateBackfillService, :geo do
let(:replicable_1) { FactoryBot.create(:merge_request_diff, :external) }
let(:verifiable_1) { FactoryBot.create(:merge_request_diff_detail, merge_request_diff: replicable_1) }
subject(:job) { described_class.new(MergeRequestDiff, batch_size: 1000) }
describe '#execute' do
context 'when a replicable is missing a corresponding verifiable' do
before do
replicable_1.merge_request_diff_detail.destroy!
end
it 'adds a new verifiable' do
expect { job.execute }.to change { MergeRequestDiffDetail.count }.from(0).to(1)
end
end
context 'when some replicables were removed from scope' do
before do
replicable_1.update_attribute(:stored_externally, false)
end
it 'deletes the verifiable' do
expect { job.execute }.to change { MergeRequestDiffDetail.count }.from(1).to(0)
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Geo::VerificationStateBackfillWorker, :geo do
include EE::GeoHelpers
include ExclusiveLeaseHelpers
subject(:job) { described_class.new }
let(:job_args) { 'MergeRequestDiff' }
let_it_be(:primary) { create(:geo_node, :primary) }
before do
stub_current_geo_node(primary)
end
it 'uses a geo queue' do
expect(subject.sidekiq_options_hash).to include(
'queue' => 'geo:geo_verification_state_backfill',
'queue_namespace' => :geo
)
end
describe '#perform' do
it_behaves_like 'reenqueuer'
it_behaves_like '#perform is rate limited to 1 call per', 5.seconds
context 'when service is executed' do
before do
expect_next_instance_of(Geo::VerificationStateBackfillService) do |service|
expect(service).to receive(:execute).and_return(execute_return)
end
end
context 'when Geo::VerificationStateBackfillService#execute returns true' do
let(:execute_return) { true }
it 'returns true' do
expect(subject.perform(job_args)).to be_truthy
end
it 'worker gets reenqueued' do
expect(Geo::VerificationStateBackfillWorker).to receive(:perform_async)
subject.perform(job_args)
end
end
context 'when VerificationStateBackfillService#execute returns false' do
let(:execute_return) { false }
it 'returns false' do
expect(subject.perform(job_args)).to be_falsey
end
it 'worker does not get reenqueued (we will wait until next cronjob)' do
expect(Geo::VerificationStateBackfillWorker).not_to receive(:perform_async)
subject.perform(job_args)
end
end
end
end
end
# frozen_string_literal: true
# Expects `subject` to be a job/worker instance
# Expects `subject` to be a job/worker instance and
# `job_args` to be arguments to #perform if it takes arguments
RSpec.shared_examples 'reenqueuer' do
before do
allow(subject).to receive(:sleep) # faster tests
end
let(:subject_perform) { defined?(job_args) ? subject.perform(job_args) : subject.perform }
it 'implements lease_timeout' do
expect(subject.lease_timeout).to be_a(ActiveSupport::Duration)
end
......@@ -18,12 +21,13 @@ RSpec.shared_examples 'reenqueuer' do
it 'tries to obtain a lease' do
expect_to_obtain_exclusive_lease(subject.lease_key)
subject.perform
subject_perform
end
end
end
# Expects `subject` to be a job/worker instance
# Expects `subject` to be a job/worker instance and
# `job_args` to be arguments to #perform if it takes arguments
RSpec.shared_examples '#perform is rate limited to 1 call per' do |minimum_duration|
before do
# Allow Timecop freeze and travel without the block form
......@@ -38,13 +42,15 @@ RSpec.shared_examples '#perform is rate limited to 1 call per' do |minimum_durat
Timecop.safe_mode = true
end
let(:subject_perform) { defined?(job_args) ? subject.perform(job_args) : subject.perform }
context 'when the work finishes in 0 seconds' do
let(:actual_duration) { 0 }
it 'sleeps exactly the minimum duration' do
expect(subject).to receive(:sleep).with(a_value_within(0.01).of(minimum_duration))
subject.perform
subject_perform
end
end
......@@ -54,7 +60,7 @@ RSpec.shared_examples '#perform is rate limited to 1 call per' do |minimum_durat
it 'sleeps 90% of minimum duration' do
expect(subject).to receive(:sleep).with(a_value_within(0.01).of(0.9 * minimum_duration))
subject.perform
subject_perform
end
end
......@@ -64,7 +70,7 @@ RSpec.shared_examples '#perform is rate limited to 1 call per' do |minimum_durat
it 'sleeps 10% of minimum duration' do
expect(subject).to receive(:sleep).with(a_value_within(0.01).of(0.1 * minimum_duration))
subject.perform
subject_perform
end
end
......@@ -74,7 +80,7 @@ RSpec.shared_examples '#perform is rate limited to 1 call per' do |minimum_durat
it 'does not sleep' do
expect(subject).not_to receive(:sleep)
subject.perform
subject_perform
end
end
......@@ -84,7 +90,7 @@ RSpec.shared_examples '#perform is rate limited to 1 call per' do |minimum_durat
it 'does not sleep' do
expect(subject).not_to receive(:sleep)
subject.perform
subject_perform
end
end
......@@ -94,7 +100,7 @@ RSpec.shared_examples '#perform is rate limited to 1 call per' do |minimum_durat
it 'does not sleep' do
expect(subject).not_to receive(:sleep)
subject.perform
subject_perform
end
end
......
......@@ -257,6 +257,7 @@ RSpec.describe 'Every Sidekiq worker' do
'Geo::Scheduler::SchedulerWorker' => 3,
'Geo::Scheduler::Secondary::SchedulerWorker' => 3,
'Geo::VerificationBatchWorker' => 0,
'Geo::VerificationStateBackfillWorker' => false,
'Geo::VerificationTimeoutWorker' => false,
'Geo::VerificationWorker' => 3,
'GeoRepositoryDestroyWorker' => 3,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment