Commit d5d42c7b authored by Valery Sizov

Geo: Fail syncs which exceed a timeout

We create a worker, SyncTimeoutCronWorker, which finds all orphaned
registry records that are still in the started state
and marks them as failed.
parent f1a3964d
...@@ -592,6 +592,9 @@ Gitlab.ee do ...@@ -592,6 +592,9 @@ Gitlab.ee do
Settings.cron_jobs['geo_verification_cron_worker'] ||= Settingslogic.new({}) Settings.cron_jobs['geo_verification_cron_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_verification_cron_worker']['cron'] ||= '* * * * *' Settings.cron_jobs['geo_verification_cron_worker']['cron'] ||= '* * * * *'
Settings.cron_jobs['geo_verification_cron_worker']['job_class'] ||= 'Geo::VerificationCronWorker' Settings.cron_jobs['geo_verification_cron_worker']['job_class'] ||= 'Geo::VerificationCronWorker'
# Defaults for the Geo sync timeout cron job (each value is only set when not
# already configured): run Geo::SyncTimeoutCronWorker every 10 minutes.
Settings.cron_jobs['geo_sync_timeout_cron_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_sync_timeout_cron_worker']['cron'] ||= '*/10 * * * *'
Settings.cron_jobs['geo_sync_timeout_cron_worker']['job_class'] ||= 'Geo::SyncTimeoutCronWorker'
Settings.cron_jobs['geo_secondary_usage_data_cron_worker'] ||= Settingslogic.new({}) Settings.cron_jobs['geo_secondary_usage_data_cron_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_secondary_usage_data_cron_worker']['cron'] ||= '0 0 * * 0' Settings.cron_jobs['geo_secondary_usage_data_cron_worker']['cron'] ||= '0 0 * * 0'
Settings.cron_jobs['geo_secondary_usage_data_cron_worker']['job_class'] ||= 'Geo::SecondaryUsageDataCronWorker' Settings.cron_jobs['geo_secondary_usage_data_cron_worker']['job_class'] ||= 'Geo::SecondaryUsageDataCronWorker'
......
...@@ -12,6 +12,12 @@ module Geo ...@@ -12,6 +12,12 @@ module Geo
event :deleted event :deleted
end end
class_methods do
  # Sync timeout for this kind of replicator: the value of
  # ::Geo::BlobDownloadService::LEASE_TIMEOUT.
  #
  # The registry's `sync_timed_out` scope calls `.ago` on this value to find
  # records whose sync started longer ago than the timeout.
  def sync_timeout
    ::Geo::BlobDownloadService::LEASE_TIMEOUT
  end
end
def handle_after_create_commit def handle_after_create_commit
return false unless Gitlab::Geo.primary? return false unless Gitlab::Geo.primary?
return unless self.class.enabled? return unless self.class.enabled?
......
...@@ -11,6 +11,8 @@ module Geo::ReplicableRegistry ...@@ -11,6 +11,8 @@ module Geo::ReplicableRegistry
}.freeze }.freeze
class_methods do class_methods do
include Delay
def state_value(state_string) def state_value(state_string)
STATE_VALUES[state_string] STATE_VALUES[state_string]
end end
...@@ -26,6 +28,20 @@ module Geo::ReplicableRegistry ...@@ -26,6 +28,20 @@ module Geo::ReplicableRegistry
def registry_consistency_worker_enabled? def registry_consistency_worker_enabled?
replicator_class.enabled? replicator_class.enabled?
end end
# Fail syncs for records which started syncing a long time ago.
#
# Every record in the `sync_timed_out` scope is marked as failed, given a
# failure message, and scheduled for a retry. Records are updated in batches
# via `update_all`, which writes straight to the database without
# instantiating the records.
def fail_sync_timeouts
  # `sync_timeout` is used as a duration by the `sync_timed_out` scope
  # (`sync_timeout.ago`). Interpolating it directly prints a bare number of
  # seconds with no unit, so convert explicitly and name the unit.
  timeout_in_seconds = replicator_class.sync_timeout.to_i

  attrs = {
    state: state_value(:failed),
    last_sync_failure: "Sync timed out after #{timeout_in_seconds} seconds",
    # The timed-out attempt is recorded as the first failure.
    retry_count: 1,
    retry_at: next_retry_time(1)
  }

  sync_timed_out.each_batch do |relation|
    relation.update_all(attrs)
  end
end
end end
def replicator_class def replicator_class
...@@ -42,6 +58,7 @@ module Geo::ReplicableRegistry ...@@ -42,6 +58,7 @@ module Geo::ReplicableRegistry
scope :pending, -> { with_state(:pending) } scope :pending, -> { with_state(:pending) }
scope :retry_due, -> { where(arel_table[:retry_at].eq(nil).or(arel_table[:retry_at].lt(Time.current))) } scope :retry_due, -> { where(arel_table[:retry_at].eq(nil).or(arel_table[:retry_at].lt(Time.current))) }
scope :synced, -> { with_state(:synced) } scope :synced, -> { with_state(:synced) }
# Records still in the started state whose last_synced_at is older than the
# replicator class's sync timeout.
scope :sync_timed_out, lambda {
  cutoff = replicator_class.sync_timeout.ago

  with_state(:started).where("last_synced_at < ?", cutoff)
}
state_machine :state, initial: :pending do state_machine :state, initial: :pending do
state :pending, value: STATE_VALUES[:pending] state :pending, value: STATE_VALUES[:pending]
......
...@@ -13,6 +13,12 @@ module Geo ...@@ -13,6 +13,12 @@ module Geo
event :deleted event :deleted
end end
class_methods do
  # Sync timeout for this kind of replicator: the value of
  # ::Geo::FrameworkRepositorySyncService::LEASE_TIMEOUT.
  #
  # The registry's `sync_timed_out` scope calls `.ago` on this value to find
  # records whose sync started longer ago than the timeout.
  def sync_timeout
    ::Geo::FrameworkRepositorySyncService::LEASE_TIMEOUT
  end
end
# Called by Gitlab::Geo::Replicator#consume # Called by Gitlab::Geo::Replicator#consume
def consume_event_updated(**params) def consume_event_updated(**params)
return unless in_replicables_for_current_secondary? return unless in_replicables_for_current_secondary?
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
class Geo::BaseRegistry < Geo::TrackingBase class Geo::BaseRegistry < Geo::TrackingBase
include BulkInsertSafe include BulkInsertSafe
include EachBatch
self.abstract_class = true self.abstract_class = true
......
...@@ -211,6 +211,14 @@ ...@@ -211,6 +211,14 @@
:weight: 1 :weight: 1
:idempotent: :idempotent:
:tags: [] :tags: []
- :name: cronjob:geo_sync_timeout_cron
:feature_category: :geo_replication
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: cronjob:geo_verification_cron - :name: cronjob:geo_verification_cron
:feature_category: :geo_replication :feature_category: :geo_replication
:has_external_dependencies: :has_external_dependencies:
......
# frozen_string_literal: true
module Geo
  # Cron worker that fails syncs for records which started syncing a long
  # time ago, by delegating to every enabled Geo replicator class.
  class SyncTimeoutCronWorker
    include ApplicationWorker
    include ::Gitlab::Geo::LogHelpers

    # This worker does not perform work scoped to a context
    include CronjobQueue # rubocop:disable Scalability/CronWorkerContext

    idempotent!
    sidekiq_options retry: false, dead: false
    feature_category :geo_replication

    # Calls `fail_sync_timeouts` on each class returned by
    # `Gitlab::Geo.enabled_replicator_classes`.
    def perform
      Gitlab::Geo.enabled_replicator_classes.each(&:fail_sync_timeouts)
    end
  end
end
---
title: 'Geo: Fail syncs which exceed a timeout'
merge_request: 56538
author:
type: fixed
...@@ -22,6 +22,7 @@ module Gitlab ...@@ -22,6 +22,7 @@ module Gitlab
geo_repository_verification_secondary_scheduler_worker geo_repository_verification_secondary_scheduler_worker
geo_secondary_registry_consistency_worker geo_secondary_registry_consistency_worker
geo_secondary_usage_data_cron_worker geo_secondary_usage_data_cron_worker
geo_sync_timeout_cron_worker
].freeze ].freeze
GEO_JOBS = (COMMON_JOBS + PRIMARY_JOBS + SECONDARY_JOBS).freeze GEO_JOBS = (COMMON_JOBS + PRIMARY_JOBS + SECONDARY_JOBS).freeze
......
...@@ -23,7 +23,10 @@ module Gitlab ...@@ -23,7 +23,10 @@ module Gitlab
delegate :in_replicables_for_current_secondary?, to: :model_record delegate :in_replicables_for_current_secondary?, to: :model_record
class << self class << self
delegate :find_registries_never_attempted_sync, :find_registries_needs_sync_again, to: :registry_class delegate :find_registries_never_attempted_sync,
:find_registries_needs_sync_again,
:fail_sync_timeouts,
to: :registry_class
end end
# Declare supported event # Declare supported event
......
...@@ -18,6 +18,7 @@ RSpec.describe Gitlab::Geo::CronManager, :geo do ...@@ -18,6 +18,7 @@ RSpec.describe Gitlab::Geo::CronManager, :geo do
geo_prune_event_log_worker geo_prune_event_log_worker
geo_verification_cron_worker geo_verification_cron_worker
geo_secondary_usage_data_cron_worker geo_secondary_usage_data_cron_worker
geo_sync_timeout_cron_worker
].freeze ].freeze
def job(name) def job(name)
...@@ -41,7 +42,8 @@ RSpec.describe Gitlab::Geo::CronManager, :geo do ...@@ -41,7 +42,8 @@ RSpec.describe Gitlab::Geo::CronManager, :geo do
job('geo_repository_sync_worker'), job('geo_repository_sync_worker'),
job('geo_container_repository_sync_worker'), job('geo_container_repository_sync_worker'),
job('geo_repository_verification_secondary_scheduler_worker'), job('geo_repository_verification_secondary_scheduler_worker'),
job('geo_secondary_usage_data_cron_worker') job('geo_secondary_usage_data_cron_worker'),
job('geo_sync_timeout_cron_worker')
] ]
end end
......
...@@ -3,6 +3,18 @@ ...@@ -3,6 +3,18 @@
RSpec.shared_examples 'a Geo framework registry' do RSpec.shared_examples 'a Geo framework registry' do
let(:registry_class_factory) { described_class.underscore.tr('/', '_').to_sym } let(:registry_class_factory) { described_class.underscore.tr('/', '_').to_sym }
context 'scopes' do
  describe 'sync_timed_out' do
    it 'return correct records' do
      # The only expected match: still started, with an old last_synced_at.
      stale_started = create(registry_class_factory, :started, last_synced_at: 9.hours.ago)

      # Started, but synced recently.
      create(registry_class_factory, :started, last_synced_at: 1.hour.ago)
      # Old last_synced_at, but no longer in the started state.
      create(registry_class_factory, :failed, last_synced_at: 9.hours.ago)

      timed_out = described_class.sync_timed_out

      expect(timed_out).to eq [stale_started]
    end
  end
end
context 'finders' do context 'finders' do
let!(:failed_item1) { create(registry_class_factory, :failed) } let!(:failed_item1) { create(registry_class_factory, :failed) }
let!(:failed_item2) { create(registry_class_factory, :failed) } let!(:failed_item2) { create(registry_class_factory, :failed) }
...@@ -52,4 +64,15 @@ RSpec.shared_examples 'a Geo framework registry' do ...@@ -52,4 +64,15 @@ RSpec.shared_examples 'a Geo framework registry' do
end end
end end
end end
describe '.fail_sync_timeouts' do
  it 'marks started records as failed if they are expired' do
    expired = create(registry_class_factory, :started, last_synced_at: 9.hours.ago)
    not_expired = create(registry_class_factory, :started, last_synced_at: 1.hour.ago) # not yet expired
    not_expired_state = not_expired.state

    described_class.fail_sync_timeouts

    # The expired record is failed and carries the retry bookkeeping that
    # fail_sync_timeouts writes.
    expect(expired.reload.state).to eq described_class::STATE_VALUES[:failed]
    expect(expired.retry_count).to eq 1
    expect(expired.last_sync_failure).to start_with('Sync timed out after')

    # The record that has not yet expired must be left untouched.
    expect(not_expired.reload.state).to eq not_expired_state
  end
end
end end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Geo::SyncTimeoutCronWorker, :geo do
  describe '#perform' do
    it 'calls fail_sync_timeouts on enabled Replicators' do
      replicator_class = double('replicator')

      # The worker should ask Gitlab::Geo for the enabled replicator classes
      # and call fail_sync_timeouts on each of them.
      expect(Gitlab::Geo).to receive(:enabled_replicator_classes).and_return([replicator_class])
      expect(replicator_class).to receive(:fail_sync_timeouts)

      described_class.new.perform
    end
  end

  it 'uses a cronjob queue' do
    options = subject.sidekiq_options_hash

    expect(options).to include(
      'queue' => 'cronjob:geo_sync_timeout_cron',
      'queue_namespace' => :cronjob
    )
  end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment