Commit cea74ea4 authored by George Koltsov's avatar George Koltsov

Update Bulk Import state more accurately

  - Update Bulk Import state to failed if all
    entities fail
  - Update Entity state to failed if all trackers
    fail excluding entity finisher, since this
    tracker only updates the status without doing
    any import

Changelog: added
parent 6c385b20
......@@ -15,7 +15,8 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker
@bulk_import = BulkImport.find_by_id(bulk_import_id)
return unless @bulk_import
return if @bulk_import.finished?
return if @bulk_import.finished? || @bulk_import.failed?
return @bulk_import.fail_op! if all_entities_failed?
return @bulk_import.finish! if all_entities_processed? && @bulk_import.started?
return re_enqueue if max_batch_size_exceeded? # Do not start more jobs if max allowed are already running
......@@ -55,6 +56,10 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker
entities.all? { |entity| entity.finished? || entity.failed? }
end
def all_entities_failed?
entities.all? { |entity| entity.failed? }
end
def max_batch_size_exceeded?
started_entities.count >= DEFAULT_BATCH_SIZE
end
......
......@@ -10,29 +10,39 @@ module BulkImports
def initialize(context)
@context = context
@entity = @context.entity
@trackers = @entity.trackers
end
def run
return if context.entity.finished?
return if entity.finished? || entity.failed?
context.entity.finish!
if all_other_trackers_failed?
entity.fail_op!
else
entity.finish!
end
logger.info(
bulk_import_id: context.bulk_import.id,
bulk_import_entity_id: context.entity.id,
bulk_import_entity_type: context.entity.source_type,
pipeline_class: self.class.name,
message: 'Entity finished'
message: "Entity #{entity.status_name}"
)
end
private
attr_reader :context
attr_reader :context, :entity, :trackers
def logger
@logger ||= Gitlab::Import::Logger.build
end
def all_other_trackers_failed?
trackers.where.not(relation: self.class.name).all? { |tracker| tracker.failed? } # rubocop: disable CodeReuse/ActiveRecord
end
end
end
end
......
......@@ -19,5 +19,11 @@ FactoryBot.define do
sequence(:jid) { |n| "bulk_import_entity_#{n}" }
end
trait :failed do
status { -1 }
sequence(:jid) { |n| "bulk_import_entity_#{n}" }
end
end
end
......@@ -25,8 +25,10 @@ RSpec.describe BulkImports::Groups::Pipelines::EntityFinisher do
.to change(entity, :status_name).to(:finished)
end
it 'does nothing when the entity is already finished' do
entity = create(:bulk_import_entity, :finished)
context 'when entity is in a final finished or failed state' do
shared_examples 'performs no state update' do |entity_state|
it 'does nothing' do
entity = create(:bulk_import_entity, entity_state)
pipeline_tracker = create(:bulk_import_tracker, entity: entity)
context = BulkImports::Pipeline::Context.new(pipeline_tracker)
subject = described_class.new(context)
......@@ -34,4 +36,22 @@ RSpec.describe BulkImports::Groups::Pipelines::EntityFinisher do
expect { subject.run }
.not_to change(entity, :status_name)
end
end
include_examples 'performs no state update', :finished
include_examples 'performs no state update', :failed
end
context 'when all entity trackers failed' do
it 'marks entity as failed' do
entity = create(:bulk_import_entity, :started)
create(:bulk_import_tracker, :failed, entity: entity)
pipeline_tracker = create(:bulk_import_tracker, entity: entity, relation: described_class)
context = BulkImports::Pipeline::Context.new(pipeline_tracker)
described_class.new(context).run
expect(entity.reload.failed?).to eq(true)
end
end
end
......@@ -22,6 +22,16 @@ RSpec.describe BulkImportWorker do
end
end
context 'when bulk import is failed' do
it 'does nothing' do
bulk_import = create(:bulk_import, :failed)
expect(described_class).not_to receive(:perform_in)
subject.perform(bulk_import.id)
end
end
context 'when all entities are processed' do
it 'marks bulk import as finished' do
bulk_import = create(:bulk_import, :started)
......@@ -34,6 +44,18 @@ RSpec.describe BulkImportWorker do
end
end
context 'when all entities are failed' do
it 'marks bulk import as failed' do
bulk_import = create(:bulk_import, :started)
create(:bulk_import_entity, :failed, bulk_import: bulk_import)
create(:bulk_import_entity, :failed, bulk_import: bulk_import)
subject.perform(bulk_import.id)
expect(bulk_import.reload.failed?).to eq(true)
end
end
context 'when maximum allowed number of import entities in progress' do
it 'reenqueues itself' do
bulk_import = create(:bulk_import, :started)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment