Commit f9b7e76a authored by Nick Thomas

Merge branch '4957-projectsyncworker-should-skip-projects-that-have-a-broken-gitaly-shard-4' into 'master'

Break out Gitlab::Geo::LogCursor::Daemon

Closes #4957

See merge request gitlab-org/gitlab-ee!5596
parents c5f91608 666c6828
@@ -26,7 +26,6 @@ module Gitlab
end

lease = Lease.try_obtain_with_ttl { run_once! }
-
return if exit?

# When no new event is found sleep for a few moments
@@ -38,7 +37,7 @@ module Gitlab
# Wrap this with the connection to make it possible to reconnect if
# PGbouncer dies: https://github.com/rails/rails/issues/29189
ActiveRecord::Base.connection_pool.with_connection do
- LogCursor::Events.fetch_in_batches { |batch| handle_events(batch) }
+ LogCursor::EventLogs.new.fetch_in_batches { |batch| handle_events(batch) }
end
end
@@ -50,9 +49,7 @@ module Gitlab
begin
event = event_log.event
- handler = "handle_#{event.class.name.demodulize.underscore}"
- __send__(handler, event, event_log.created_at) # rubocop:disable GitlabSecurity/PublicSend
+ event_klass_for(event).new(event, event_log.created_at, logger).process
rescue NoMethodError => e
logger.error(e.message)
raise e
@@ -60,13 +57,15 @@ module Gitlab
end
end
+ def event_klass_for(event)
+ event_klass_name = event.class.name.demodulize
+ current_namespace = self.class.name.deconstantize
+ Object.const_get("#{current_namespace}::Events::#{event_klass_name}")
+ end
def trap_signals
- trap(:TERM) do
- quit!
- end
- trap(:INT) do
- quit!
- end
+ trap(:TERM) { quit! }
+ trap(:INT) { quit! }
end

# Safe shutdown
@@ -86,234 +85,6 @@ module Gitlab
Gitlab::Geo.current_node&.projects_include?(event_log.project_id)
end
- def healthy_shard_for?(event)
- return true unless event.respond_to?(:project)
- Gitlab::Geo::ShardHealthCache.healthy_shard?(event.project.repository_storage)
- end
- def enqueue_job_if_shard_healthy(event)
- yield if healthy_shard_for?(event)
- end
- def handle_repository_created_event(event, created_at)
- registry = find_or_initialize_registry(event.project_id,
- resync_repository: true, resync_wiki: event.wiki_path.present?)
- logger.event_info(
- created_at,
- 'Repository created',
- project_id: event.project_id,
- repo_path: event.repo_path,
- wiki_path: event.wiki_path,
- resync_repository: registry.resync_repository,
- resync_wiki: registry.resync_wiki)
- registry.save!
- enqueue_job_if_shard_healthy(event) do
- ::Geo::ProjectSyncWorker.perform_async(event.project_id, Time.now)
- end
- end
- def handle_repository_updated_event(event, created_at)
- registry = find_or_initialize_registry(
- event.project_id,
- "resync_#{event.source}" => true,
- "#{event.source}_verification_checksum_sha" => nil,
- "#{event.source}_checksum_mismatch" => false,
- "last_#{event.source}_verification_failure" => nil
- )
- registry.save!
- job_id = enqueue_job_if_shard_healthy(event) do
- ::Geo::ProjectSyncWorker.perform_async(event.project_id, Time.now)
- end
- logger.event_info(
- created_at,
- 'Repository update',
- project_id: event.project_id,
- source: event.source,
- resync_repository: registry.resync_repository,
- resync_wiki: registry.resync_wiki,
- job_id: job_id)
- end
- def handle_repository_deleted_event(event, created_at)
- registry = find_or_initialize_registry(event.project_id)
- skippable = registry.new_record?
- params = {
- project_id: event.project_id,
- repository_storage_name: event.repository_storage_name,
- disk_path: event.deleted_path,
- skippable: skippable
- }
- unless skippable
- # Must always schedule - https://gitlab.com/gitlab-org/gitlab-ee/issues/3651
- # TODO: Wrap in enqueue_job_if_shard_healthy once ^ is resolved
- params[:job_id] = ::Geo::RepositoryDestroyService.new(
- event.project_id,
- event.deleted_project_name,
- event.deleted_path,
- event.repository_storage_name
- ).async_execute
- ::Geo::ProjectRegistry.where(project_id: event.project_id).delete_all
- end
- logger.event_info(created_at, 'Deleted project', params)
- end
- def handle_repositories_changed_event(event, created_at)
- return unless Gitlab::Geo.current_node.id == event.geo_node_id
- # Must always schedule, regardless of shard health
- job_id = ::Geo::RepositoriesCleanUpWorker.perform_in(1.hour, event.geo_node_id)
- if job_id
- logger.info('Scheduled repositories clean up for Geo node', geo_node_id: event.geo_node_id, job_id: job_id)
- else
- logger.error('Could not schedule repositories clean up for Geo node', geo_node_id: event.geo_node_id)
- end
- end
- def handle_repository_renamed_event(event, created_at)
- return unless event.project_id
- registry = find_or_initialize_registry(event.project_id)
- skippable = registry.new_record?
- params = {
- project_id: event.project_id,
- old_path: event.old_path_with_namespace,
- new_path: event.new_path_with_namespace,
- skippable: skippable
- }
- unless skippable
- # Must always schedule, regardless of shard health
- params[:job_id] = ::Geo::RenameRepositoryService.new(
- event.project_id,
- event.old_path_with_namespace,
- event.new_path_with_namespace
- ).async_execute
- end
- logger.event_info(created_at, 'Renaming project', params)
- end
- def handle_hashed_storage_migrated_event(event, created_at)
- return unless event.project_id
- registry = find_or_initialize_registry(event.project_id)
- skippable = registry.new_record?
- params = {
- project_id: event.project_id,
- old_storage_version: event.old_storage_version,
- new_storage_version: event.new_storage_version,
- old_disk_path: event.old_disk_path,
- new_disk_path: event.new_disk_path,
- skippable: skippable
- }
- unless skippable
- # Must always schedule, regardless of shard health
- params[:job_id] = ::Geo::HashedStorageMigrationService.new(
- event.project_id,
- old_disk_path: event.old_disk_path,
- new_disk_path: event.new_disk_path,
- old_storage_version: event.old_storage_version
- ).async_execute
- end
- logger.event_info(created_at, 'Migrating project to hashed storage', params)
- end
- def handle_hashed_storage_attachments_event(event, created_at)
- # Must always schedule, regardless of shard health
- job_id = ::Geo::HashedStorageAttachmentsMigrationService.new(
- event.project_id,
- old_attachments_path: event.old_attachments_path,
- new_attachments_path: event.new_attachments_path
- ).async_execute
- logger.event_info(
- created_at,
- 'Migrating attachments to hashed storage',
- project_id: event.project_id,
- old_attachments_path: event.old_attachments_path,
- new_attachments_path: event.new_attachments_path,
- job_id: job_id
- )
- end
- def handle_lfs_object_deleted_event(event, created_at)
- file_path = File.join(LfsObjectUploader.root, event.file_path)
- job_id = ::Geo::FileRemovalWorker.perform_async(file_path)
- logger.event_info(
- created_at,
- 'Deleted LFS object',
- oid: event.oid,
- file_id: event.lfs_object_id,
- file_path: file_path,
- job_id: job_id)
- ::Geo::FileRegistry.lfs_objects.where(file_id: event.lfs_object_id).delete_all
- end
- def handle_job_artifact_deleted_event(event, created_at)
- file_registry_job_artifacts = ::Geo::JobArtifactRegistry.where(artifact_id: event.job_artifact_id)
- return unless file_registry_job_artifacts.any? # avoid race condition
- file_path = File.join(::JobArtifactUploader.root, event.file_path)
- if File.file?(file_path)
- deleted = delete_file(file_path) # delete synchronously to ensure consistency
- return unless deleted # do not delete file from registry if deletion failed
- end
- logger.event_info(
- created_at,
- 'Deleted job artifact',
- file_id: event.job_artifact_id,
- file_path: file_path)
- file_registry_job_artifacts.delete_all
- end
- def handle_upload_deleted_event(event, created_at)
- logger.event_info(
- created_at,
- 'Deleted upload file',
- upload_id: event.upload_id,
- upload_type: event.upload_type,
- file_path: event.file_path,
- model_id: event.model_id,
- model_type: event.model_type)
- ::Geo::FileRegistry.where(file_id: event.upload_id, file_type: event.upload_type).delete_all
- end
- def find_or_initialize_registry(project_id, attrs = nil)
- registry = ::Geo::ProjectRegistry.find_or_initialize_by(project_id: project_id)
- registry.assign_attributes(attrs)
- registry
- end
- def delete_file(path)
- File.delete(path)
- rescue => ex
- logger.error("Failed to remove file", exception: ex.class.name, details: ex.message, filename: path)
- false
- end

# Sleeps for the expired TTL that remains on the lease plus some random seconds.
#
# This allows multiple GeoLogCursors to randomly process a batch of events,
...
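The interesting change above is the dispatch: instead of building a "handle_*" method name and calling it with __send__, the daemon now resolves a processor class by constant lookup. A minimal, self-contained sketch of that resolution path follows. It assumes ActiveSupport's String inflections, and the stand-in classes here are illustrative, not the real models:

require 'active_support/core_ext/string/inflections'

module Geo
  RepositoryCreatedEvent = Class.new # stand-in for the real ActiveRecord event model
end

module Gitlab
  module Geo
    module LogCursor
      module Events
        class RepositoryCreatedEvent
          def initialize(event, created_at, logger)
            @event = event
          end

          def process
            "processed #{@event.class.name}"
          end
        end
      end

      class Daemon
        def event_klass_for(event)
          event_klass_name = event.class.name.demodulize    # => "RepositoryCreatedEvent"
          current_namespace = self.class.name.deconstantize # => "Gitlab::Geo::LogCursor"
          Object.const_get("#{current_namespace}::Events::#{event_klass_name}")
        end
      end
    end
  end
end

event = ::Geo::RepositoryCreatedEvent.new
Gitlab::Geo::LogCursor::Daemon.new.event_klass_for(event).new(event, Time.now, nil).process
# => "processed Geo::RepositoryCreatedEvent"

Each Geo::* event model therefore maps, by naming convention alone, to one class under LogCursor::Events, which is what lets this MR delete all of the handle_* methods above.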
module Gitlab
module Geo
module LogCursor
# Manages events from the primary database and stores state in the DR database
class EventLogs
BATCH_SIZE = 50
# Fetches up to BATCH_SIZE next events and keeps track of batches
def fetch_in_batches(batch_size: BATCH_SIZE)
::Geo::EventLog.where('id > ?', last_processed).find_in_batches(batch_size: batch_size) do |batch|
yield batch
save_processed(batch.last.id)
break unless Lease.renew!
end
end
private
# Saves the last replicated event
def save_processed(event_id)
event_state = ::Geo::EventLogState.last || ::Geo::EventLogState.new
event_state.update!(event_id: event_id)
end
# @return [Integer] id of last replicated event
def last_processed
last = ::Geo::EventLogState.last_processed&.id
return last if last
if ::Geo::EventLog.any?
event_id = ::Geo::EventLog.last.id
save_processed(event_id)
event_id
else
-1
end
end
end
end
end
end
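The contract here is easy to miss in the diff: progress is persisted after each yielded batch, and iteration stops as soon as the shared lease cannot be renewed. A rough stand-alone sketch of that behaviour, with the Lease and EventLogState dependencies replaced by fakes (FakeLease and FakeEventLogs are hypothetical names, not part of this MR):

# Illustrative fakes only; the real class persists progress in
# Geo::EventLogState and renews a Redis-backed lease.
FakeLease = Struct.new(:renewals_left) do
  def renew!
    self.renewals_left -= 1
    renewals_left >= 0
  end
end

class FakeEventLogs
  BATCH_SIZE = 50

  def initialize(events, lease)
    @events = events          # e.g. [{ id: 1 }, { id: 2 }, ...]
    @lease = lease
    @last_processed = -1      # mirrors #last_processed when no state row exists
  end

  def fetch_in_batches(batch_size: BATCH_SIZE)
    @events.select { |e| e[:id] > @last_processed }.each_slice(batch_size) do |batch|
      yield batch
      @last_processed = batch.last[:id] # "save_processed" happens after the yield
      break unless @lease.renew!        # another cursor may take over from here
    end
  end
end

logs = FakeEventLogs.new((1..120).map { |i| { id: i } }, FakeLease.new(1))
logs.fetch_in_batches { |batch| puts "handled #{batch.size} events" }
# handled 50 events
# handled 50 events   (lease renewal fails after this batch, so the loop stops)

Because progress is saved before the lease check, a cursor that loses the lease leaves a consistent high-water mark for whichever cursor picks up next.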
module Gitlab
module Geo
module LogCursor
- # Manages events from primary database and store state in the DR database
- class Events
- BATCH_SIZE = 50
- # fetches up to BATCH_SIZE next events and keep track of batches
- def self.fetch_in_batches
- ::Geo::EventLog.where('id > ?', last_processed).find_in_batches(batch_size: BATCH_SIZE) do |batch|
- yield batch
- save_processed(batch.last.id)
- break unless Lease.renew!
- end
- end
- # saves last replicated event
- def self.save_processed(event_id)
- event_state = ::Geo::EventLogState.last || ::Geo::EventLogState.new
- event_state.update!(event_id: event_id)
- end
- # @return [Integer] id of last replicated event
- def self.last_processed
- last = ::Geo::EventLogState.last_processed&.id
- return last if last
- if ::Geo::EventLog.any?
- event_id = ::Geo::EventLog.last.id
- save_processed(event_id)
- event_id
- else
- -1
- end
- end
+ module Events
end
end
end
...
module Gitlab
module Geo
module LogCursor
module Events
module BaseEvent
include Utils::StrongMemoize
def initialize(event, created_at, logger)
@event = event
@created_at = created_at
@logger = logger
end
private
attr_reader :event, :created_at, :logger
def registry
@registry ||= find_or_initialize_registry
end
def skippable?
registry.new_record?
end
def healthy_shard_for?(event)
return true unless event.respond_to?(:project)
Gitlab::Geo::ShardHealthCache.healthy_shard?(event.project.repository_storage)
end
def enqueue_job_if_shard_healthy(event)
yield if healthy_shard_for?(event)
end
def find_or_initialize_registry(attrs = nil)
::Geo::ProjectRegistry.find_or_initialize_by(project_id: event.project_id).tap do |registry|
registry.assign_attributes(attrs)
end
end
end
end
end
end
end
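For anyone adding a new event type, the shape a subclass is expected to take follows directly from BaseEvent. A hypothetical example (ExampleEvent is not part of this MR; ProjectSyncWorker and logger.event_info are used as in the real classes below):

module Gitlab
  module Geo
    module LogCursor
      module Events
        class ExampleEvent
          include BaseEvent

          def process
            # enqueue_job_if_shard_healthy yields (and so returns the job id)
            # only when ShardHealthCache reports the project's shard as
            # healthy, which is what lets the log cursor skip projects on
            # broken Gitaly shards.
            job_id = enqueue_job_if_shard_healthy(event) do
              ::Geo::ProjectSyncWorker.perform_async(event.project_id, Time.now)
            end

            logger.event_info(created_at, 'Example event', project_id: event.project_id, job_id: job_id)
          end
        end
      end
    end
  end
end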
module Gitlab
module Geo
module LogCursor
module Events
class HashedStorageAttachmentsEvent
include BaseEvent
def process
job_id = hashed_storage_attachments_migrate
log_event(job_id)
end
private
def hashed_storage_attachments_migrate
# Must always schedule, regardless of shard health
::Geo::HashedStorageAttachmentsMigrationService.new(
event.project_id,
old_attachments_path: event.old_attachments_path,
new_attachments_path: event.new_attachments_path
).async_execute
end
def log_event(job_id)
logger.event_info(
created_at,
'Migrating attachments to hashed storage',
project_id: event.project_id,
old_attachments_path: event.old_attachments_path,
new_attachments_path: event.new_attachments_path,
job_id: job_id
)
end
end
end
end
end
end
module Gitlab
module Geo
module LogCursor
module Events
class HashedStorageMigratedEvent
include BaseEvent
def process
return unless event.project_id
job_id = hashed_storage_migrate unless skippable?
log_event(job_id)
end
private
def hashed_storage_migrate
# Must always schedule, regardless of shard health
::Geo::HashedStorageMigrationService.new(
event.project_id,
old_disk_path: event.old_disk_path,
new_disk_path: event.new_disk_path,
old_storage_version: event.old_storage_version
).async_execute
end
def log_event(job_id)
logger.event_info(
created_at,
'Migrating project to hashed storage',
project_id: event.project_id,
old_storage_version: event.old_storage_version,
new_storage_version: event.new_storage_version,
old_disk_path: event.old_disk_path,
new_disk_path: event.new_disk_path,
skippable: skippable?,
job_id: job_id)
end
end
end
end
end
end
module Gitlab
module Geo
module LogCursor
module Events
class JobArtifactDeletedEvent
include BaseEvent
def process
return unless file_registry_job_artifacts.any? # avoid race condition
# delete synchronously to ensure consistency
if File.file?(file_path) && !delete_file(file_path)
return # do not delete file from registry if deletion failed
end
log_event
file_registry_job_artifacts.delete_all
end
private
def file_registry_job_artifacts
@file_registry_job_artifacts ||= ::Geo::JobArtifactRegistry.where(artifact_id: event.job_artifact_id)
end
def file_path
@file_path ||= File.join(::JobArtifactUploader.root, event.file_path)
end
def log_event
logger.event_info(
created_at,
'Deleted job artifact',
file_id: event.job_artifact_id,
file_path: file_path)
end
def delete_file(path)
File.delete(path)
rescue => ex
logger.error("Failed to remove file", exception: ex.class.name, details: ex.message, filename: path)
false
end
end
end
end
end
end
module Gitlab
module Geo
module LogCursor
module Events
class LfsObjectDeletedEvent
include BaseEvent
def process
# Must always schedule, regardless of shard health
job_id = ::Geo::FileRemovalWorker.perform_async(file_path)
log_event(job_id)
::Geo::FileRegistry.lfs_objects.where(file_id: event.lfs_object_id).delete_all
end
private
def file_path
@file_path ||= File.join(LfsObjectUploader.root, event.file_path)
end
def log_event(job_id)
logger.event_info(
created_at,
'Deleted LFS object',
oid: event.oid,
file_id: event.lfs_object_id,
file_path: file_path,
job_id: job_id)
end
end
end
end
end
end
module Gitlab
module Geo
module LogCursor
module Events
class RepositoriesChangedEvent
include BaseEvent
def process
return unless Gitlab::Geo.current_node.id == event.geo_node_id
# Must always schedule, regardless of shard health
job_id = ::Geo::RepositoriesCleanUpWorker.perform_in(1.hour, event.geo_node_id)
log_event(job_id)
end
private
def log_event(job_id)
if job_id
logger.info('Scheduled repositories clean up for Geo node', geo_node_id: event.geo_node_id, job_id: job_id)
else
logger.error('Could not schedule repositories clean up for Geo node', geo_node_id: event.geo_node_id)
end
end
end
end
end
end
end
module Gitlab
module Geo
module LogCursor
module Events
class RepositoryCreatedEvent
include BaseEvent
def process
log_event
registry.save!
enqueue_job_if_shard_healthy(event) do
::Geo::ProjectSyncWorker.perform_async(event.project_id, Time.now)
end
end
private
def registry
@registry ||= find_or_initialize_registry(
resync_repository: true,
resync_wiki: event.wiki_path.present?)
end
def log_event
logger.event_info(
created_at,
'Repository created',
project_id: event.project_id,
repo_path: event.repo_path,
wiki_path: event.wiki_path,
resync_repository: registry.resync_repository,
resync_wiki: registry.resync_wiki)
end
end
end
end
end
end
module Gitlab
module Geo
module LogCursor
module Events
class RepositoryDeletedEvent
include BaseEvent
def process
job_id = nil
unless skippable?
job_id = destroy_repository
delete_project_registry_entries
end
log_event(job_id)
end
private
def destroy_repository
# Must always schedule, regardless of shard health
::Geo::RepositoryDestroyService.new(
event.project_id,
event.deleted_project_name,
event.deleted_path,
event.repository_storage_name
).async_execute
end
def delete_project_registry_entries
::Geo::ProjectRegistry.where(project_id: event.project_id).delete_all
end
def log_event(job_id)
logger.event_info(
created_at,
'Deleted project',
project_id: event.project_id,
repository_storage_name: event.repository_storage_name,
disk_path: event.deleted_path,
skippable: skippable?,
job_id: job_id
)
end
end
end
end
end
end
module Gitlab
module Geo
module LogCursor
module Events
class RepositoryRenamedEvent
include BaseEvent
def process
return unless event.project_id
job_id = rename_repository unless skippable?
log_event(job_id)
end
private
def rename_repository
# Must always schedule, regardless of shard health
::Geo::RenameRepositoryService.new(
event.project_id,
event.old_path_with_namespace,
event.new_path_with_namespace
).async_execute
end
def log_event(job_id)
logger.event_info(
created_at,
'Renaming project',
project_id: event.project_id,
old_path: event.old_path_with_namespace,
new_path: event.new_path_with_namespace,
skippable: skippable?,
job_id: job_id)
end
end
end
end
end
end
module Gitlab
module Geo
module LogCursor
module Events
class RepositoryUpdatedEvent
include BaseEvent
def process
registry.save!
job_id = enqueue_job_if_shard_healthy(event) do
::Geo::ProjectSyncWorker.perform_async(event.project_id, Time.now)
end
log_event(job_id)
end
private
def registry
@registry ||= find_or_initialize_registry(
"resync_#{event.source}" => true,
"#{event.source}_verification_checksum_sha" => nil,
"#{event.source}_checksum_mismatch" => false,
"last_#{event.source}_verification_failure" => nil
)
end
def log_event(job_id)
logger.event_info(
created_at,
'Repository update',
project_id: event.project_id,
source: event.source,
resync_repository: registry.resync_repository,
resync_wiki: registry.resync_wiki,
job_id: job_id)
end
end
end
end
end
end
module Gitlab
module Geo
module LogCursor
module Events
class UploadDeletedEvent
include BaseEvent
def process
log_event
::Geo::FileRegistry.where(file_id: event.upload_id, file_type: event.upload_type).delete_all
end
private
def log_event
logger.event_info(
created_at,
'Deleted upload file',
upload_id: event.upload_id,
upload_type: event.upload_type,
file_path: event.file_path,
model_id: event.model_id,
model_type: event.model_type)
end
end
end
end
end
end
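Taken together, the per-batch pipeline after this MR looks roughly like the sketch below. This is a simplification: the real Daemon also filters events for selective sync, checks out a database connection around the fetch, and guards the loop with exit?. It assumes the full Rails/Geo environment is loaded, for example in a console on a secondary:

logger = Gitlab::Geo::LogCursor::Logger.new(Gitlab::Geo::LogCursor::Daemon, ::Logger::INFO)

Gitlab::Geo::LogCursor::EventLogs.new.fetch_in_batches do |batch|
  batch.each do |event_log|
    event = event_log.event
    # Same resolution as Daemon#event_klass_for:
    # Geo::RepositoryUpdatedEvent => LogCursor::Events::RepositoryUpdatedEvent
    klass = Gitlab::Geo::LogCursor::Events.const_get(event.class.name.demodulize)
    klass.new(event, event_log.created_at, logger).process
  end
end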
@@ -66,201 +66,15 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
end
describe '#run_once!' do
- context 'when associated shard is unhealthy' do
+ context 'with some event logs' do
let(:project) { create(:project, :broken_storage) }
let(:repository_created_event) { create(:geo_repository_created_event, project: project) }
let(:event_log) { create(:geo_event_log, repository_created_event: repository_created_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
before do
expect(Gitlab::Geo::ShardHealthCache).to receive(:healthy_shard?).with('broken').and_return(false)
end
it 'skips handling the event' do
t = Time.now
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(project.id, t)
Timecop.freeze(t) { daemon.run_once! }
end
end
context 'when there is no associated shard for the event' do
let(:event_log) { create(:geo_event_log, :job_artifact_deleted_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
let(:job_artifact_deleted_event) { event_log.job_artifact_deleted_event }
let(:job_artifact) { job_artifact_deleted_event.job_artifact }
before do
create(:geo_job_artifact_registry, artifact_id: job_artifact.id)
end
it 'handles the event' do
expect(Gitlab::Geo::ShardHealthCache).not_to receive(:healthy_shard?).with('default')
expect { daemon.run_once! }.to change(Geo::JobArtifactRegistry, :count).by(-1)
end
end
context 'when replaying a repository created event' do
let(:project) { create(:project) }
let(:repository_created_event) { create(:geo_repository_created_event, project: project) }
let(:event_log) { create(:geo_event_log, repository_created_event: repository_created_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
before do
allow(Gitlab::Geo::ShardHealthCache).to receive(:healthy_shard?).with('default').and_return(true)
end
it 'creates a new project registry' do
expect { daemon.run_once! }.to change(Geo::ProjectRegistry, :count).by(1)
end
it 'sets resync attributes to true' do
daemon.run_once!
registry = Geo::ProjectRegistry.last
expect(registry).to have_attributes(project_id: project.id, resync_repository: true, resync_wiki: true)
end
it 'sets resync_wiki to false if wiki_path is nil' do
repository_created_event.update!(wiki_path: nil)
daemon.run_once!
registry = Geo::ProjectRegistry.last
expect(registry).to have_attributes(project_id: project.id, resync_repository: true, resync_wiki: false)
end
it 'performs Geo::ProjectSyncWorker' do
expect(Geo::ProjectSyncWorker).to receive(:perform_async)
.with(project.id, anything).once
daemon.run_once!
end
end
context 'when replaying a repository updated event' do
let(:project) { create(:project) }
let(:repository_updated_event) { create(:geo_repository_updated_event, project: project) }
let(:event_log) { create(:geo_event_log, repository_updated_event: repository_updated_event) }
+ let(:batch) { [event_log] }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
- before do
- allow(Gitlab::Geo::ShardHealthCache).to receive(:healthy_shard?).with('default').and_return(true)
- end
+ it 'handles events' do
+ expect(daemon).to receive(:handle_events).with(batch)
it 'creates a new project registry if it does not exist' do
expect { daemon.run_once! }.to change(Geo::ProjectRegistry, :count).by(1)
end
context 'when event source is repository' do
let!(:registry) { create(:geo_project_registry, :synced, :repository_verified, :repository_checksum_mismatch, project: repository_updated_event.project) }
before do
repository_updated_event.update!(source: Geo::RepositoryUpdatedEvent::REPOSITORY)
end
it 'sets resync_repository to true' do
daemon.run_once!
expect(registry.reload.resync_repository).to be true
end
it 'resets the repository verification fields' do
daemon.run_once!
expect(registry.reload).to have_attributes(
repository_verification_checksum_sha: nil,
repository_checksum_mismatch: false,
last_repository_verification_failure: nil
)
end
end
context 'when event source is wiki' do
let!(:registry) { create(:geo_project_registry, :synced, :wiki_verified, :wiki_checksum_mismatch, project: repository_updated_event.project) }
before do
repository_updated_event.update!(source: Geo::RepositoryUpdatedEvent::WIKI)
end
it 'sets resync_wiki to true' do
daemon.run_once!
expect(registry.reload.resync_wiki).to be true
end
it 'resets the wiki repository verification fields' do
daemon.run_once!
expect(registry.reload).to have_attributes(
wiki_verification_checksum_sha: nil,
wiki_checksum_mismatch: false,
last_wiki_verification_failure: nil
)
end
end
it 'performs Geo::ProjectSyncWorker' do
expect(Geo::ProjectSyncWorker).to receive(:perform_async)
.with(project.id, anything).once
daemon.run_once!
end
end
context 'when replaying a repository deleted event' do
let(:event_log) { create(:geo_event_log, :deleted_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
let(:repository_deleted_event) { event_log.repository_deleted_event }
let(:project) { repository_deleted_event.project }
let(:deleted_project_name) { repository_deleted_event.deleted_project_name }
let(:deleted_path) { repository_deleted_event.deleted_path }
context 'when a tracking entry does not exist' do
it 'does not schedule a GeoRepositoryDestroyWorker' do
expect(::GeoRepositoryDestroyWorker).not_to receive(:perform_async)
.with(project.id, deleted_project_name, deleted_path, project.repository_storage)
daemon.run_once!
end
it 'does not create a tracking entry' do
expect { daemon.run_once! }.not_to change(Geo::ProjectRegistry, :count)
end
end
context 'when a tracking entry exists' do
let!(:tracking_entry) { create(:geo_project_registry, project: project) }
it 'schedules a GeoRepositoryDestroyWorker' do
expect(::GeoRepositoryDestroyWorker).to receive(:perform_async)
.with(project.id, deleted_project_name, deleted_path, project.repository_storage)
daemon.run_once!
end
it 'removes the tracking entry' do
expect { daemon.run_once! }.to change(Geo::ProjectRegistry, :count).by(-1)
end
end
end
context 'when replaying a repositories changed event' do
let(:repositories_changed_event) { create(:geo_repositories_changed_event, geo_node: secondary) }
let(:event_log) { create(:geo_event_log, repositories_changed_event: repositories_changed_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
it 'schedules a GeoRepositoryDestroyWorker when event node is the current node' do
expect(Geo::RepositoriesCleanUpWorker).to receive(:perform_in).with(within(5.minutes).of(1.hour), secondary.id)
daemon.run_once!
end
it 'does not schedule a GeoRepositoryDestroyWorker when event node is not the current node' do
stub_current_geo_node(build(:geo_node))
expect(Geo::RepositoriesCleanUpWorker).not_to receive(:perform_in)
daemon.run_once!
end
@@ -282,8 +96,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
it 'replays events for projects that belong to selected namespaces to replicate' do
secondary.update!(namespaces: [group_1])
- expect(Geo::ProjectSyncWorker).to receive(:perform_async)
- .with(project.id, anything).once
+ expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(project.id, anything).once
daemon.run_once!
end
@@ -291,8 +104,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
it 'does not replay events for projects that do not belong to selected namespaces to replicate' do
secondary.update!(selective_sync_type: 'namespaces', namespaces: [group_2])
- expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)
- .with(project.id, anything)
+ expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(project.id, anything)
daemon.run_once!
end
@@ -300,253 +112,10 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
it 'does not replay events for projects that do not belong to selected shards to replicate' do
secondary.update!(selective_sync_type: 'shards', selective_sync_shards: ['broken'])
- expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)
- .with(project.id, anything)
+ expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(project.id, anything)
daemon.run_once!
end
end
context 'when processing a repository renamed event' do
let(:event_log) { create(:geo_event_log, :renamed_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
let(:repository_renamed_event) { event_log.repository_renamed_event }
let(:project) {repository_renamed_event.project }
let(:old_path_with_namespace) { repository_renamed_event.old_path_with_namespace }
let(:new_path_with_namespace) { repository_renamed_event.new_path_with_namespace }
context 'when a tracking entry does not exist' do
it 'does not create a tracking entry' do
expect { daemon.run_once! }.not_to change(Geo::ProjectRegistry, :count)
end
it 'does not schedule a Geo::RenameRepositoryWorker' do
expect(::Geo::RenameRepositoryWorker).not_to receive(:perform_async)
.with(project.id, old_path_with_namespace, new_path_with_namespace)
daemon.run_once!
end
end
context 'when a tracking entry does exists' do
it 'schedules a Geo::RenameRepositoryWorker' do
create(:geo_project_registry, project: project)
expect(::Geo::RenameRepositoryWorker).to receive(:perform_async)
.with(project.id, old_path_with_namespace, new_path_with_namespace)
daemon.run_once!
end
end
end
context 'when processing a hashed storage migration event' do
let(:event_log) { create(:geo_event_log, :hashed_storage_migration_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
let(:hashed_storage_migrated_event) { event_log.hashed_storage_migrated_event }
let(:project) { hashed_storage_migrated_event.project }
let(:old_disk_path) { hashed_storage_migrated_event.old_disk_path }
let(:new_disk_path) { hashed_storage_migrated_event.new_disk_path }
let(:old_storage_version) { hashed_storage_migrated_event.old_storage_version }
context 'when a tracking entry does not exist' do
it 'does not create a tracking entry' do
expect { daemon.run_once! }.not_to change(Geo::ProjectRegistry, :count)
end
it 'does not schedule a Geo::HashedStorageMigrationWorker' do
expect(::Geo::HashedStorageMigrationWorker).not_to receive(:perform_async)
.with(project.id, old_disk_path, new_disk_path, old_storage_version)
daemon.run_once!
end
end
context 'when a tracking entry exists' do
it 'schedules a Geo::HashedStorageMigrationWorker' do
create(:geo_project_registry, project: project)
expect(::Geo::HashedStorageMigrationWorker).to receive(:perform_async)
.with(project.id, old_disk_path, new_disk_path, old_storage_version)
daemon.run_once!
end
end
end
context 'when processing an attachment migration event to hashed storage' do
let(:event_log) { create(:geo_event_log, :hashed_storage_attachments_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
let(:hashed_storage_attachments_event) { event_log.hashed_storage_attachments_event }
it 'does not create a new project registry' do
expect { daemon.run_once! }.not_to change(Geo::ProjectRegistry, :count)
end
it 'schedules a Geo::HashedStorageAttachmentsMigrationWorker' do
project = hashed_storage_attachments_event.project
old_attachments_path = hashed_storage_attachments_event.old_attachments_path
new_attachments_path = hashed_storage_attachments_event.new_attachments_path
expect(::Geo::HashedStorageAttachmentsMigrationWorker).to receive(:perform_async)
.with(project.id, old_attachments_path, new_attachments_path)
daemon.run_once!
end
end
context 'when replaying a LFS object deleted event' do
let(:event_log) { create(:geo_event_log, :lfs_object_deleted_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
let(:lfs_object_deleted_event) { event_log.lfs_object_deleted_event }
let(:lfs_object) { lfs_object_deleted_event.lfs_object }
before do
allow(Gitlab::Geo::ShardHealthCache).to receive(:healthy_shard?).with('default').and_return(true)
end
it 'does not create a tracking database entry' do
expect { daemon.run_once! }.not_to change(Geo::FileRegistry, :count)
end
it 'schedules a Geo::FileRemovalWorker' do
file_path = File.join(LfsObjectUploader.root, lfs_object_deleted_event.file_path)
expect(::Geo::FileRemovalWorker).to receive(:perform_async)
.with(file_path)
daemon.run_once!
end
it 'removes the tracking database entry if exist' do
create(:geo_file_registry, :lfs, file_id: lfs_object.id)
expect { daemon.run_once! }.to change(Geo::FileRegistry.lfs_objects, :count).by(-1)
end
end
context 'when replaying a upload deleted event' do
context 'with default handling' do
let(:event_log) { create(:geo_event_log, :upload_deleted_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
let(:upload_deleted_event) { event_log.upload_deleted_event }
let(:upload) { upload_deleted_event.upload }
it 'does not create a tracking database entry' do
expect { daemon.run_once! }.not_to change(Geo::FileRegistry, :count)
end
it 'removes the tracking database entry if exist' do
create(:geo_file_registry, :avatar, file_id: upload.id)
expect { daemon.run_once! }.to change(Geo::FileRegistry.attachments, :count).by(-1)
end
end
end
context 'when replaying a job artifact event' do
let(:event_log) { create(:geo_event_log, :job_artifact_deleted_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
let(:job_artifact_deleted_event) { event_log.job_artifact_deleted_event }
let(:job_artifact) { job_artifact_deleted_event.job_artifact }
context 'with a tracking database entry' do
before do
create(:geo_job_artifact_registry, artifact_id: job_artifact.id)
end
context 'with a file' do
context 'when the delete succeeds' do
it 'removes the tracking database entry' do
expect { daemon.run_once! }.to change(Geo::JobArtifactRegistry, :count).by(-1)
end
it 'deletes the file' do
expect { daemon.run_once! }.to change { File.exist?(job_artifact.file.path) }.from(true).to(false)
end
end
context 'when the delete fails' do
before do
expect(daemon).to receive(:delete_file).and_return(false)
end
it 'does not remove the tracking database entry' do
expect { daemon.run_once! }.not_to change(Geo::JobArtifactRegistry, :count)
end
end
end
context 'without a file' do
before do
FileUtils.rm(job_artifact.file.path)
end
it 'removes the tracking database entry' do
expect { daemon.run_once! }.to change(Geo::JobArtifactRegistry, :count).by(-1)
end
end
end
context 'without a tracking database entry' do
it 'does not create a tracking database entry' do
expect { daemon.run_once! }.not_to change(Geo::JobArtifactRegistry, :count)
end
it 'does not delete the file (yet, due to possible race condition)' do
expect { daemon.run_once! }.not_to change { File.exist?(job_artifact.file.path) }.from(true)
end
end
end
end
describe '#delete_file' do
context 'when the file exists' do
let!(:file) { fixture_file_upload(Rails.root + "spec/fixtures/dk.png", "image/png") }
context 'when the delete does not raise an exception' do
it 'returns true' do
expect(daemon.send(:delete_file, file.path)).to be_truthy
end
it 'does not log an error' do
expect(daemon).not_to receive(:logger)
daemon.send(:delete_file, file.path)
end
end
context 'when the delete raises an exception' do
before do
expect(File).to receive(:delete).and_raise('something went wrong')
end
it 'returns false' do
expect(daemon.send(:delete_file, file.path)).to be_falsey
end
it 'logs an error' do
logger = double(logger)
expect(daemon).to receive(:logger).and_return(logger)
expect(logger).to receive(:error).with('Failed to remove file', exception: 'RuntimeError', details: 'something went wrong', filename: file.path)
daemon.send(:delete_file, file.path)
end
end
end
context 'when the file does not exist' do
it 'returns false' do
expect(daemon.send(:delete_file, '/does/not/exist')).to be_falsey
end
it 'logs an error' do
logger = double(logger)
expect(daemon).to receive(:logger).and_return(logger)
expect(logger).to receive(:error).with('Failed to remove file', exception: 'Errno::ENOENT', details: 'No such file or directory @ unlink_internal - /does/not/exist', filename: '/does/not/exist')
daemon.send(:delete_file, '/does/not/exist')
end
end
end
end
require 'spec_helper'
describe Gitlab::Geo::LogCursor::EventLogs, :postgresql, :clean_gitlab_redis_shared_state do
subject { described_class.new }
describe '#fetch_in_batches' do
context 'when there are no event_logs' do
it 'does not yield a group of events' do
expect { |b| subject.fetch_in_batches(&b) }.not_to yield_control
end
end
context 'when there are event logs' do
let!(:event_log_1) { create(:geo_event_log) }
let!(:event_log_2) { create(:geo_event_log) }
context 'when there is no event_log_state' do
it 'does not yield a group of events' do
expect { |b| subject.fetch_in_batches(&b) }.not_to yield_with_args([event_log_1, event_log_2])
end
end
context 'when there is already an event_log_state' do
before do
create(:geo_event_log_state, event_id: event_log_1.id - 1)
end
it 'saves last event as last processed after yielding' do
subject.fetch_in_batches { |batch| batch }
expect(Geo::EventLogState.last.event_id).to eq(event_log_2.id)
end
end
end
end
end
require 'spec_helper'
describe Gitlab::Geo::LogCursor::Events::HashedStorageAttachmentsEvent, :postgresql, :clean_gitlab_redis_shared_state do
let(:logger) { Gitlab::Geo::LogCursor::Logger.new(described_class, Logger::INFO) }
let(:event_log) { create(:geo_event_log, :hashed_storage_attachments_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
let(:hashed_storage_attachments_event) { event_log.hashed_storage_attachments_event }
let(:project) { hashed_storage_attachments_event.project }
let(:old_attachments_path) { hashed_storage_attachments_event.old_attachments_path }
let(:new_attachments_path) { hashed_storage_attachments_event.new_attachments_path }
subject { described_class.new(hashed_storage_attachments_event, Time.now, logger) }
around do |example|
Sidekiq::Testing.fake! { example.run }
end
describe '#process' do
it 'does not create a new project registry' do
expect { subject.process }.not_to change(Geo::ProjectRegistry, :count)
end
it 'schedules a Geo::HashedStorageAttachmentsMigrationWorker' do
expect(::Geo::HashedStorageAttachmentsMigrationWorker).to receive(:perform_async)
.with(project.id, old_attachments_path, new_attachments_path)
subject.process
end
end
end
require 'spec_helper'
describe Gitlab::Geo::LogCursor::Events::HashedStorageMigratedEvent, :postgresql, :clean_gitlab_redis_shared_state do
let(:logger) { Gitlab::Geo::LogCursor::Logger.new(described_class, Logger::INFO) }
let(:event_log) { create(:geo_event_log, :hashed_storage_migration_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
let(:hashed_storage_migrated_event) { event_log.hashed_storage_migrated_event }
let(:project) { hashed_storage_migrated_event.project }
let(:old_disk_path) { hashed_storage_migrated_event.old_disk_path }
let(:new_disk_path) { hashed_storage_migrated_event.new_disk_path }
let(:old_storage_version) { hashed_storage_migrated_event.old_storage_version }
subject { described_class.new(hashed_storage_migrated_event, Time.now, logger) }
around do |example|
Sidekiq::Testing.fake! { example.run }
end
describe '#process' do
context 'when a tracking entry does not exist' do
it 'does not create a tracking entry' do
expect { subject.process }.not_to change(Geo::ProjectRegistry, :count)
end
it 'does not schedule a Geo::HashedStorageMigrationWorker' do
expect(::Geo::HashedStorageMigrationWorker).not_to receive(:perform_async)
.with(project.id, old_disk_path, new_disk_path, old_storage_version)
subject.process
end
end
it 'schedules a Geo::HashedStorageMigrationWorker' do
create(:geo_project_registry, project: project)
expect(::Geo::HashedStorageMigrationWorker).to receive(:perform_async)
.with(project.id, old_disk_path, new_disk_path, old_storage_version)
subject.process
end
end
end
require 'spec_helper'
describe Gitlab::Geo::LogCursor::Events::JobArtifactDeletedEvent, :postgresql, :clean_gitlab_redis_shared_state do
let(:logger) { Gitlab::Geo::LogCursor::Logger.new(described_class, Logger::INFO) }
let(:event_log) { create(:geo_event_log, :job_artifact_deleted_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
let(:job_artifact_deleted_event) { event_log.job_artifact_deleted_event }
let(:job_artifact) { job_artifact_deleted_event.job_artifact }
subject { described_class.new(job_artifact_deleted_event, Time.now, logger) }
around do |example|
Sidekiq::Testing.fake! { example.run }
end
describe '#process' do
context 'with a tracking database entry' do
before do
create(:geo_job_artifact_registry, artifact_id: job_artifact.id)
end
context 'with a file' do
context 'when the delete succeeds' do
it 'removes the tracking database entry' do
expect { subject.process }.to change(Geo::JobArtifactRegistry, :count).by(-1)
end
it 'deletes the file' do
expect { subject.process }.to change { File.exist?(job_artifact.file.path) }.from(true).to(false)
end
end
context 'when the delete fails' do
before do
expect(File).to receive(:delete).with(job_artifact.file.path).and_raise("Cannot delete")
end
it 'does not remove the tracking database entry' do
expect { subject.process }.not_to change(Geo::JobArtifactRegistry, :count)
end
end
end
context 'without a file' do
before do
FileUtils.rm(job_artifact.file.path)
end
it 'removes the tracking database entry' do
expect { subject.process }.to change(Geo::JobArtifactRegistry, :count).by(-1)
end
end
end
context 'without a tracking database entry' do
it 'does not create a tracking database entry' do
expect { subject.process }.not_to change(Geo::JobArtifactRegistry, :count)
end
it 'does not delete the file (yet, due to possible race condition)' do
expect { subject.process }.not_to change { File.exist?(job_artifact.file.path) }.from(true)
end
end
end
end
require 'spec_helper'
describe Gitlab::Geo::LogCursor::Events::LfsObjectDeletedEvent, :postgresql, :clean_gitlab_redis_shared_state do
let(:logger) { Gitlab::Geo::LogCursor::Logger.new(described_class, Logger::INFO) }
let(:event_log) { create(:geo_event_log, :lfs_object_deleted_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
let(:lfs_object_deleted_event) { event_log.lfs_object_deleted_event }
let(:lfs_object) { lfs_object_deleted_event.lfs_object }
subject { described_class.new(lfs_object_deleted_event, Time.now, logger) }
around do |example|
Sidekiq::Testing.fake! { example.run }
end
describe '#process' do
it 'does not create a tracking database entry' do
expect { subject.process }.not_to change(Geo::FileRegistry, :count)
end
it 'removes the tracking database entry if one exists' do
create(:geo_file_registry, :lfs, file_id: lfs_object.id)
expect { subject.process }.to change(Geo::FileRegistry.lfs_objects, :count).by(-1)
end
it 'schedules a Geo::FileRemovalWorker job' do
file_path = File.join(LfsObjectUploader.root, lfs_object_deleted_event.file_path)
expect(::Geo::FileRemovalWorker).to receive(:perform_async).with(file_path)
subject.process
end
end
end
require 'spec_helper'
describe Gitlab::Geo::LogCursor::Events::RepositoriesChangedEvent, :postgresql, :clean_gitlab_redis_shared_state do
include ::EE::GeoHelpers
let(:logger) { Gitlab::Geo::LogCursor::Logger.new(described_class, Logger::INFO) }
set(:secondary) { create(:geo_node) }
let(:repositories_changed_event) { create(:geo_repositories_changed_event, geo_node: secondary) }
let(:event_log) { create(:geo_event_log, repositories_changed_event: repositories_changed_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
subject { described_class.new(repositories_changed_event, Time.now, logger) }
around do |example|
Sidekiq::Testing.fake! { example.run }
end
before do
stub_current_geo_node(secondary)
end
describe '#process' do
it 'schedules a Geo::RepositoriesCleanUpWorker when the event node is the current node' do
expect(Geo::RepositoriesCleanUpWorker).to receive(:perform_in).with(within(5.minutes).of(1.hour), secondary.id)
subject.process
end
it 'does not schedule a Geo::RepositoriesCleanUpWorker when the event node is not the current node' do
stub_current_geo_node(build(:geo_node))
expect(Geo::RepositoriesCleanUpWorker).not_to receive(:perform_in)
subject.process
end
end
end
require 'spec_helper'
describe Gitlab::Geo::LogCursor::Events::RepositoryCreatedEvent, :postgresql, :clean_gitlab_redis_shared_state do
let(:logger) { Gitlab::Geo::LogCursor::Logger.new(described_class, Logger::INFO) }
let(:project) { create(:project) }
let(:repository_created_event) { create(:geo_repository_created_event, project: project) }
let(:event_log) { create(:geo_event_log, repository_created_event: repository_created_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
subject { described_class.new(repository_created_event, Time.now, logger) }
around do |example|
Sidekiq::Testing.fake! { example.run }
end
RSpec.shared_examples 'RepositoryCreatedEvent' do
it 'creates a new project registry' do
expect { subject.process }.to change(Geo::ProjectRegistry, :count).by(1)
end
it 'sets resync attributes to true' do
subject.process
registry = Geo::ProjectRegistry.last
expect(registry).to have_attributes(project_id: project.id, resync_repository: true, resync_wiki: true)
end
it 'sets resync_wiki to false if wiki_path is nil' do
repository_created_event.update!(wiki_path: nil)
subject.process
registry = Geo::ProjectRegistry.last
expect(registry).to have_attributes(project_id: project.id, resync_repository: true, resync_wiki: false)
end
end
describe '#process' do
before do
allow(Gitlab::Geo::ShardHealthCache).to receive(:healthy_shard?).with('default').and_return(healthy)
end
context 'when the associated shard is healthy' do
let(:healthy) { true }
it_behaves_like 'RepositoryCreatedEvent'
it 'schedules a Geo::ProjectSyncWorker' do
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(project.id, anything).once
subject.process
end
end
context 'when the associated shard is not healthy' do
let(:healthy) { false }
it_behaves_like 'RepositoryCreatedEvent'
it 'does not schedule a Geo::ProjectSyncWorker job' do
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(project.id, anything)
subject.process
end
end
end
end
require 'spec_helper'
describe Gitlab::Geo::LogCursor::Events::RepositoryDeletedEvent, :postgresql, :clean_gitlab_redis_shared_state do
let(:logger) { Gitlab::Geo::LogCursor::Logger.new(described_class, Logger::INFO) }
let(:event_log) { create(:geo_event_log, :deleted_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
let(:repository_deleted_event) { event_log.repository_deleted_event }
let(:project) { repository_deleted_event.project }
let(:deleted_project_name) { repository_deleted_event.deleted_project_name }
let(:deleted_path) { repository_deleted_event.deleted_path }
subject { described_class.new(repository_deleted_event, Time.now, logger) }
around do |example|
Sidekiq::Testing.fake! { example.run }
end
describe '#process' do
context 'when a tracking entry does not exist' do
it 'does not schedule a GeoRepositoryDestroyWorker' do
expect(::GeoRepositoryDestroyWorker).not_to receive(:perform_async)
.with(project.id, deleted_project_name, deleted_path, project.repository_storage)
subject.process
end
it 'does not create a tracking entry' do
expect { subject.process }.not_to change(Geo::ProjectRegistry, :count)
end
end
context 'when a tracking entry exists' do
let!(:tracking_entry) { create(:geo_project_registry, project: project) }
it 'removes the tracking entry' do
expect { subject.process }.to change(Geo::ProjectRegistry, :count).by(-1)
end
end
end
end
require 'spec_helper'
describe Gitlab::Geo::LogCursor::Events::RepositoryRenamedEvent, :postgresql, :clean_gitlab_redis_shared_state do
let(:logger) { Gitlab::Geo::LogCursor::Logger.new(described_class, Logger::INFO) }
let(:event_log) { create(:geo_event_log, :renamed_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
let(:repository_renamed_event) { event_log.repository_renamed_event }
let(:project) { repository_renamed_event.project }
let(:old_path_with_namespace) { repository_renamed_event.old_path_with_namespace }
let(:new_path_with_namespace) { repository_renamed_event.new_path_with_namespace }
subject { described_class.new(repository_renamed_event, Time.now, logger) }
around do |example|
Sidekiq::Testing.fake! { example.run }
end
describe '#process' do
context 'when a tracking entry does not exist' do
it 'does not create a tracking entry' do
expect { subject.process }.not_to change(Geo::ProjectRegistry, :count)
end
it 'does not schedule a Geo::RenameRepositoryWorker' do
expect(::Geo::RenameRepositoryWorker).not_to receive(:perform_async)
.with(project.id, old_path_with_namespace, new_path_with_namespace)
subject.process
end
end
it 'schedules a Geo::RenameRepositoryWorker' do
create(:geo_project_registry, project: project)
expect(::Geo::RenameRepositoryWorker).to receive(:perform_async)
.with(project.id, old_path_with_namespace, new_path_with_namespace)
subject.process
end
end
end
require 'spec_helper'
describe Gitlab::Geo::LogCursor::Events::RepositoryUpdatedEvent, :postgresql, :clean_gitlab_redis_shared_state do
include ::EE::GeoHelpers
let(:logger) { Gitlab::Geo::LogCursor::Logger.new(described_class, Logger::INFO) }
set(:secondary) { create(:geo_node) }
let(:project) { create(:project) }
let(:repository_updated_event) { create(:geo_repository_updated_event, project: project) }
let(:event_log) { create(:geo_event_log, repository_updated_event: repository_updated_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
subject { described_class.new(repository_updated_event, Time.now, logger) }
around do |example|
Sidekiq::Testing.fake! { example.run }
end
before do
stub_current_geo_node(secondary)
allow(Gitlab::Geo::ShardHealthCache).to receive(:healthy_shard?).with('broken').and_return(false)
end
RSpec.shared_examples 'RepositoryUpdatedEvent' do
it 'creates a new project registry if it does not exist' do
expect { subject.process }.to change(Geo::ProjectRegistry, :count).by(1)
end
context 'when we have an event source' do
before do
repository_updated_event.update!(source: event_source)
end
context 'when event source is a repository' do
let(:event_source) { Geo::RepositoryUpdatedEvent::REPOSITORY }
let!(:registry) { create(:geo_project_registry, :synced, :repository_verified, :repository_checksum_mismatch, project: repository_updated_event.project) }
it 'sets resync_repository to true' do
subject.process
reloaded_registry = registry.reload
expect(reloaded_registry.resync_repository).to be true
end
it 'resets the repository verification fields' do
subject.process
reloaded_registry = registry.reload
expect(reloaded_registry).to have_attributes(
repository_verification_checksum_sha: nil,
repository_checksum_mismatch: false,
last_repository_verification_failure: nil
)
end
end
context 'when the event source is a wiki' do
let(:event_source) { Geo::RepositoryUpdatedEvent::WIKI }
let!(:registry) { create(:geo_project_registry, :synced, :wiki_verified, :wiki_checksum_mismatch, project: repository_updated_event.project) }
it 'sets resync_wiki to true' do
subject.process
reloaded_registry = registry.reload
expect(reloaded_registry.resync_wiki).to be true
end
it 'resets the wiki repository verification fields' do
subject.process
reloaded_registry = registry.reload
expect(reloaded_registry.wiki_verification_checksum_sha).to be_nil
expect(reloaded_registry.wiki_checksum_mismatch).to be false
expect(reloaded_registry.last_wiki_verification_failure).to be_nil
end
end
end
end
describe '#process' do
let(:now) { Time.now }
before do
allow(Gitlab::Geo::ShardHealthCache).to receive(:healthy_shard?).with('default').and_return(healthy)
end
context 'when the associated shard is healthy' do
let(:healthy) { true }
it_behaves_like 'RepositoryUpdatedEvent'
it 'schedules a Geo::ProjectSyncWorker' do
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(project.id, now).once
Timecop.freeze(now) { subject.process }
end
end
context 'when associated shard is unhealthy' do
let(:healthy) { false }
it_behaves_like 'RepositoryUpdatedEvent'
it 'does not schedule a Geo::ProjectSyncWorker job' do
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(project.id, now)
Timecop.freeze(now) { subject.process }
end
end
end
end
require 'spec_helper'
describe Gitlab::Geo::LogCursor::Events::UploadDeletedEvent, :postgresql, :clean_gitlab_redis_shared_state do
let(:logger) { Gitlab::Geo::LogCursor::Logger.new(described_class, Logger::INFO) }
let(:project) { create(:project) }
let(:upload_deleted_event) { create(:geo_upload_deleted_event, project: project) }
let(:event_log) { create(:geo_event_log, upload_deleted_event: upload_deleted_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
subject { described_class.new(upload_deleted_event, Time.now, logger) }
around do |example|
Sidekiq::Testing.fake! { example.run }
end
describe '#process' do
context 'with default handling' do
let(:event_log) { create(:geo_event_log, :upload_deleted_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
let(:upload_deleted_event) { event_log.upload_deleted_event }
let(:upload) { upload_deleted_event.upload }
it 'does not create a tracking database entry' do
expect { subject.process }.not_to change(Geo::FileRegistry, :count)
end
it 'removes the tracking database entry if one exists' do
create(:geo_file_registry, :avatar, file_id: upload.id)
expect { subject.process }.to change(Geo::FileRegistry.attachments, :count).by(-1)
end
end
end
end
require 'spec_helper'
describe Gitlab::Geo::LogCursor::Events do
describe '.fetch_in_batches' do
let!(:event_log_1) { create(:geo_event_log) }
let!(:event_log_2) { create(:geo_event_log) }
context 'when no event_log_state exist' do
it 'does not yield a group of events' do
expect { |b| described_class.fetch_in_batches(&b) }.not_to yield_with_args([event_log_1, event_log_2])
end
end
context 'when there is already an event_log_state' do
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log_1.id - 1) }
it 'yields a group of events' do
expect { |b| described_class.fetch_in_batches(&b) }.to yield_with_args([event_log_1, event_log_2])
end
it 'saves last event as last processed after yielding' do
described_class.fetch_in_batches { |batch| batch }
expect(Geo::EventLogState.last.event_id).to eq(event_log_2.id)
end
end
end
describe '.save_processed' do
it 'creates a new event_log_state when no event_log_state exist' do
expect { described_class.save_processed(1) }.to change(Geo::EventLogState, :count).by(1)
expect(Geo::EventLogState.last.event_id).to eq(1)
end
it 'updates the event_id when there is already an event_log_state' do
create(:geo_event_log_state)
expect { described_class.save_processed(2) }.not_to change(Geo::EventLogState, :count)
expect(Geo::EventLogState.last.event_id).to eq(2)
end
end
describe '.last_processed' do
context 'when system has not generated any event yet' do
it 'returns -1' do
expect(described_class.last_processed).to eq(-1)
end
end
context 'when there are existing events already but no event_log_state' do
let!(:event_log) { create(:geo_event_log) }
it 'returns last event id' do
expect(described_class.last_processed).to eq(event_log.id)
end
it 'saves last event as the last processed' do
expect { described_class.last_processed }.to change(Geo::EventLogState, :count).by(1)
expect(Geo::EventLogState.last.event_id).to eq(event_log.id)
end
end
context 'when there is already an event_log_state' do
let!(:event_log_state) { create(:geo_event_log_state) }
it 'returns last event from event_log_state' do
expect(described_class.last_processed).to eq(event_log_state.id)
end
end
end
end