Move Geo repository updated event creation into a worker

Changelog: changed
EE: true
parent 4db3053f
# frozen_string_literal: true
module Repositories
class KeepAroundRefsCreatedEvent < ::Gitlab::EventStore::Event
def schema
{
'type' => 'object',
'properties' => {
'project_id' => { 'type' => 'integer' }
}
}
end
end
end
...@@ -60,7 +60,15 @@ module EE ...@@ -60,7 +60,15 @@ module EE
def keep_around(*shas) def keep_around(*shas)
super super
ensure ensure
log_geo_updated_event # The keep_around method is called from different places. One of them
# is when a pipeline is created. So, in this case, it is still under
# Ci::Pipeline.transaction. It's safe to skip the transaction check
# because we already wrote the refs to the repository on disk.
Sidekiq::Worker.skipping_transaction_check do
::Gitlab::EventStore.publish(
::Repositories::KeepAroundRefsCreatedEvent.new(data: { project_id: project.id })
)
end
end end
override :after_change_head override :after_change_head
......
...@@ -20,10 +20,8 @@ module Geo ...@@ -20,10 +20,8 @@ module Geo
def execute def execute
return false unless Gitlab::Geo.primary? return false unless Gitlab::Geo.primary?
::Gitlab::Database::QueryAnalyzers::PreventCrossDatabaseModification.allow_cross_database_modification_within_transaction(url: 'https://gitlab.com/gitlab-org/gitlab/-/issues/351271') do
reset_repository_checksum! reset_repository_checksum!
create_repository_updated_event! create_repository_updated_event!
end
true true
end end
......
...@@ -507,6 +507,15 @@ ...@@ -507,6 +507,15 @@
:weight: 1 :weight: 1
:idempotent: :idempotent:
:tags: [] :tags: []
- :name: geo:geo_create_repository_updated_event
:worker_name: Geo::CreateRepositoryUpdatedEventWorker
:feature_category: :geo_replication
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: geo:geo_design_repository_shard_sync - :name: geo:geo_design_repository_shard_sync
:worker_name: Geo::DesignRepositoryShardSyncWorker :worker_name: Geo::DesignRepositoryShardSyncWorker
:feature_category: :geo_replication :feature_category: :geo_replication
......
# frozen_string_literal: true
module Geo
class CreateRepositoryUpdatedEventWorker
include ApplicationWorker
include Gitlab::EventStore::Subscriber
include ::GeoQueue
data_consistency :always
idempotent!
def handle_event(event)
Project.find_by_id(event.data[:project_id]).try do |project|
::Geo::RepositoryUpdatedService.new(project.repository).execute
end
end
end
end
...@@ -25,6 +25,9 @@ module EE ...@@ -25,6 +25,9 @@ module EE
store.subscribe ::GitlabSubscriptions::NotifySeatsExceededWorker, to: ::Members::MembersAddedEvent store.subscribe ::GitlabSubscriptions::NotifySeatsExceededWorker, to: ::Members::MembersAddedEvent
store.subscribe ::Security::Findings::DeleteByJobIdWorker, to: ::Ci::JobArtifactsDeletedEvent store.subscribe ::Security::Findings::DeleteByJobIdWorker, to: ::Ci::JobArtifactsDeletedEvent
store.subscribe ::Geo::CreateRepositoryUpdatedEventWorker,
to: ::Repositories::KeepAroundRefsCreatedEvent,
if: -> (_) { ::Gitlab::Geo.primary? }
end end
end end
end end
......
...@@ -8,9 +8,10 @@ RSpec.describe Gitlab::EventStore do ...@@ -8,9 +8,10 @@ RSpec.describe Gitlab::EventStore do
instance = described_class.instance instance = described_class.instance
expect(instance.subscriptions.keys).to include( expect(instance.subscriptions.keys).to include(
::Ci::JobArtifactsDeletedEvent,
::Ci::PipelineCreatedEvent, ::Ci::PipelineCreatedEvent,
::Members::MembersAddedEvent, ::Members::MembersAddedEvent,
::Ci::JobArtifactsDeletedEvent ::Repositories::KeepAroundRefsCreatedEvent
) )
end end
end end
......
...@@ -110,6 +110,8 @@ RSpec.describe Repository do ...@@ -110,6 +110,8 @@ RSpec.describe Repository do
describe '#keep_around' do describe '#keep_around' do
let(:sha) { sample_commit.id } let(:sha) { sample_commit.id }
let(:event) { instance_double('Repositories::KeepAroundRefsCreatedEvent') }
let(:event_data) { { project_id: project.id } }
context 'on a Geo primary' do context 'on a Geo primary' do
before do before do
...@@ -117,18 +119,38 @@ RSpec.describe Repository do ...@@ -117,18 +119,38 @@ RSpec.describe Repository do
end end
context 'when a single SHA is passed' do context 'when a single SHA is passed' do
it 'creates a RepositoryUpdatedEvent' do it 'publishes Repositories::KeepAroundRefsCreatedEvent' do
expect do allow(Repositories::KeepAroundRefsCreatedEvent)
.to receive(:new)
.with(data: event_data)
.and_return(event)
expect(Gitlab::EventStore).to receive(:publish).with(event).once
repository.keep_around(sha) repository.keep_around(sha)
end.to change { ::Geo::RepositoryUpdatedEvent.count }.by(1) end
it 'creates a Geo::RepositoryUpdatedEvent', :sidekiq_inline do
expect { repository.keep_around(sha) }
.to change { ::Geo::RepositoryUpdatedEvent.count }.by(1)
end end
end end
context 'when multiple SHAs are passed' do context 'when multiple SHAs are passed' do
it 'creates exactly one RepositoryUpdatedEvent' do it 'publishes exactly one Repositories::KeepAroundRefsCreatedEvent' do
expect do allow(Repositories::KeepAroundRefsCreatedEvent)
.to receive(:new)
.with(data: event_data)
.and_return(event)
expect(Gitlab::EventStore).to receive(:publish).with(event).once
repository.keep_around(sha, sample_big_commit.id) repository.keep_around(sha, sample_big_commit.id)
end.to change { ::Geo::RepositoryUpdatedEvent.count }.by(1) end
it 'creates exactly one Geo::RepositoryUpdatedEvent', :sidekiq_inline do
expect { repository.keep_around(sha, sample_big_commit.id) }
.to change { ::Geo::RepositoryUpdatedEvent.count }.by(1)
end end
end end
end end
...@@ -138,10 +160,20 @@ RSpec.describe Repository do ...@@ -138,10 +160,20 @@ RSpec.describe Repository do
stub_current_geo_node(secondary_node) stub_current_geo_node(secondary_node)
end end
it 'does not create a RepositoryUpdatedEvent' do it 'publishes a Repositories::KeepAroundRefsCreatedEvent' do
expect do allow(Repositories::KeepAroundRefsCreatedEvent)
repository.keep_around(sha) .to receive(:new)
end.not_to change { ::Geo::RepositoryUpdatedEvent.count } .with(data: event_data)
.and_return(event)
expect(Gitlab::EventStore).to receive(:publish).with(event)
repository.keep_around(sha, sample_big_commit.id)
end
it 'does not create a Geo::RepositoryUpdatedEvent', :sidekiq_inline do
expect { repository.keep_around(sha) }
.not_to change { ::Geo::RepositoryUpdatedEvent.count }
end end
end end
end end
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Geo::CreateRepositoryUpdatedEventWorker do
include ::EE::GeoHelpers
let_it_be(:primary_site) { create(:geo_node, :primary) }
let_it_be(:secondary_site) { create(:geo_node) }
let_it_be(:project) { create(:project, :repository) }
let(:event) { Repositories::KeepAroundRefsCreatedEvent.new(data: { project_id: project.id }) }
subject { consume_event(event) }
def consume_event(event)
described_class.new.perform(event.class.name, event.data)
end
context 'on a Geo primary site' do
before do
stub_current_geo_node(primary_site)
end
include_examples 'an idempotent worker' do
let(:job_args) { [event.class.name, event.data] }
it 'calls replicator#replicate_destroy' do
expect { subject }
.to change { ::Geo::RepositoryUpdatedEvent.count }.by(IdempotentWorkerHelper::WORKER_EXEC_TIMES)
subject
end
end
end
context 'on a Geo secondary site' do
it 'does not create a Geo::RepositoryUpdatedEvent' do
stub_current_geo_node(secondary_site)
expect { subject }
.not_to change { ::Geo::RepositoryUpdatedEvent.count }
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment