Commit 1d5b5df0 authored by Nick Thomas

Merge branch '3230-reduce-sync-delay' into 'master'

Schedules repository sync when handling events in the Geo Log Cursor

Closes #3230

See merge request gitlab-org/gitlab-ee!2838
parents 47e5871d d7f93733
---
title: Schedule repository synchronization when processing events on a Geo secondary node
merge_request: 2838
author:
type: changed
...@@ -65,15 +65,15 @@ module Gitlab ...@@ -65,15 +65,15 @@ module Gitlab
next unless can_replay?(event_log) next unless can_replay?(event_log)
if event_log.repository_updated_event if event_log.repository_updated_event
handle_repository_update(event_log) handle_repository_updated(event_log)
elsif event_log.repository_created_event elsif event_log.repository_created_event
handle_repository_created(event_log) handle_repository_created(event_log)
elsif event_log.repository_deleted_event elsif event_log.repository_deleted_event
handle_repository_delete(event_log) handle_repository_deleted(event_log)
elsif event_log.repositories_changed_event elsif event_log.repositories_changed_event
handle_repositories_changed(event_log.repositories_changed_event) handle_repositories_changed(event_log.repositories_changed_event)
elsif event_log.repository_renamed_event elsif event_log.repository_renamed_event
handle_repository_rename(event_log) handle_repository_renamed(event_log)
end end
end end
end end
...@@ -107,95 +107,98 @@ module Gitlab ...@@ -107,95 +107,98 @@ module Gitlab
end end
def handle_repository_created(event_log) def handle_repository_created(event_log)
created_event = event_log.repository_created_event event = event_log.repository_created_event
registry = ::Geo::ProjectRegistry.find_or_initialize_by(project_id: created_event.project_id) registry = find_or_initialize_registry(event.project_id, resync_repository: true, resync_wiki: event.wiki_path.present?)
registry.resync_repository = true
registry.resync_wiki = created_event.wiki_path.present?
log_event_info( log_event_info(
event_log.created_at, event_log.created_at,
message: 'Repository created', message: 'Repository created',
project_id: created_event.project_id, project_id: event.project_id,
repo_path: created_event.repo_path, repo_path: event.repo_path,
wiki_path: created_event.wiki_path, wiki_path: event.wiki_path,
resync_repository: registry.resync_repository, resync_repository: registry.resync_repository,
resync_wiki: registry.resync_wiki) resync_wiki: registry.resync_wiki)
registry.save! registry.save!
end
def handle_repository_update(event)
updated_event = event.repository_updated_event
registry = ::Geo::ProjectRegistry.find_or_initialize_by(project_id: updated_event.project_id)
case updated_event.source ::Geo::ProjectSyncWorker.perform_async(event.project_id, Time.now)
when 'repository'
registry.resync_repository = true
when 'wiki'
registry.resync_wiki = true
end end
def handle_repository_updated(event_log)
event = event_log.repository_updated_event
registry = find_or_initialize_registry(event.project_id, "resync_#{event.source}" => true)
log_event_info( log_event_info(
event.created_at, event_log.created_at,
message: "Repository update", message: 'Repository update',
project_id: updated_event.project_id, project_id: event.project_id,
source: updated_event.source, source: event.source,
resync_repository: registry.resync_repository, resync_repository: registry.resync_repository,
resync_wiki: registry.resync_wiki) resync_wiki: registry.resync_wiki)
registry.save! registry.save!
::Geo::ProjectSyncWorker.perform_async(event.project_id, Time.now)
end end
def handle_repository_delete(event) def handle_repository_deleted(event_log)
deleted_event = event.repository_deleted_event event = event_log.repository_deleted_event
full_path = File.join(deleted_event.repository_storage_path,
deleted_event.deleted_path) disk_path = File.join(event.repository_storage_path, event.deleted_path)
job_id = ::Geo::RepositoryDestroyService job_id = ::Geo::RepositoryDestroyService
.new(deleted_event.project_id, .new(event.project_id, event.deleted_project_name, disk_path, event.repository_storage_name)
deleted_event.deleted_project_name,
full_path,
deleted_event.repository_storage_name)
.async_execute .async_execute
log_event_info(event.created_at,
message: "Deleted project", log_event_info(
project_id: deleted_event.project_id, event_log.created_at,
full_path: full_path, message: 'Deleted project',
project_id: event.project_id,
disk_path: disk_path,
job_id: job_id) job_id: job_id)
# No need to create a project entry if it doesn't exist # No need to create a project entry if it doesn't exist
::Geo::ProjectRegistry.where(project_id: deleted_event.project_id).delete_all ::Geo::ProjectRegistry.where(project_id: event.project_id).delete_all
end end
def handle_repositories_changed(changed_event) def handle_repositories_changed(event)
return unless Gitlab::Geo.current_node.id == changed_event.geo_node_id return unless Gitlab::Geo.current_node.id == event.geo_node_id
job_id = ::Geo::RepositoriesCleanUpWorker.perform_in(1.hour, changed_event.geo_node_id) job_id = ::Geo::RepositoriesCleanUpWorker.perform_in(1.hour, event.geo_node_id)
if job_id if job_id
log_info('Scheduled repositories clean up for Geo node', geo_node_id: changed_event.geo_node_id, job_id: job_id) log_info('Scheduled repositories clean up for Geo node', geo_node_id: event.geo_node_id, job_id: job_id)
else else
log_error('Could not schedule repositories clean up for Geo node', geo_node_id: changed_event.geo_node_id) log_error('Could not schedule repositories clean up for Geo node', geo_node_id: event.geo_node_id)
end end
end end
def handle_repository_rename(event) def handle_repository_renamed(event_log)
renamed_event = event.repository_renamed_event event = event_log.repository_renamed_event
return unless renamed_event.project_id return unless event.project_id
old_path = renamed_event.old_path_with_namespace old_path = event.old_path_with_namespace
new_path = renamed_event.new_path_with_namespace new_path = event.new_path_with_namespace
job_id = ::Geo::MoveRepositoryService job_id = ::Geo::MoveRepositoryService
.new(renamed_event.project_id, "", old_path, new_path) .new(event.project_id, '', old_path, new_path)
.async_execute .async_execute
log_event_info(event.created_at, log_event_info(
message: "Renaming project", event_log.created_at,
project_id: renamed_event.project_id, message: 'Renaming project',
project_id: event.project_id,
old_path: old_path, old_path: old_path,
new_path: new_path, new_path: new_path,
job_id: job_id) job_id: job_id)
end end
# Looks up the ProjectRegistry row for +project_id+, building a new unsaved
# one when none exists, and applies +attrs+ to it.
#
# The record is NOT saved here; callers persist it themselves.
#
# @param project_id [Integer] id used for the registry lookup
# @param attrs [Hash] attributes to assign on the registry (e.g. resync flags)
# @return [::Geo::ProjectRegistry] the found or newly initialized registry
def find_or_initialize_registry(project_id, attrs)
  ::Geo::ProjectRegistry
    .find_or_initialize_by(project_id: project_id)
    .tap { |entry| entry.assign_attributes(attrs) }
end
def cursor_delay(created_at) def cursor_delay(created_at)
(Time.now - created_at).to_f.round(3) (Time.now - created_at).to_f.round(3)
end end
......
...@@ -65,7 +65,9 @@ describe Gitlab::Geo::DatabaseTasks do ...@@ -65,7 +65,9 @@ describe Gitlab::Geo::DatabaseTasks do
describe described_class::Migrate do describe described_class::Migrate do
describe '.up' do describe '.up' do
it 'requires ENV["VERSION"] to be set' do it 'requires ENV["VERSION"] to be set' do
expect { subject.up }.to raise_error(String) stub_env('VERSION', nil)
expect { subject.up }.to raise_error(/VERSION is required/)
end end
it 'calls ActiveRecord::Migrator.run' do it 'calls ActiveRecord::Migrator.run' do
...@@ -78,7 +80,9 @@ describe Gitlab::Geo::DatabaseTasks do ...@@ -78,7 +80,9 @@ describe Gitlab::Geo::DatabaseTasks do
describe '.down' do describe '.down' do
it 'requires ENV["VERSION"] to be set' do it 'requires ENV["VERSION"] to be set' do
expect { subject.down }.to raise_error(String) stub_env('VERSION', nil)
expect { subject.down }.to raise_error(/VERSION is required/)
end end
it 'calls ActiveRecord::Migrator.run' do it 'calls ActiveRecord::Migrator.run' do
......
...@@ -4,7 +4,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql do ...@@ -4,7 +4,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql do
include ::EE::GeoHelpers include ::EE::GeoHelpers
describe '#run!' do describe '#run!' do
set(:geo_node) { create(:geo_node) } set(:geo_node) { create(:geo_node, :primary) }
before do before do
stub_current_geo_node(geo_node) stub_current_geo_node(geo_node)
...@@ -30,7 +30,8 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql do ...@@ -30,7 +30,8 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql do
end end
context 'when replaying a repository created event' do context 'when replaying a repository created event' do
let(:repository_created_event) { create(:geo_repository_created_event) } let(:project) { create(:project) }
let(:repository_created_event) { create(:geo_repository_created_event, project: project) }
let(:event_log) { create(:geo_event_log, repository_created_event: repository_created_event) } let(:event_log) { create(:geo_event_log, repository_created_event: repository_created_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) } let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
...@@ -47,7 +48,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql do ...@@ -47,7 +48,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql do
registry = Geo::ProjectRegistry.last registry = Geo::ProjectRegistry.last
expect(registry).to have_attributes(resync_repository: true, resync_wiki: true) expect(registry).to have_attributes(project_id: project.id, resync_repository: true, resync_wiki: true)
end end
it 'sets resync_wiki to false if wiki_path is nil' do it 'sets resync_wiki to false if wiki_path is nil' do
...@@ -57,14 +58,22 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql do ...@@ -57,14 +58,22 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql do
registry = Geo::ProjectRegistry.last registry = Geo::ProjectRegistry.last
expect(registry).to have_attributes(resync_repository: true, resync_wiki: false) expect(registry).to have_attributes(project_id: project.id, resync_repository: true, resync_wiki: false)
end
it 'performs Geo::ProjectSyncWorker' do
expect(Geo::ProjectSyncWorker).to receive(:perform_async)
.with(project.id, anything).once
subject.run!
end end
end end
context 'when replaying a repository updated event' do context 'when replaying a repository updated event' do
let(:event_log) { create(:geo_event_log, :updated_event) } let(:project) { create(:project) }
let(:repository_updated_event) { create(:geo_repository_updated_event, project: project) }
let(:event_log) { create(:geo_event_log, repository_updated_event: repository_updated_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) } let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
let(:repository_updated_event) { event_log.repository_updated_event }
before do before do
allow(subject).to receive(:exit?).and_return(false, true) allow(subject).to receive(:exit?).and_return(false, true)
...@@ -91,6 +100,13 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql do ...@@ -91,6 +100,13 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql do
expect(registry.reload.resync_wiki).to be true expect(registry.reload.resync_wiki).to be true
end end
it 'performs Geo::ProjectSyncWorker' do
  # Exactly one job must be enqueued for this project; the second argument
  # is deliberately left unconstrained.
  enqueue_sync_once = receive(:perform_async).with(project.id, anything).once
  expect(Geo::ProjectSyncWorker).to enqueue_sync_once

  subject.run!
end
end end
context 'when replaying a repository deleted event' do context 'when replaying a repository deleted event' do
...@@ -155,6 +171,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql do ...@@ -155,6 +171,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql do
before do before do
allow(subject).to receive(:exit?).and_return(false, true) allow(subject).to receive(:exit?).and_return(false, true)
allow(Geo::ProjectSyncWorker).to receive(:perform_async)
end end
it 'replays events for projects that belong to selected namespaces to replicate' do it 'replays events for projects that belong to selected namespaces to replicate' do
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment