Commit 41c4c991 authored by Douglas Barbosa Alexandre's avatar Douglas Barbosa Alexandre

Merge branch...

Merge branch '9446-split-geo-projectsyncworker-into-repositorysyncworker-and-wikisyncworker-option2' into 'master'

Geo: Handle repository and wiki sync separately in Geo::ProjectSyncWorker

Closes #9444, #9446, and #5348

See merge request gitlab-org/gitlab-ee!9360
parents eeb7e2c3 966b4e92
......@@ -223,11 +223,19 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
end
def repository_sync_due?(scheduled_time)
never_synced_repository? || repository_sync_needed?(scheduled_time)
return true if last_repository_synced_at.nil?
return false unless resync_repository?
return false if repository_retry_at && scheduled_time < repository_retry_at
scheduled_time > last_repository_synced_at
end
def wiki_sync_due?(scheduled_time)
never_synced_wiki? || wiki_sync_needed?(scheduled_time)
return true if last_wiki_synced_at.nil?
return false unless resync_wiki?
return false if wiki_retry_at && scheduled_time < wiki_retry_at
scheduled_time > last_wiki_synced_at
end
# Returns whether repository is pending verification check
......@@ -365,28 +373,6 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
"projects/#{project_id}/fetches_since_gc"
end
def never_synced_repository?
last_repository_synced_at.nil?
end
def never_synced_wiki?
last_wiki_synced_at.nil?
end
def repository_sync_needed?(timestamp)
return false unless resync_repository?
return false if repository_retry_at && timestamp < repository_retry_at
last_repository_synced_at && timestamp > last_repository_synced_at
end
def wiki_sync_needed?(timestamp)
return false unless resync_wiki?
return false if wiki_retry_at && timestamp < wiki_retry_at
last_wiki_synced_at && timestamp > last_wiki_synced_at
end
# How many times have we retried syncing it?
#
# @param [String] type must be one of the values in TYPES
......
......@@ -158,7 +158,11 @@ module Geo
def reschedule_sync
log_info("Reschedule #{type} sync because a RepositoryUpdateEvent was processed during the sync")
::Geo::ProjectSyncWorker.perform_async(project.id, Time.now)
::Geo::ProjectSyncWorker.perform_async(
project.id,
sync_repository: type.repository?,
sync_wiki: type.wiki?
)
end
def fail_registry!(message, error, attrs = {})
......@@ -170,7 +174,7 @@ module Geo
end
def type
self.class.type
@type ||= self.class.type.to_s.inquiry
end
def update_delay_in_seconds
......
......@@ -15,7 +15,7 @@ module Geo
end
# rubocop: disable CodeReuse/ActiveRecord
def perform(project_id, scheduled_time)
def perform(project_id, options = {})
registry = Geo::ProjectRegistry.find_or_initialize_by(project_id: project_id)
project = registry.project
......@@ -30,9 +30,37 @@ module Geo
return
end
Geo::RepositorySyncService.new(project).execute if registry.repository_sync_due?(scheduled_time)
Geo::WikiSyncService.new(project).execute if registry.wiki_sync_due?(scheduled_time)
options = extract_options(registry, options)
sync_repository(registry, options)
sync_wiki(registry, options)
end
# rubocop: enable CodeReuse/ActiveRecord
def sync_repository(registry, options)
return unless options[:sync_repository] && registry.resync_repository?
Geo::RepositorySyncService.new(registry.project).execute
end
def sync_wiki(registry, options)
return unless options[:sync_wiki] && registry.resync_wiki?
Geo::WikiSyncService.new(registry.project).execute
end
def extract_options(registry, options)
options.is_a?(Hash) ? options.symbolize_keys : backward_options(registry, options)
end
# Before GitLab 11.8 we used to pass the scheduled time instead of an options hash,
# this method makes the job arguments backward compatible and
# can be removed in any version after GitLab 12.0.
def backward_options(registry, schedule_time)
{
sync_repository: registry.repository_sync_due?(schedule_time),
sync_wiki: registry.wiki_sync_due?(schedule_time)
}
end
end
end
......@@ -42,11 +42,19 @@ module Geo
[1, capacity_per_shard.to_i].max
end
# rubocop: disable CodeReuse/ActiveRecord
def schedule_job(project_id)
job_id = Geo::ProjectSyncWorker.perform_async(project_id, Time.now)
registry = Geo::ProjectRegistry.find_or_initialize_by(project_id: project_id)
job_id = Geo::ProjectSyncWorker.perform_async(
project_id,
sync_repository: registry.repository_sync_due?(Time.now),
sync_wiki: registry.wiki_sync_due?(Time.now)
)
{ project_id: project_id, job_id: job_id } if job_id
end
# rubocop: enable CodeReuse/ActiveRecord
def scheduled_project_ids
scheduled_jobs.map { |data| data[:project_id] }
......
---
title: 'Geo: Handle repository and wiki sync separately in Geo::ProjectSyncWorker'
merge_request: 9360
author:
type: changed
......@@ -12,7 +12,7 @@ module Gitlab
registry.repository_created!(event)
enqueue_job_if_shard_healthy(event) do
::Geo::ProjectSyncWorker.perform_async(event.project_id, Time.now)
::Geo::ProjectSyncWorker.perform_async(event.project_id, sync_repository: true, sync_wiki: true)
end
end
......
......@@ -11,7 +11,11 @@ module Gitlab
registry.repository_updated!(event.source, scheduled_at)
job_id = enqueue_job_if_shard_healthy(event) do
::Geo::ProjectSyncWorker.perform_async(event.project_id, scheduled_at)
::Geo::ProjectSyncWorker.perform_async(
event.project_id,
sync_repository: event.repository?,
sync_wiki: event.wiki?
)
end
log_event(job_id)
......
......@@ -21,7 +21,7 @@ describe Gitlab::Geo::LogCursor::Events::RepositoryUpdatedEvent, :postgresql, :c
allow(Gitlab::ShardHealthCache).to receive(:healthy_shard?).with('broken').and_return(false)
end
RSpec.shared_examples 'RepositoryUpdatedEvent' do
shared_examples 'RepositoryUpdatedEvent' do
it 'creates a new project registry if it does not exist' do
expect { subject.process }.to change(Geo::ProjectRegistry, :count).by(1)
end
......@@ -108,9 +108,37 @@ describe Gitlab::Geo::LogCursor::Events::RepositoryUpdatedEvent, :postgresql, :c
it_behaves_like 'RepositoryUpdatedEvent'
it 'schedules a Geo::ProjectSyncWorker' do
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(project.id, now).once
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(project.id, sync_repository: true, sync_wiki: false).once
Timecop.freeze(now) { subject.process }
subject.process
end
context 'enqueues the job with the proper args' do
let!(:registry) { create(:geo_project_registry, :synced, project: repository_updated_event.project) }
before do
repository_updated_event.update!(source: event_source)
end
context 'enqueues wiki sync' do
let(:event_source) { Geo::RepositoryUpdatedEvent::WIKI }
it 'passes correct options' do
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(project.id, { sync_repository: false, sync_wiki: true })
subject.process
end
end
context 'enqueues repository sync' do
let(:event_source) { Geo::RepositoryUpdatedEvent::REPOSITORY }
it 'passes correct options' do
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(project.id, { sync_repository: true, sync_wiki: false })
subject.process
end
end
end
end
......@@ -120,9 +148,9 @@ describe Gitlab::Geo::LogCursor::Events::RepositoryUpdatedEvent, :postgresql, :c
it_behaves_like 'RepositoryUpdatedEvent'
it 'does not schedule a Geo::ProjectSyncWorker job' do
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(project.id, now)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(project.id, anything)
Timecop.freeze(now) { subject.process }
subject.process
end
end
end
......
......@@ -21,11 +21,20 @@ RSpec.describe Geo::ProjectSyncWorker do
.with(instance_of(Project)).once.and_return(wiki_sync_service)
end
context 'backward compatibility' do
it 'performs sync for the given project when time is passed' do
subject.perform(project.id, Time.now)
expect(repository_sync_service).to have_received(:execute)
expect(wiki_sync_service).to have_received(:execute)
end
end
context 'when project could not be found' do
it 'logs an error and returns' do
expect(subject).to receive(:log_error).with("Couldn't find project, skipping syncing", project_id: 999)
expect { subject.perform(999, Time.now) }.not_to raise_error
expect { subject.perform(999) }.not_to raise_error
end
end
......@@ -35,21 +44,23 @@ RSpec.describe Geo::ProjectSyncWorker do
expect(repository_sync_service).not_to receive(:execute)
expect(wiki_sync_service).not_to receive(:execute)
subject.perform(project_with_broken_storage.id, Time.now)
subject.perform(project_with_broken_storage.id)
end
end
context 'when project repositories has never been synced' do
it 'performs Geo::RepositorySyncService for the given project' do
subject.perform(project.id, Time.now)
subject.perform(project.id, sync_repository: true)
expect(repository_sync_service).to have_received(:execute).once
expect(wiki_sync_service).not_to have_received(:execute)
end
it 'performs Geo::WikiSyncService for the given project' do
subject.perform(project.id, Time.now)
subject.perform(project.id, sync_wiki: true)
expect(wiki_sync_service).to have_received(:execute).once
expect(repository_sync_service).not_to have_received(:execute)
end
end
......@@ -57,13 +68,13 @@ RSpec.describe Geo::ProjectSyncWorker do
let!(:registry) { create(:geo_project_registry, :synced, project: project) }
it 'does not perform Geo::RepositorySyncService for the given project' do
subject.perform(project.id, Time.now)
subject.perform(project.id, sync_repository: true)
expect(repository_sync_service).not_to have_received(:execute)
end
it 'does not perform Geo::WikiSyncService for the given project' do
subject.perform(project.id, Time.now)
subject.perform(project.id, sync_wiki: true)
expect(wiki_sync_service).not_to have_received(:execute)
end
......@@ -73,72 +84,16 @@ RSpec.describe Geo::ProjectSyncWorker do
let!(:registry) { create(:geo_project_registry, :sync_failed, project: project) }
it 'performs Geo::RepositorySyncService for the given project' do
subject.perform(project.id, Time.now)
subject.perform(project.id, sync_repository: true)
expect(repository_sync_service).to have_received(:execute).once
end
it 'performs Geo::WikiSyncService for the given project' do
subject.perform(project.id, Time.now)
subject.perform(project.id, sync_wiki: true)
expect(wiki_sync_service).to have_received(:execute).once
end
end
context 'when project repository is dirty' do
let!(:registry) do
create(:geo_project_registry, :synced, :repository_dirty, project: project)
end
it 'performs Geo::RepositorySyncService for the given project' do
subject.perform(project.id, Time.now)
expect(repository_sync_service).to have_received(:execute).once
end
it 'does not perform Geo::WikiSyncService for the given project' do
subject.perform(project.id, Time.now)
expect(wiki_sync_service).not_to have_received(:execute)
end
end
context 'when wiki is dirty' do
let!(:registry) do
create(:geo_project_registry, :synced, :wiki_dirty, project: project)
end
it 'does not perform Geo::RepositorySyncService for the given project' do
subject.perform(project.id, Time.now)
expect(repository_sync_service).not_to have_received(:execute)
end
it 'performs Geo::WikiSyncService for the given project' do
subject.perform(project.id, Time.now)
expect(wiki_sync_service).to have_received(:execute)
end
end
context 'when project repository was synced after the time the job was scheduled in' do
it 'does not perform Geo::RepositorySyncService for the given project' do
create(:geo_project_registry, :synced, :repository_dirty, project: project, last_repository_synced_at: Time.now)
subject.perform(project.id, Time.now - 5.minutes)
expect(repository_sync_service).not_to have_received(:execute)
end
end
context 'when wiki repository was synced after the time the job was scheduled in' do
it 'does not perform Geo::RepositorySyncService for the given project' do
create(:geo_project_registry, :synced, :wiki_dirty, project: project, last_wiki_synced_at: Time.now)
subject.perform(project.id, Time.now - 5.minutes)
expect(wiki_sync_service).not_to have_received(:execute)
end
end
end
end
......@@ -122,7 +122,7 @@ describe Geo::RepositoryShardSyncWorker, :geo, :delete, :clean_gitlab_redis_cach
it 'does not perform Geo::ProjectSyncWorker for projects that do not belong to selected namespaces to replicate' do
expect(Geo::ProjectSyncWorker).to receive(:perform_async)
.with(unsynced_project_in_restricted_group.id, within(1.minute).of(Time.now))
.with(unsynced_project_in_restricted_group.id, sync_repository: true, sync_wiki: true)
.once
.and_return(spy)
......@@ -134,7 +134,7 @@ describe Geo::RepositoryShardSyncWorker, :geo, :delete, :clean_gitlab_redis_cach
create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project)
expect(Geo::ProjectSyncWorker).to receive(:perform_async)
.with(unsynced_project_in_restricted_group.id, within(1.minute).of(Time.now))
.with(unsynced_project_in_restricted_group.id, sync_repository: true, sync_wiki: false)
.once
.and_return(spy)
......@@ -182,6 +182,32 @@ describe Geo::RepositoryShardSyncWorker, :geo, :delete, :clean_gitlab_redis_cach
end
end
context 'projects that require resync' do
context 'when project repository is dirty' do
it 'syncs repository only' do
create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project)
create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project_in_restricted_group)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(unsynced_project.id, sync_repository: true, sync_wiki: false)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(unsynced_project_in_restricted_group.id, sync_repository: true, sync_wiki: false)
subject.perform(shard_name)
end
end
context 'when project wiki is dirty' do
it 'syncs wiki only' do
create(:geo_project_registry, :synced, :wiki_dirty, project: unsynced_project)
create(:geo_project_registry, :synced, :wiki_dirty, project: unsynced_project_in_restricted_group)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(unsynced_project.id, sync_repository: false, sync_wiki: true)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(unsynced_project_in_restricted_group.id, sync_repository: false, sync_wiki: true)
subject.perform(shard_name)
end
end
end
context 'all repositories fail' do
let!(:project_list) { create_list(:project, 4, :random_last_repository_updated_at) }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment