Commit c4ae73ab authored by Michael Kozono's avatar Michael Kozono

Merge branch '32899-handle-race-condition-for-container-registry-sync' into 'master'

Geo: Handle race condition for container synchronization

Closes #32899

See merge request gitlab-org/gitlab!17823
parents f6eb5bf6 cdfb06dc
---
title: 'Geo: Fix race condition for container synchronization'
merge_request: 17823
author:
type: fixed
...@@ -20,12 +20,6 @@ class Geo::ContainerRepositoryRegistry < Geo::BaseRegistry ...@@ -20,12 +20,6 @@ class Geo::ContainerRepositoryRegistry < Geo::BaseRegistry
registry.last_synced_at = Time.now registry.last_synced_at = Time.now
end end
before_transition any => :synced do |registry, _|
registry.retry_count = 0
registry.retry_at = nil
registry.last_sync_failure = nil
end
before_transition any => :pending do |registry, _| before_transition any => :pending do |registry, _|
registry.retry_at = 0 registry.retry_at = 0
registry.retry_count = 0 registry.retry_count = 0
...@@ -35,10 +29,6 @@ class Geo::ContainerRepositoryRegistry < Geo::BaseRegistry ...@@ -35,10 +29,6 @@ class Geo::ContainerRepositoryRegistry < Geo::BaseRegistry
transition [:synced, :failed, :pending] => :started transition [:synced, :failed, :pending] => :started
end end
event :finish_sync! do
transition started: :synced
end
event :repository_updated! do event :repository_updated! do
transition [:synced, :failed, :started] => :pending transition [:synced, :failed, :started] => :pending
end end
...@@ -58,4 +48,26 @@ class Geo::ContainerRepositoryRegistry < Geo::BaseRegistry ...@@ -58,4 +48,26 @@ class Geo::ContainerRepositoryRegistry < Geo::BaseRegistry
retry_at: next_retry_time(new_retry_count) retry_at: next_retry_time(new_retry_count)
) )
end end
def finish_sync!
update!(
retry_count: 0,
last_sync_failure: nil,
retry_at: nil
)
mark_synced_atomically
end
def mark_synced_atomically
# We can only update registry if state is started.
# If state is set to pending that means that repository_updated! was called
# during the sync so we need to reschedule new sync
num_rows = self.class
.where(container_repository_id: container_repository_id)
.where(state: 'started')
.update_all(state: 'synced')
num_rows > 0
end
end end
...@@ -4,11 +4,8 @@ require 'tempfile' ...@@ -4,11 +4,8 @@ require 'tempfile'
module Geo module Geo
class ContainerRepositorySync class ContainerRepositorySync
include ExclusiveLeaseGuard
include Gitlab::Utils::StrongMemoize include Gitlab::Utils::StrongMemoize
LEASE_TIMEOUT = 1.hour.freeze
attr_reader :name, :container_repository attr_reader :name, :container_repository
def initialize(container_repository) def initialize(container_repository)
...@@ -17,9 +14,6 @@ module Geo ...@@ -17,9 +14,6 @@ module Geo
end end
def execute def execute
try_obtain_lease do
# It makes sense to do this sequentially because in most cases images
# share some layers so it can save IO ops.
tags_to_sync.each do |tag| tags_to_sync.each do |tag|
sync_tag(tag[:name]) sync_tag(tag[:name])
end end
...@@ -30,7 +24,6 @@ module Geo ...@@ -30,7 +24,6 @@ module Geo
true true
end end
end
private private
...@@ -87,14 +80,6 @@ module Geo ...@@ -87,14 +80,6 @@ module Geo
secondary_tags - primary_tags secondary_tags - primary_tags
end end
def lease_key
@lease_key ||= "#{self.class.name}:#{name}"
end
def lease_timeout
LEASE_TIMEOUT
end
# The client for primary registry # The client for primary registry
def client def client
strong_memoize(:client) do strong_memoize(:client) do
......
...@@ -2,8 +2,12 @@ ...@@ -2,8 +2,12 @@
module Geo module Geo
class ContainerRepositorySyncService class ContainerRepositorySyncService
include ExclusiveLeaseGuard
include ::Gitlab::Geo::ContainerRepositoryLogHelpers include ::Gitlab::Geo::ContainerRepositoryLogHelpers
LEASE_TIMEOUT = 8.hours.freeze
LEASE_KEY = 'geo_container_sync'
attr_reader :container_repository attr_reader :container_repository
def initialize(container_repository) def initialize(container_repository)
...@@ -11,12 +15,19 @@ module Geo ...@@ -11,12 +15,19 @@ module Geo
end end
def execute def execute
try_obtain_lease do
sync_repository
end
end
def sync_repository
log_info('Marking sync as started') log_info('Marking sync as started')
registry.start_sync! registry.start_sync!
Geo::ContainerRepositorySync.new(container_repository).execute Geo::ContainerRepositorySync.new(container_repository).execute
registry.finish_sync! mark_sync_as_successful
log_info('Finished sync') log_info('Finished sync')
rescue => e rescue => e
fail_registry_sync!("Container repository sync failed", e) fail_registry_sync!("Container repository sync failed", e)
...@@ -24,12 +35,32 @@ module Geo ...@@ -24,12 +35,32 @@ module Geo
private private
def mark_sync_as_successful
persisted = registry.finish_sync!
reschedule_sync unless persisted
end
def reschedule_sync
log_info("Reschedule container sync because a ContainerRepositoryUpdatedEvent was processed during the sync")
Geo::ContainerRepositorySyncWorker.perform_async(container_repository.id)
end
def fail_registry_sync!(message, error) def fail_registry_sync!(message, error)
log_error(message, error) log_error(message, error)
registry.fail_sync!(message, error) registry.fail_sync!(message, error)
end end
def lease_key
@lease_key ||= "#{LEASE_KEY}:#{container_repository.id}"
end
def lease_timeout
LEASE_TIMEOUT
end
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def registry def registry
@registry ||= begin @registry ||= begin
......
...@@ -14,8 +14,10 @@ module Geo ...@@ -14,8 +14,10 @@ module Geo
Sidekiq.logger.warn "Failed #{msg['class']} with #{msg['args']}: #{msg['error_message']}" Sidekiq.logger.warn "Failed #{msg['class']} with #{msg['args']}: #{msg['error_message']}"
end end
attr_reader :repository
def perform(id) def perform(id)
repository = ContainerRepository.find_by_id(id) @repository = ContainerRepository.find_by_id(id)
if repository.nil? if repository.nil?
log_error("Couldn't find container repository, skipping syncing", container_repository_id: id) log_error("Couldn't find container repository, skipping syncing", container_repository_id: id)
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
require 'spec_helper' require 'spec_helper'
describe Geo::ContainerRepositoryRegistry, :geo do describe Geo::ContainerRepositoryRegistry, :geo do
set(:container_repository_registry) { create(:container_repository_registry) } set(:registry) { create(:container_repository_registry) }
describe 'relationships' do describe 'relationships' do
it { is_expected.to belong_to(:container_repository) } it { is_expected.to belong_to(:container_repository) }
...@@ -20,7 +20,7 @@ describe Geo::ContainerRepositoryRegistry, :geo do ...@@ -20,7 +20,7 @@ describe Geo::ContainerRepositoryRegistry, :geo do
result = described_class.repository_id_not_in([container_repository1_id, container_repository2_id]) result = described_class.repository_id_not_in([container_repository1_id, container_repository2_id])
expect(result).to match_ids([container_repository_registry]) expect(result).to match_ids([registry])
end end
end end
end end
...@@ -30,17 +30,42 @@ describe Geo::ContainerRepositoryRegistry, :geo do ...@@ -30,17 +30,42 @@ describe Geo::ContainerRepositoryRegistry, :geo do
end end
describe '#finish_sync!' do describe '#finish_sync!' do
it 'finishes registry record' do let(:registry) { create(:container_repository_registry, :sync_started) }
container_repository_registry = create(:container_repository_registry, :sync_started)
container_repository_registry.finish_sync! it 'finishes registry record' do
registry.finish_sync!
expect(container_repository_registry.reload).to have_attributes( expect(registry.reload).to have_attributes(
retry_count: 0, retry_count: 0,
retry_at: nil, retry_at: nil,
last_sync_failure: nil, last_sync_failure: nil,
state: 'synced' state: 'synced'
) )
end end
context 'when a container sync was scheduled after the last sync began' do
before do
registry.update!(
state: 'pending',
retry_count: 2,
retry_at: 1.hour.ago,
last_sync_failure: 'error'
)
registry.finish_sync!
end
it 'does not reset state' do
expect(registry.reload.state).to eq 'pending'
end
it 'resets the other sync state fields' do
expect(registry.reload).to have_attributes(
retry_count: 0,
retry_at: nil,
last_sync_failure: nil
)
end
end
end end
end end
...@@ -8,34 +8,70 @@ describe Geo::ContainerRepositorySyncService, :geo do ...@@ -8,34 +8,70 @@ describe Geo::ContainerRepositorySyncService, :geo do
set(:secondary) { create(:geo_node) } set(:secondary) { create(:geo_node) }
let(:registry) { create(:container_repository_registry, :sync_started) }
let(:container_repository) { registry.container_repository }
let(:lease_key) { "#{Geo::ContainerRepositorySyncService::LEASE_KEY}:#{container_repository.id}" }
let(:lease_uuid) { 'uuid'}
subject { described_class.new(container_repository) }
before do before do
stub_current_geo_node(secondary) stub_current_geo_node(secondary)
end end
describe '#execute' do context 'lease handling' do
let(:container_repository_registry) { create(:container_repository_registry, :sync_started) } before do
stub_exclusive_lease(lease_key, lease_uuid)
end
it 'returns the lease when sync succeeds' do
registry
expect_to_cancel_exclusive_lease(lease_key, lease_uuid)
allow_any_instance_of(Geo::ContainerRepositorySync).to receive(:execute)
subject.execute
end
it 'returns the lease when sync fails' do
allow_any_instance_of(Geo::ContainerRepositorySync).to receive(:execute)
.and_raise(StandardError)
expect_to_cancel_exclusive_lease(lease_key, lease_uuid)
subject.execute
end
it 'skips syncing repositories if cannot obtain a lease' do
stub_exclusive_lease_taken(lease_key)
expect_any_instance_of(Geo::ContainerRepositorySync).not_to receive(:execute)
subject.execute
end
end
describe '#execute' do
it 'fails registry record if there was exception' do it 'fails registry record if there was exception' do
allow_any_instance_of(Geo::ContainerRepositorySync) allow_any_instance_of(Geo::ContainerRepositorySync)
.to receive(:execute).and_raise 'Sync Error' .to receive(:execute).and_raise 'Sync Error'
described_class.new(container_repository_registry.container_repository).execute described_class.new(registry.container_repository).execute
expect(container_repository_registry.reload.failed?).to be_truthy expect(registry.reload.failed?).to be_truthy
end end
it 'finishes registry record if there was no exception' do it 'finishes registry record if there was no exception' do
expect_any_instance_of(Geo::ContainerRepositorySync) expect_any_instance_of(Geo::ContainerRepositorySync)
.to receive(:execute) .to receive(:execute)
described_class.new(container_repository_registry.container_repository).execute described_class.new(registry.container_repository).execute
expect(container_repository_registry.reload.synced?).to be_truthy expect(registry.reload.synced?).to be_truthy
end end
it 'finishes registry record if there was no exception and registy does not exist' do it 'finishes registry record if there was no exception and registy does not exist' do
container_repository = create(:container_repository)
expect_any_instance_of(Geo::ContainerRepositorySync) expect_any_instance_of(Geo::ContainerRepositorySync)
.to receive(:execute) .to receive(:execute)
...@@ -46,4 +82,15 @@ describe Geo::ContainerRepositorySyncService, :geo do ...@@ -46,4 +82,15 @@ describe Geo::ContainerRepositorySyncService, :geo do
expect(registry.synced?).to be_truthy expect(registry.synced?).to be_truthy
end end
end end
context 'race condition when ContainerRepositoryUpdatedEvent was processed during a sync' do
it 'reschedules the sync' do
allow_any_instance_of(described_class).to receive(:registry).and_return(registry)
expect(::Geo::ContainerRepositorySyncWorker).to receive(:perform_async)
expect(registry).to receive(:finish_sync!).and_return(false)
described_class.new(registry.container_repository).send(:mark_sync_as_successful)
end
end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment