Commit 87006d9c authored by Allen Cook's avatar Allen Cook Committed by Stan Hu

Resolve "Handle Duplication of Resources on Migration"

parent 9618f99c
......@@ -29,7 +29,7 @@ class BulkImports::Tracker < ApplicationRecord
def self.stage_running?(entity_id, stage)
where(stage: stage, bulk_import_entity_id: entity_id)
.with_status(:created, :started)
.with_status(:created, :enqueued, :started)
.exists?
end
......@@ -45,15 +45,24 @@ class BulkImports::Tracker < ApplicationRecord
state :created, value: 0
state :started, value: 1
state :finished, value: 2
state :enqueued, value: 3
state :failed, value: -1
state :skipped, value: -2
event :start do
transition created: :started
transition enqueued: :started
# To avoid errors when re-starting a pipeline in case of network errors
transition started: :started
end
event :retry do
transition started: :enqueued
end
event :enqueue do
transition created: :enqueued
end
event :finish do
transition started: :finished
transition failed: :failed
......
......@@ -1931,7 +1931,7 @@
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent:
:idempotent: true
:tags: []
- :name: bulk_imports_export_request
:worker_name: BulkImports::ExportRequestWorker
......
......@@ -12,6 +12,9 @@ module BulkImports
worker_has_external_dependencies!
idempotent!
deduplicate :until_executed, including_scheduled: true
def perform(entity_id, current_stage = nil)
return if stage_running?(entity_id, current_stage)
......@@ -48,7 +51,7 @@ module BulkImports
end
def next_pipeline_trackers_for(entity_id)
BulkImports::Tracker.next_pipeline_trackers_for(entity_id)
BulkImports::Tracker.next_pipeline_trackers_for(entity_id).update(status_event: 'enqueue')
end
def logger
......
......@@ -16,7 +16,7 @@ module BulkImports
def perform(pipeline_tracker_id, stage, entity_id)
pipeline_tracker = ::BulkImports::Tracker
.with_status(:created, :started)
.with_status(:enqueued)
.find_by_id(pipeline_tracker_id)
if pipeline_tracker.present?
......@@ -68,6 +68,8 @@ module BulkImports
message: "Retrying error: #{e.message}"
)
pipeline_tracker.update!(status_event: 'retry', jid: jid)
reenqueue(pipeline_tracker, delay: e.retry_delay)
else
fail_tracker(pipeline_tracker, e)
......
......@@ -14,96 +14,118 @@ RSpec.describe BulkImports::EntityWorker do
)
end
it 'enqueues the first stage pipelines work' do
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
expect(logger)
.to receive(:info)
.with(
worker: described_class.name,
entity_id: entity.id,
current_stage: nil
)
end
let(:job_args) { entity.id }
expect(BulkImports::PipelineWorker)
.to receive(:perform_async)
.with(
pipeline_tracker.id,
pipeline_tracker.stage,
entity.id
)
it 'updates pipeline trackers to enqueued state when selected' do
worker = BulkImports::EntityWorker.new
subject.perform(entity.id)
end
next_tracker = worker.send(:next_pipeline_trackers_for, entity.id).first
it 'do not enqueue a new pipeline job if the current stage still running' do
expect(BulkImports::PipelineWorker)
.not_to receive(:perform_async)
next_tracker.reload
subject.perform(entity.id, 0)
end
it 'enqueues the next stage pipelines when the current stage is finished' do
next_stage_pipeline_tracker = create(
:bulk_import_tracker,
entity: entity,
pipeline_name: 'Stage1::Pipeline',
stage: 1
)
expect(next_tracker.enqueued?).to be_truthy
pipeline_tracker.fail_op!
expect(worker.send(:next_pipeline_trackers_for, entity.id))
.not_to include(next_tracker)
end
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
expect(logger)
.to receive(:info)
include_examples 'an idempotent worker' do
it 'enqueues the first stage pipelines work' do
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
# the worker runs twice but only executes once
expect(logger)
.to receive(:info).twice
.with(
worker: described_class.name,
entity_id: entity.id,
current_stage: nil
)
end
expect(BulkImports::PipelineWorker)
.to receive(:perform_async)
.with(
worker: described_class.name,
entity_id: entity.id,
current_stage: 0
pipeline_tracker.id,
pipeline_tracker.stage,
entity.id
)
subject
end
expect(BulkImports::PipelineWorker)
.to receive(:perform_async)
.with(
next_stage_pipeline_tracker.id,
next_stage_pipeline_tracker.stage,
entity.id
)
it 'logs and tracks the raised exceptions' do
exception = StandardError.new('Error!')
expect(BulkImports::PipelineWorker)
.to receive(:perform_async)
.and_raise(exception)
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
expect(logger)
.to receive(:info).twice
.with(
worker: described_class.name,
entity_id: entity.id,
current_stage: nil
)
expect(logger)
.to receive(:error)
.with(
worker: described_class.name,
entity_id: entity.id,
current_stage: nil,
error_message: 'Error!'
)
end
expect(Gitlab::ErrorTracking)
.to receive(:track_exception)
.with(exception, entity_id: entity.id)
subject
end
subject.perform(entity.id, 0)
end
context 'in first stage' do
let(:job_args) { [entity.id, 0] }
it 'logs and tracks the raised exceptions' do
exception = StandardError.new('Error!')
it 'do not enqueue a new pipeline job if the current stage still running' do
expect(BulkImports::PipelineWorker)
.not_to receive(:perform_async)
expect(BulkImports::PipelineWorker)
.to receive(:perform_async)
.and_raise(exception)
subject
end
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
expect(logger)
.to receive(:info)
.with(
worker: described_class.name,
entity_id: entity.id,
current_stage: nil
it 'enqueues the next stage pipelines when the current stage is finished' do
next_stage_pipeline_tracker = create(
:bulk_import_tracker,
entity: entity,
pipeline_name: 'Stage1::Pipeline',
stage: 1
)
expect(logger)
.to receive(:error)
.with(
worker: described_class.name,
entity_id: entity.id,
current_stage: nil,
error_message: 'Error!'
)
pipeline_tracker.fail_op!
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
expect(logger)
.to receive(:info).twice
.with(
worker: described_class.name,
entity_id: entity.id,
current_stage: 0
)
end
expect(BulkImports::PipelineWorker)
.to receive(:perform_async)
.with(
next_stage_pipeline_tracker.id,
next_stage_pipeline_tracker.stage,
entity.id
)
subject
end
end
expect(Gitlab::ErrorTracking)
.to receive(:track_exception)
.with(exception, entity_id: entity.id)
subject.perform(entity.id)
end
end
......@@ -60,18 +60,8 @@ RSpec.describe BulkImports::PipelineWorker do
create(
:bulk_import_tracker,
entity: entity,
pipeline_name: 'FakePipeline'
)
end
end
it_behaves_like 'successfully runs the pipeline' do
let(:pipeline_tracker) do
create(
:bulk_import_tracker,
:started,
entity: entity,
pipeline_name: 'FakePipeline'
pipeline_name: 'FakePipeline',
status_event: 'enqueue'
)
end
end
......@@ -109,7 +99,8 @@ RSpec.describe BulkImports::PipelineWorker do
pipeline_tracker = create(
:bulk_import_tracker,
entity: entity,
pipeline_name: 'InexistentPipeline'
pipeline_name: 'InexistentPipeline',
status_event: 'enqueue'
)
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
......@@ -150,7 +141,8 @@ RSpec.describe BulkImports::PipelineWorker do
pipeline_tracker = create(
:bulk_import_tracker,
entity: entity,
pipeline_name: 'FakePipeline'
pipeline_name: 'FakePipeline',
status_event: 'enqueue'
)
exception = BulkImports::NetworkError.new(
......@@ -163,7 +155,21 @@ RSpec.describe BulkImports::PipelineWorker do
.and_raise(exception)
end
expect(subject).to receive(:jid).and_return('jid')
expect(subject).to receive(:jid).and_return('jid').twice
expect_any_instance_of(BulkImports::Tracker) do |tracker|
expect(tracker).to receive(:retry).and_call_original
end
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
expect(logger)
.to receive(:info)
.with(
worker: described_class.name,
pipeline_name: 'FakePipeline',
entity_id: entity.id
)
end
expect(described_class)
.to receive(:perform_in)
......@@ -175,6 +181,10 @@ RSpec.describe BulkImports::PipelineWorker do
)
subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
pipeline_tracker.reload
expect(pipeline_tracker.enqueued?).to be_truthy
end
end
end
......@@ -200,7 +210,8 @@ RSpec.describe BulkImports::PipelineWorker do
create(
:bulk_import_tracker,
entity: entity,
pipeline_name: 'NdjsonPipeline'
pipeline_name: 'NdjsonPipeline',
status_event: 'enqueue'
)
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment