Commit 9b8c8ea1 authored by Nick Thomas

Merge branch...

Merge branch '4957-projectsyncworker-should-skip-projects-that-have-a-broken-gitaly-shard-2' into 'master'

Only process Geo::EventLog events if associated shard is queryable and healthy

Closes #4957

See merge request gitlab-org/gitlab-ee!5537
parents 4432b596 fef2e27a
---
title: Only process Geo::EventLog events if associated shard is queryable and healthy
merge_request:
author:
type: performance
......@@ -42,6 +42,8 @@ module Gitlab
end
end
private
def handle_events(batch)
batch.each do |event_log|
next unless can_replay?(event_log)
......@@ -58,8 +60,6 @@ module Gitlab
end
end
private
def trap_signals
trap(:TERM) do
quit!
......@@ -86,8 +86,19 @@ module Gitlab
Gitlab::Geo.current_node&.projects_include?(event_log.project_id)
end
def healthy_shard_for?(event)
return true unless event.respond_to?(:project)
Gitlab::Geo::ShardHealthCache.healthy_shard?(event.project.repository_storage)
end
def enqueue_job_if_shard_healthy(event)
yield if healthy_shard_for?(event)
end
def handle_repository_created_event(event, created_at)
registry = find_or_initialize_registry(event.project_id, resync_repository: true, resync_wiki: event.wiki_path.present?)
registry = find_or_initialize_registry(event.project_id,
resync_repository: true, resync_wiki: event.wiki_path.present?)
logger.event_info(
created_at,
......@@ -100,7 +111,9 @@ module Gitlab
registry.save!
::Geo::ProjectSyncWorker.perform_async(event.project_id, Time.now)
enqueue_job_if_shard_healthy(event) do
::Geo::ProjectSyncWorker.perform_async(event.project_id, Time.now)
end
end
def handle_repository_updated_event(event, created_at)
......@@ -114,7 +127,9 @@ module Gitlab
registry.save!
job_id = ::Geo::ProjectSyncWorker.perform_async(event.project_id, Time.now)
job_id = enqueue_job_if_shard_healthy(event) do
::Geo::ProjectSyncWorker.perform_async(event.project_id, Time.now)
end
logger.event_info(
created_at,
......@@ -138,6 +153,8 @@ module Gitlab
}
unless skippable
# Must always schedule - https://gitlab.com/gitlab-org/gitlab-ee/issues/3651
# TODO: Wrap in enqueue_job_if_shard_healthy once ^ is resolved
params[:job_id] = ::Geo::RepositoryDestroyService.new(
event.project_id,
event.deleted_project_name,
......@@ -154,6 +171,7 @@ module Gitlab
def handle_repositories_changed_event(event, created_at)
return unless Gitlab::Geo.current_node.id == event.geo_node_id
# Must always schedule, regardless of shard health
job_id = ::Geo::RepositoriesCleanUpWorker.perform_in(1.hour, event.geo_node_id)
if job_id
......@@ -177,6 +195,7 @@ module Gitlab
}
unless skippable
# Must always schedule, regardless of shard health
params[:job_id] = ::Geo::RenameRepositoryService.new(
event.project_id,
event.old_path_with_namespace,
......@@ -203,6 +222,7 @@ module Gitlab
}
unless skippable
# Must always schedule, regardless of shard health
params[:job_id] = ::Geo::HashedStorageMigrationService.new(
event.project_id,
old_disk_path: event.old_disk_path,
......@@ -215,6 +235,7 @@ module Gitlab
end
def handle_hashed_storage_attachments_event(event, created_at)
# Must always schedule, regardless of shard health
job_id = ::Geo::HashedStorageAttachmentsMigrationService.new(
event.project_id,
old_attachments_path: event.old_attachments_path,
......
......@@ -66,12 +66,49 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
end
describe '#run_once!' do
context 'when associated shard is unhealthy' do
let(:project) { create(:project, :broken_storage) }
let(:repository_created_event) { create(:geo_repository_created_event, project: project) }
let(:event_log) { create(:geo_event_log, repository_created_event: repository_created_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
before do
expect(Gitlab::Geo::ShardHealthCache).to receive(:healthy_shard?).with('broken').and_return(false)
end
it 'skips handling the event' do
t = Time.now
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(project.id, t)
Timecop.freeze(t) { daemon.run_once! }
end
end
context 'when there is no associated shard for the event' do
let(:event_log) { create(:geo_event_log, :job_artifact_deleted_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
let(:job_artifact_deleted_event) { event_log.job_artifact_deleted_event }
let(:job_artifact) { job_artifact_deleted_event.job_artifact }
before do
create(:geo_job_artifact_registry, artifact_id: job_artifact.id)
end
it 'handles the event' do
expect(Gitlab::Geo::ShardHealthCache).not_to receive(:healthy_shard?).with('default')
expect { daemon.run_once! }.to change(Geo::JobArtifactRegistry, :count).by(-1)
end
end
context 'when replaying a repository created event' do
let(:project) { create(:project) }
let(:repository_created_event) { create(:geo_repository_created_event, project: project) }
let(:event_log) { create(:geo_event_log, repository_created_event: repository_created_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
before do
allow(Gitlab::Geo::ShardHealthCache).to receive(:healthy_shard?).with('default').and_return(true)
end
it 'creates a new project registry' do
expect { daemon.run_once! }.to change(Geo::ProjectRegistry, :count).by(1)
end
......@@ -108,6 +145,10 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
let(:event_log) { create(:geo_event_log, repository_updated_event: repository_updated_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
before do
allow(Gitlab::Geo::ShardHealthCache).to receive(:healthy_shard?).with('default').and_return(true)
end
it 'creates a new project registry if it does not exist' do
expect { daemon.run_once! }.to change(Geo::ProjectRegistry, :count).by(1)
end
......@@ -234,6 +275,10 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
let!(:registry) { create(:geo_project_registry, :synced, project: project) }
before do
allow(Gitlab::Geo::ShardHealthCache).to receive(:healthy_shard?).with('default').and_return(true)
end
it 'replays events for projects that belong to selected namespaces to replicate' do
secondary.update!(namespaces: [group_1])
......@@ -356,6 +401,10 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
let(:lfs_object_deleted_event) { event_log.lfs_object_deleted_event }
let(:lfs_object) { lfs_object_deleted_event.lfs_object }
before do
allow(Gitlab::Geo::ShardHealthCache).to receive(:healthy_shard?).with('default').and_return(true)
end
it 'does not create a tracking database entry' do
expect { daemon.run_once! }.not_to change(Geo::FileRegistry, :count)
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment