Commit 5defc140 authored by Arturo Herrero's avatar Arturo Herrero

Merge branch 'kassio/bulkimports-track-pipeline-work' into 'master'

[RUN AS-IF-FOSS] BulkImports: Track pipeline worker with BulkImports::Tracker#status

See merge request gitlab-org/gitlab!56242
parents 62d45567 fe9bc7b1
...@@ -68,25 +68,6 @@ class BulkImports::Entity < ApplicationRecord ...@@ -68,25 +68,6 @@ class BulkImports::Entity < ApplicationRecord
end end
end end
def update_tracker_for(relation:, has_next_page:, next_page: nil)
attributes = {
relation: relation,
has_next_page: has_next_page,
next_page: next_page,
bulk_import_entity_id: id
}
trackers.upsert(attributes, unique_by: %i[bulk_import_entity_id relation])
end
def has_next_page?(relation)
trackers.find_by(relation: relation)&.has_next_page
end
def next_page_for(relation)
trackers.find_by(relation: relation)&.next_page
end
private private
def validate_parent_is_a_group def validate_parent_is_a_group
......
...@@ -3,6 +3,8 @@ ...@@ -3,6 +3,8 @@
class BulkImports::Tracker < ApplicationRecord class BulkImports::Tracker < ApplicationRecord
self.table_name = 'bulk_import_trackers' self.table_name = 'bulk_import_trackers'
alias_attribute :pipeline_name, :relation
belongs_to :entity, belongs_to :entity,
class_name: 'BulkImports::Entity', class_name: 'BulkImports::Entity',
foreign_key: :bulk_import_entity_id, foreign_key: :bulk_import_entity_id,
...@@ -28,6 +30,10 @@ class BulkImports::Tracker < ApplicationRecord ...@@ -28,6 +30,10 @@ class BulkImports::Tracker < ApplicationRecord
end end
event :finish do event :finish do
# When applying the concurrent model,
# remove the created => finished transaction
# https://gitlab.com/gitlab-org/gitlab/-/issues/323384
transition created: :finished
transition started: :finished transition started: :finished
transition failed: :failed transition failed: :failed
transition skipped: :skipped transition skipped: :skipped
......
---
title: 'BulkImports: Track pipeline worker with BulkImports::Tracker#status'
merge_request: 56242
author:
type: changed
...@@ -32,11 +32,10 @@ module EE ...@@ -32,11 +32,10 @@ module EE
def variables(context) def variables(context)
iid = context.extra[:epic_iid] iid = context.extra[:epic_iid]
tracker = "epic_#{iid}_award_emoji"
{ {
full_path: context.entity.source_full_path, full_path: context.entity.source_full_path,
cursor: context.entity.next_page_for(tracker), cursor: context.tracker.next_page,
epic_iid: iid epic_iid: iid
} }
end end
......
...@@ -34,11 +34,10 @@ module EE ...@@ -34,11 +34,10 @@ module EE
def variables(context) def variables(context)
iid = context.extra[:epic_iid] iid = context.extra[:epic_iid]
tracker = "epic_#{iid}_events"
{ {
full_path: context.entity.source_full_path, full_path: context.entity.source_full_path,
cursor: context.entity.next_page_for(tracker), cursor: context.tracker.next_page,
epic_iid: iid epic_iid: iid
} }
end end
......
...@@ -61,7 +61,7 @@ module EE ...@@ -61,7 +61,7 @@ module EE
def variables(context) def variables(context)
{ {
full_path: context.entity.source_full_path, full_path: context.entity.source_full_path,
cursor: context.entity.next_page_for(:epics) cursor: context.tracker.next_page
} }
end end
......
...@@ -35,7 +35,7 @@ module EE ...@@ -35,7 +35,7 @@ module EE
def variables(context) def variables(context)
{ {
full_path: context.entity.source_full_path, full_path: context.entity.source_full_path,
cursor: context.entity.next_page_for(:iterations) cursor: context.tracker.next_page
} }
end end
......
...@@ -25,11 +25,7 @@ module EE ...@@ -25,11 +25,7 @@ module EE
end end
def after_run(extracted_data) def after_run(extracted_data)
iid = context.extra[:epic_iid] tracker.update(
tracker = "epic_#{iid}_award_emoji"
context.entity.update_tracker_for(
relation: tracker,
has_next_page: extracted_data.has_next_page?, has_next_page: extracted_data.has_next_page?,
next_page: extracted_data.next_page next_page: extracted_data.next_page
) )
......
...@@ -48,11 +48,7 @@ module EE ...@@ -48,11 +48,7 @@ module EE
end end
def after_run(extracted_data) def after_run(extracted_data)
iid = context.extra[:epic_iid] tracker.update(
tracker = "epic_#{iid}_events"
context.entity.update_tracker_for(
relation: tracker,
has_next_page: extracted_data.has_next_page?, has_next_page: extracted_data.has_next_page?,
next_page: extracted_data.next_page next_page: extracted_data.next_page
) )
......
...@@ -24,8 +24,7 @@ module EE ...@@ -24,8 +24,7 @@ module EE
end end
def after_run(extracted_data) def after_run(extracted_data)
context.entity.update_tracker_for( tracker.update(
relation: :epics,
has_next_page: extracted_data.has_next_page?, has_next_page: extracted_data.has_next_page?,
next_page: extracted_data.next_page next_page: extracted_data.next_page
) )
......
...@@ -21,8 +21,7 @@ module EE ...@@ -21,8 +21,7 @@ module EE
end end
def after_run(extracted_data) def after_run(extracted_data)
context.entity.update_tracker_for( tracker.update(
relation: :iterations,
has_next_page: extracted_data.has_next_page?, has_next_page: extracted_data.has_next_page?,
next_page: extracted_data.next_page next_page: extracted_data.next_page
) )
......
...@@ -4,7 +4,7 @@ require 'spec_helper' ...@@ -4,7 +4,7 @@ require 'spec_helper'
RSpec.describe EE::BulkImports::Groups::Graphql::GetEpicAwardEmojiQuery do RSpec.describe EE::BulkImports::Groups::Graphql::GetEpicAwardEmojiQuery do
it 'has a valid query' do it 'has a valid query' do
context = BulkImports::Pipeline::Context.new(create(:bulk_import_entity), epic_iid: 1) context = BulkImports::Pipeline::Context.new(create(:bulk_import_tracker), epic_iid: 1)
result = GitlabSchema.execute( result = GitlabSchema.execute(
described_class.to_s, described_class.to_s,
......
...@@ -4,7 +4,7 @@ require 'spec_helper' ...@@ -4,7 +4,7 @@ require 'spec_helper'
RSpec.describe EE::BulkImports::Groups::Graphql::GetEpicEventsQuery do RSpec.describe EE::BulkImports::Groups::Graphql::GetEpicEventsQuery do
it 'has a valid query' do it 'has a valid query' do
context = BulkImports::Pipeline::Context.new(create(:bulk_import_entity), epic_iid: 1) context = BulkImports::Pipeline::Context.new(create(:bulk_import_tracker), epic_iid: 1)
result = GitlabSchema.execute( result = GitlabSchema.execute(
described_class.to_s, described_class.to_s,
......
...@@ -4,7 +4,7 @@ require 'spec_helper' ...@@ -4,7 +4,7 @@ require 'spec_helper'
RSpec.describe EE::BulkImports::Groups::Graphql::GetEpicsQuery do RSpec.describe EE::BulkImports::Groups::Graphql::GetEpicsQuery do
it 'has a valid query' do it 'has a valid query' do
context = BulkImports::Pipeline::Context.new(create(:bulk_import_entity)) context = BulkImports::Pipeline::Context.new(create(:bulk_import_tracker))
result = GitlabSchema.execute( result = GitlabSchema.execute(
described_class.to_s, described_class.to_s,
......
...@@ -4,8 +4,8 @@ require 'spec_helper' ...@@ -4,8 +4,8 @@ require 'spec_helper'
RSpec.describe EE::BulkImports::Groups::Graphql::GetIterationsQuery do RSpec.describe EE::BulkImports::Groups::Graphql::GetIterationsQuery do
it 'has a valid query' do it 'has a valid query' do
entity = create(:bulk_import_entity) tracker = create(:bulk_import_tracker)
context = BulkImports::Pipeline::Context.new(entity) context = BulkImports::Pipeline::Context.new(tracker)
query = GraphQL::Query.new( query = GraphQL::Query.new(
GitlabSchema, GitlabSchema,
......
...@@ -9,7 +9,8 @@ RSpec.describe EE::BulkImports::Groups::Loaders::EpicAwardEmojiLoader do ...@@ -9,7 +9,8 @@ RSpec.describe EE::BulkImports::Groups::Loaders::EpicAwardEmojiLoader do
let_it_be(:epic) { create(:epic, group: group, iid: 1) } let_it_be(:epic) { create(:epic, group: group, iid: 1) }
let_it_be(:bulk_import) { create(:bulk_import, user: user) } let_it_be(:bulk_import) { create(:bulk_import, user: user) }
let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import, group: group) } let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import, group: group) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(entity) } let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
let_it_be(:data) do let_it_be(:data) do
{ {
......
...@@ -7,8 +7,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline do ...@@ -7,8 +7,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline do
let_it_be(:user) { create(:user) } let_it_be(:user) { create(:user) }
let_it_be(:group) { create(:group) } let_it_be(:group) { create(:group) }
let_it_be(:epic) { create(:epic, group: group) } let_it_be(:epic) { create(:epic, group: group) }
let_it_be(:tracker) { "epic_#{epic.iid}_award_emoji" }
let_it_be(:bulk_import) { create(:bulk_import, user: user) } let_it_be(:bulk_import) { create(:bulk_import, user: user) }
let_it_be(:entity) do let_it_be(:entity) do
create( create(
:bulk_import_entity, :bulk_import_entity,
...@@ -20,7 +20,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline do ...@@ -20,7 +20,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline do
) )
end end
let_it_be(:context) { BulkImports::Pipeline::Context.new(entity) } let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
before do before do
stub_licensed_features(epics: true) stub_licensed_features(epics: true)
...@@ -39,7 +40,7 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline do ...@@ -39,7 +40,7 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline do
describe '#run' do describe '#run' do
it 'imports epic award emoji' do it 'imports epic award emoji' do
data = extractor_data(has_next_page: false, cursor: cursor) data = extractor_data(has_next_page: false)
allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor| allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
allow(extractor) allow(extractor)
...@@ -61,10 +62,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline do ...@@ -61,10 +62,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline do
subject.after_run(data) subject.after_run(data)
page_tracker = entity.trackers.find_by(relation: tracker) expect(tracker.has_next_page).to eq(true)
expect(tracker.next_page).to eq(cursor)
expect(page_tracker.has_next_page).to eq(true)
expect(page_tracker.next_page).to eq(cursor)
end end
end end
...@@ -76,10 +75,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline do ...@@ -76,10 +75,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline do
subject.after_run(data) subject.after_run(data)
page_tracker = entity.trackers.find_by(relation: tracker) expect(tracker.has_next_page).to eq(false)
expect(tracker.next_page).to be_nil
expect(page_tracker.has_next_page).to eq(false)
expect(page_tracker.next_page).to be_nil
end end
it 'updates context with next epic iid' do it 'updates context with next epic iid' do
......
...@@ -7,8 +7,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicEventsPipeline do ...@@ -7,8 +7,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicEventsPipeline do
let_it_be(:user) { create(:user) } let_it_be(:user) { create(:user) }
let_it_be(:group) { create(:group) } let_it_be(:group) { create(:group) }
let_it_be(:epic) { create(:epic, group: group) } let_it_be(:epic) { create(:epic, group: group) }
let_it_be(:tracker) { "epic_#{epic.iid}_events" }
let_it_be(:bulk_import) { create(:bulk_import, user: user) } let_it_be(:bulk_import) { create(:bulk_import, user: user) }
let_it_be(:entity) do let_it_be(:entity) do
create( create(
:bulk_import_entity, :bulk_import_entity,
...@@ -20,7 +20,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicEventsPipeline do ...@@ -20,7 +20,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicEventsPipeline do
) )
end end
let_it_be(:context) { BulkImports::Pipeline::Context.new(entity) } let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
before do before do
stub_licensed_features(epics: true) stub_licensed_features(epics: true)
...@@ -39,7 +40,7 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicEventsPipeline do ...@@ -39,7 +40,7 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicEventsPipeline do
describe '#run' do describe '#run' do
it 'imports epic events and resource state events' do it 'imports epic events and resource state events' do
data = extractor_data(has_next_page: false, cursor: cursor) data = extractor_data(has_next_page: false)
allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor| allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
allow(extractor) allow(extractor)
...@@ -102,10 +103,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicEventsPipeline do ...@@ -102,10 +103,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicEventsPipeline do
subject.after_run(data) subject.after_run(data)
page_tracker = entity.trackers.find_by(relation: tracker) expect(tracker.has_next_page).to eq(true)
expect(tracker.next_page).to eq(cursor)
expect(page_tracker.has_next_page).to eq(true)
expect(page_tracker.next_page).to eq(cursor)
end end
end end
...@@ -117,10 +116,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicEventsPipeline do ...@@ -117,10 +116,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicEventsPipeline do
subject.after_run(data) subject.after_run(data)
page_tracker = entity.trackers.find_by(relation: tracker) expect(tracker.has_next_page).to eq(false)
expect(tracker.next_page).to be_nil
expect(page_tracker.has_next_page).to eq(false)
expect(page_tracker.next_page).to be_nil
end end
it 'updates context with next epic iid' do it 'updates context with next epic iid' do
......
...@@ -6,8 +6,9 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline, :clean_gitlab_ ...@@ -6,8 +6,9 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline, :clean_gitlab_
let_it_be(:cursor) { 'cursor' } let_it_be(:cursor) { 'cursor' }
let_it_be(:user) { create(:user) } let_it_be(:user) { create(:user) }
let_it_be(:group) { create(:group) } let_it_be(:group) { create(:group) }
let(:bulk_import) { create(:bulk_import, user: user) } let_it_be(:bulk_import) { create(:bulk_import, user: user) }
let(:entity) do
let_it_be(:entity) do
create( create(
:bulk_import_entity, :bulk_import_entity,
group: group, group: group,
...@@ -18,7 +19,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline, :clean_gitlab_ ...@@ -18,7 +19,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline, :clean_gitlab_
) )
end end
let(:context) { BulkImports::Pipeline::Context.new(entity) } let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
before do before do
stub_licensed_features(epics: true) stub_licensed_features(epics: true)
...@@ -116,8 +118,6 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline, :clean_gitlab_ ...@@ -116,8 +118,6 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline, :clean_gitlab_
subject.after_run(data) subject.after_run(data)
tracker = entity.trackers.find_by(relation: :epics)
expect(tracker.has_next_page).to eq(true) expect(tracker.has_next_page).to eq(true)
expect(tracker.next_page).to eq(cursor) expect(tracker.next_page).to eq(cursor)
end end
...@@ -131,8 +131,6 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline, :clean_gitlab_ ...@@ -131,8 +131,6 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline, :clean_gitlab_
subject.after_run(data) subject.after_run(data)
tracker = entity.trackers.find_by(relation: :epics)
expect(tracker.has_next_page).to eq(false) expect(tracker.has_next_page).to eq(false)
expect(tracker.next_page).to be_nil expect(tracker.next_page).to be_nil
end end
......
...@@ -9,7 +9,7 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::IterationsPipeline do ...@@ -9,7 +9,7 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::IterationsPipeline do
let_it_be(:timestamp) { Time.new(2020, 01, 01).utc } let_it_be(:timestamp) { Time.new(2020, 01, 01).utc }
let_it_be(:bulk_import) { create(:bulk_import, user: user) } let_it_be(:bulk_import) { create(:bulk_import, user: user) }
let(:entity) do let_it_be(:entity) do
create( create(
:bulk_import_entity, :bulk_import_entity,
bulk_import: bulk_import, bulk_import: bulk_import,
...@@ -20,7 +20,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::IterationsPipeline do ...@@ -20,7 +20,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::IterationsPipeline do
) )
end end
let(:context) { BulkImports::Pipeline::Context.new(entity) } let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
subject { described_class.new(context) } subject { described_class.new(context) }
...@@ -85,8 +86,6 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::IterationsPipeline do ...@@ -85,8 +86,6 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::IterationsPipeline do
subject.after_run(data) subject.after_run(data)
tracker = entity.trackers.find_by(relation: :iterations)
expect(tracker.has_next_page).to eq(true) expect(tracker.has_next_page).to eq(true)
expect(tracker.next_page).to eq(cursor) expect(tracker.next_page).to eq(cursor)
end end
...@@ -100,8 +99,6 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::IterationsPipeline do ...@@ -100,8 +99,6 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::IterationsPipeline do
subject.after_run(data) subject.after_run(data)
tracker = entity.trackers.find_by(relation: :iterations)
expect(tracker.has_next_page).to eq(false) expect(tracker.has_next_page).to eq(false)
expect(tracker.next_page).to be_nil expect(tracker.next_page).to be_nil
end end
......
...@@ -7,7 +7,8 @@ RSpec.describe EE::BulkImports::Groups::Transformers::EpicAttributesTransformer ...@@ -7,7 +7,8 @@ RSpec.describe EE::BulkImports::Groups::Transformers::EpicAttributesTransformer
let_it_be(:group) { create(:group) } let_it_be(:group) { create(:group) }
let_it_be(:bulk_import) { create(:bulk_import, user: importer_user) } let_it_be(:bulk_import) { create(:bulk_import, user: importer_user) }
let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import, group: group) } let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import, group: group) }
let_it_be(:context) { ::BulkImports::Pipeline::Context.new(entity) } let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
describe '#transform' do describe '#transform' do
it 'transforms the epic attributes' do it 'transforms the epic attributes' do
......
...@@ -3,14 +3,14 @@ ...@@ -3,14 +3,14 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe BulkImports::Importers::GroupImporter do RSpec.describe BulkImports::Importers::GroupImporter do
let(:user) { create(:user) } let_it_be(:user) { create(:user) }
let(:group) { create(:group) } let_it_be(:group) { create(:group) }
let(:bulk_import) { create(:bulk_import, user: user) } let_it_be(:bulk_import) { create(:bulk_import, user: user) }
let(:bulk_import_entity) { create(:bulk_import_entity, :started, bulk_import: bulk_import, group: group) } let_it_be(:entity) { create(:bulk_import_entity, :started, bulk_import: bulk_import, group: group) }
let(:bulk_import_configuration) { create(:bulk_import_configuration, bulk_import: bulk_import) } let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let(:context) { BulkImports::Pipeline::Context.new(bulk_import_entity) } let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
subject { described_class.new(bulk_import_entity) } subject { described_class.new(entity) }
before do before do
allow(BulkImports::Pipeline::Context).to receive(:new).and_return(context) allow(BulkImports::Pipeline::Context).to receive(:new).and_return(context)
...@@ -25,10 +25,12 @@ RSpec.describe BulkImports::Importers::GroupImporter do ...@@ -25,10 +25,12 @@ RSpec.describe BulkImports::Importers::GroupImporter do
expect_to_run_pipeline BulkImports::Groups::Pipelines::MilestonesPipeline, context: context expect_to_run_pipeline BulkImports::Groups::Pipelines::MilestonesPipeline, context: context
expect_to_run_pipeline EE::BulkImports::Groups::Pipelines::EpicsPipeline, context: context expect_to_run_pipeline EE::BulkImports::Groups::Pipelines::EpicsPipeline, context: context
expect_to_run_pipeline EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline, context: context expect_to_run_pipeline EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline, context: context
expect_to_run_pipeline EE::BulkImports::Groups::Pipelines::EpicEventsPipeline, context: context
expect_to_run_pipeline EE::BulkImports::Groups::Pipelines::IterationsPipeline, context: context
subject.execute subject.execute
expect(bulk_import_entity.reload).to be_finished expect(entity.reload).to be_finished
end end
end end
......
...@@ -31,7 +31,7 @@ module BulkImports ...@@ -31,7 +31,7 @@ module BulkImports
def variables(context) def variables(context)
{ {
full_path: context.entity.source_full_path, full_path: context.entity.source_full_path,
cursor: context.entity.next_page_for(:labels) cursor: context.tracker.next_page
} }
end end
......
...@@ -34,7 +34,7 @@ module BulkImports ...@@ -34,7 +34,7 @@ module BulkImports
def variables(context) def variables(context)
{ {
full_path: context.entity.source_full_path, full_path: context.entity.source_full_path,
cursor: context.entity.next_page_for(:group_members) cursor: context.tracker.next_page
} }
end end
......
...@@ -33,7 +33,7 @@ module BulkImports ...@@ -33,7 +33,7 @@ module BulkImports
def variables(context) def variables(context)
{ {
full_path: context.entity.source_full_path, full_path: context.entity.source_full_path,
cursor: context.entity.next_page_for(:milestones) cursor: context.tracker.next_page
} }
end end
......
...@@ -16,8 +16,7 @@ module BulkImports ...@@ -16,8 +16,7 @@ module BulkImports
end end
def after_run(extracted_data) def after_run(extracted_data)
context.entity.update_tracker_for( tracker.update(
relation: :labels,
has_next_page: extracted_data.has_next_page?, has_next_page: extracted_data.has_next_page?,
next_page: extracted_data.next_page next_page: extracted_data.next_page
) )
......
...@@ -19,8 +19,7 @@ module BulkImports ...@@ -19,8 +19,7 @@ module BulkImports
end end
def after_run(extracted_data) def after_run(extracted_data)
context.entity.update_tracker_for( tracker.update(
relation: :group_members,
has_next_page: extracted_data.has_next_page?, has_next_page: extracted_data.has_next_page?,
next_page: extracted_data.next_page next_page: extracted_data.next_page
) )
......
...@@ -20,8 +20,7 @@ module BulkImports ...@@ -20,8 +20,7 @@ module BulkImports
end end
def after_run(extracted_data) def after_run(extracted_data)
context.entity.update_tracker_for( tracker.update(
relation: :milestones,
has_next_page: extracted_data.has_next_page?, has_next_page: extracted_data.has_next_page?,
next_page: extracted_data.next_page next_page: extracted_data.next_page
) )
......
...@@ -8,9 +8,18 @@ module BulkImports ...@@ -8,9 +8,18 @@ module BulkImports
end end
def execute def execute
context = BulkImports::Pipeline::Context.new(entity) pipelines.each.with_index do |pipeline, stage|
pipeline_tracker = entity.trackers.create!(
pipeline_name: pipeline,
stage: stage
)
pipelines.each { |pipeline| pipeline.new(context).run } context = BulkImports::Pipeline::Context.new(pipeline_tracker)
pipeline.new(context).run
pipeline_tracker.finish!
end
entity.finish! entity.finish!
end end
......
...@@ -15,6 +15,10 @@ module BulkImports ...@@ -15,6 +15,10 @@ module BulkImports
@context = context @context = context
end end
def tracker
@tracker ||= context.tracker
end
included do included do
private private
......
...@@ -3,25 +3,33 @@ ...@@ -3,25 +3,33 @@
module BulkImports module BulkImports
module Pipeline module Pipeline
class Context class Context
attr_reader :entity, :bulk_import
attr_accessor :extra attr_accessor :extra
def initialize(entity, extra = {}) attr_reader :tracker
@entity = entity
@bulk_import = entity.bulk_import def initialize(tracker, extra = {})
@tracker = tracker
@extra = extra @extra = extra
end end
def entity
@entity ||= tracker.entity
end
def group def group
entity.group @group ||= entity.group
end
def bulk_import
@bulk_import ||= entity.bulk_import
end end
def current_user def current_user
bulk_import.user @current_user ||= bulk_import.user
end end
def configuration def configuration
bulk_import.configuration @configuration ||= bulk_import.configuration
end end
end end
end end
......
...@@ -11,11 +11,14 @@ module BulkImports ...@@ -11,11 +11,14 @@ module BulkImports
end end
def has_next_page? def has_next_page?
@page_info['has_next_page'] Gitlab::Utils.to_boolean(
@page_info&.dig('has_next_page'),
default: false
)
end end
def next_page def next_page
@page_info['end_cursor'] @page_info&.dig('end_cursor')
end end
def each(&block) def each(&block)
......
...@@ -26,7 +26,7 @@ module BulkImports ...@@ -26,7 +26,7 @@ module BulkImports
end end
end end
if respond_to?(:after_run) if extracted_data && respond_to?(:after_run)
run_pipeline_step(:after_run) do run_pipeline_step(:after_run) do
after_run(extracted_data) after_run(extracted_data)
end end
...@@ -34,7 +34,7 @@ module BulkImports ...@@ -34,7 +34,7 @@ module BulkImports
info(message: 'Pipeline finished') info(message: 'Pipeline finished')
rescue MarkedAsFailedError rescue MarkedAsFailedError
log_skip skip!('Skipping pipeline due to failed entity')
end end
private # rubocop:disable Lint/UselessAccessModifier private # rubocop:disable Lint/UselessAccessModifier
...@@ -46,7 +46,11 @@ module BulkImports ...@@ -46,7 +46,11 @@ module BulkImports
yield yield
rescue MarkedAsFailedError rescue MarkedAsFailedError
log_skip(step => class_name) skip!(
'Skipping pipeline due to failed entity',
pipeline_step: step,
step_class: class_name
)
rescue => e rescue => e
log_import_failure(e, step) log_import_failure(e, step)
...@@ -65,10 +69,13 @@ module BulkImports ...@@ -65,10 +69,13 @@ module BulkImports
warn(message: 'Pipeline failed') warn(message: 'Pipeline failed')
context.entity.fail_op! context.entity.fail_op!
tracker.fail_op!
end end
def log_skip(extra = {}) def skip!(message, extra = {})
info({ message: 'Skipping due to failed pipeline status' }.merge(extra)) warn({ message: message }.merge(extra))
tracker.skip!
end end
def log_import_failure(exception, step) def log_import_failure(exception, step)
......
...@@ -8,7 +8,8 @@ RSpec.describe BulkImports::Common::Transformers::UserReferenceTransformer do ...@@ -8,7 +8,8 @@ RSpec.describe BulkImports::Common::Transformers::UserReferenceTransformer do
let_it_be(:group) { create(:group) } let_it_be(:group) { create(:group) }
let_it_be(:bulk_import) { create(:bulk_import) } let_it_be(:bulk_import) { create(:bulk_import) }
let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import, group: group) } let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import, group: group) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(entity) } let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
let(:hash) do let(:hash) do
{ {
......
...@@ -8,8 +8,9 @@ RSpec.describe BulkImports::Groups::Extractors::SubgroupsExtractor do ...@@ -8,8 +8,9 @@ RSpec.describe BulkImports::Groups::Extractors::SubgroupsExtractor do
bulk_import = create(:bulk_import) bulk_import = create(:bulk_import)
create(:bulk_import_configuration, bulk_import: bulk_import) create(:bulk_import_configuration, bulk_import: bulk_import)
entity = create(:bulk_import_entity, bulk_import: bulk_import) entity = create(:bulk_import_entity, bulk_import: bulk_import)
tracker = create(:bulk_import_tracker, entity: entity)
response = [{ 'test' => 'group' }] response = [{ 'test' => 'group' }]
context = BulkImports::Pipeline::Context.new(entity) context = BulkImports::Pipeline::Context.new(tracker)
allow_next_instance_of(BulkImports::Clients::Http) do |client| allow_next_instance_of(BulkImports::Clients::Http) do |client|
allow(client).to receive(:each_page).and_return(response) allow(client).to receive(:each_page).and_return(response)
......
...@@ -4,10 +4,10 @@ require 'spec_helper' ...@@ -4,10 +4,10 @@ require 'spec_helper'
RSpec.describe BulkImports::Groups::Graphql::GetGroupQuery do RSpec.describe BulkImports::Groups::Graphql::GetGroupQuery do
describe '#variables' do describe '#variables' do
let(:entity) { double(source_full_path: 'test', bulk_import: nil) }
let(:context) { BulkImports::Pipeline::Context.new(entity) }
it 'returns query variables based on entity information' do it 'returns query variables based on entity information' do
entity = double(source_full_path: 'test', bulk_import: nil)
tracker = double(entity: entity)
context = BulkImports::Pipeline::Context.new(tracker)
expected = { full_path: entity.source_full_path } expected = { full_path: entity.source_full_path }
expect(described_class.variables(context)).to eq(expected) expect(described_class.variables(context)).to eq(expected)
......
...@@ -4,8 +4,8 @@ require 'spec_helper' ...@@ -4,8 +4,8 @@ require 'spec_helper'
RSpec.describe BulkImports::Groups::Graphql::GetLabelsQuery do RSpec.describe BulkImports::Groups::Graphql::GetLabelsQuery do
it 'has a valid query' do it 'has a valid query' do
entity = create(:bulk_import_entity) tracker = create(:bulk_import_tracker)
context = BulkImports::Pipeline::Context.new(entity) context = BulkImports::Pipeline::Context.new(tracker)
query = GraphQL::Query.new( query = GraphQL::Query.new(
GitlabSchema, GitlabSchema,
......
...@@ -4,8 +4,8 @@ require 'spec_helper' ...@@ -4,8 +4,8 @@ require 'spec_helper'
RSpec.describe BulkImports::Groups::Graphql::GetMembersQuery do RSpec.describe BulkImports::Groups::Graphql::GetMembersQuery do
it 'has a valid query' do it 'has a valid query' do
entity = create(:bulk_import_entity) tracker = create(:bulk_import_tracker)
context = BulkImports::Pipeline::Context.new(entity) context = BulkImports::Pipeline::Context.new(tracker)
query = GraphQL::Query.new( query = GraphQL::Query.new(
GitlabSchema, GitlabSchema,
......
...@@ -4,8 +4,8 @@ require 'spec_helper' ...@@ -4,8 +4,8 @@ require 'spec_helper'
RSpec.describe BulkImports::Groups::Graphql::GetMilestonesQuery do RSpec.describe BulkImports::Groups::Graphql::GetMilestonesQuery do
it 'has a valid query' do it 'has a valid query' do
entity = create(:bulk_import_entity) tracker = create(:bulk_import_tracker)
context = BulkImports::Pipeline::Context.new(entity) context = BulkImports::Pipeline::Context.new(tracker)
query = GraphQL::Query.new( query = GraphQL::Query.new(
GitlabSchema, GitlabSchema,
......
...@@ -4,12 +4,13 @@ require 'spec_helper' ...@@ -4,12 +4,13 @@ require 'spec_helper'
RSpec.describe BulkImports::Groups::Loaders::GroupLoader do RSpec.describe BulkImports::Groups::Loaders::GroupLoader do
describe '#load' do describe '#load' do
let(:user) { create(:user) } let_it_be(:user) { create(:user) }
let(:data) { { foo: :bar } } let_it_be(:bulk_import) { create(:bulk_import, user: user) }
let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import) }
let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
let(:service_double) { instance_double(::Groups::CreateService) } let(:service_double) { instance_double(::Groups::CreateService) }
let(:bulk_import) { create(:bulk_import, user: user) } let(:data) { { foo: :bar } }
let(:entity) { create(:bulk_import_entity, bulk_import: bulk_import) }
let(:context) { BulkImports::Pipeline::Context.new(entity) }
subject { described_class.new } subject { described_class.new }
......
...@@ -4,10 +4,11 @@ require 'spec_helper' ...@@ -4,10 +4,11 @@ require 'spec_helper'
RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do
describe '#run' do describe '#run' do
let(:user) { create(:user) } let_it_be(:user) { create(:user) }
let(:parent) { create(:group) } let_it_be(:parent) { create(:group) }
let(:bulk_import) { create(:bulk_import, user: user) } let_it_be(:bulk_import) { create(:bulk_import, user: user) }
let(:entity) do
let_it_be(:entity) do
create( create(
:bulk_import_entity, :bulk_import_entity,
bulk_import: bulk_import, bulk_import: bulk_import,
...@@ -17,7 +18,8 @@ RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do ...@@ -17,7 +18,8 @@ RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do
) )
end end
let(:context) { BulkImports::Pipeline::Context.new(entity) } let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
let(:group_data) do let(:group_data) do
{ {
...@@ -37,7 +39,7 @@ RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do ...@@ -37,7 +39,7 @@ RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do
before do before do
allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor| allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
allow(extractor).to receive(:extract).and_return([group_data]) allow(extractor).to receive(:extract).and_return(BulkImports::Pipeline::ExtractedData.new(data: group_data))
end end
parent.add_owner(user) parent.add_owner(user)
......
...@@ -3,11 +3,12 @@ ...@@ -3,11 +3,12 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do
let(:user) { create(:user) } let_it_be(:user) { create(:user) }
let(:group) { create(:group) } let_it_be(:group) { create(:group) }
let(:cursor) { 'cursor' } let_it_be(:cursor) { 'cursor' }
let(:timestamp) { Time.new(2020, 01, 01).utc } let_it_be(:timestamp) { Time.new(2020, 01, 01).utc }
let(:entity) do
let_it_be(:entity) do
create( create(
:bulk_import_entity, :bulk_import_entity,
source_full_path: 'source/full/path', source_full_path: 'source/full/path',
...@@ -17,7 +18,8 @@ RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do ...@@ -17,7 +18,8 @@ RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do
) )
end end
let(:context) { BulkImports::Pipeline::Context.new(entity) } let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
subject { described_class.new(context) } subject { described_class.new(context) }
...@@ -72,8 +74,6 @@ RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do ...@@ -72,8 +74,6 @@ RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do
subject.after_run(data) subject.after_run(data)
tracker = entity.trackers.find_by(relation: :labels)
expect(tracker.has_next_page).to eq(true) expect(tracker.has_next_page).to eq(true)
expect(tracker.next_page).to eq(cursor) expect(tracker.next_page).to eq(cursor)
end end
...@@ -87,8 +87,6 @@ RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do ...@@ -87,8 +87,6 @@ RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do
subject.after_run(data) subject.after_run(data)
tracker = entity.trackers.find_by(relation: :labels)
expect(tracker.has_next_page).to eq(false) expect(tracker.has_next_page).to eq(false)
expect(tracker.next_page).to be_nil expect(tracker.next_page).to be_nil
end end
......
...@@ -11,7 +11,8 @@ RSpec.describe BulkImports::Groups::Pipelines::MembersPipeline do ...@@ -11,7 +11,8 @@ RSpec.describe BulkImports::Groups::Pipelines::MembersPipeline do
let_it_be(:cursor) { 'cursor' } let_it_be(:cursor) { 'cursor' }
let_it_be(:bulk_import) { create(:bulk_import, user: user) } let_it_be(:bulk_import) { create(:bulk_import, user: user) }
let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import, group: group) } let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import, group: group) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(entity) } let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
subject { described_class.new(context) } subject { described_class.new(context) }
......
...@@ -9,7 +9,7 @@ RSpec.describe BulkImports::Groups::Pipelines::MilestonesPipeline do ...@@ -9,7 +9,7 @@ RSpec.describe BulkImports::Groups::Pipelines::MilestonesPipeline do
let_it_be(:timestamp) { Time.new(2020, 01, 01).utc } let_it_be(:timestamp) { Time.new(2020, 01, 01).utc }
let_it_be(:bulk_import) { create(:bulk_import, user: user) } let_it_be(:bulk_import) { create(:bulk_import, user: user) }
let(:entity) do let_it_be(:entity) do
create( create(
:bulk_import_entity, :bulk_import_entity,
bulk_import: bulk_import, bulk_import: bulk_import,
...@@ -20,7 +20,8 @@ RSpec.describe BulkImports::Groups::Pipelines::MilestonesPipeline do ...@@ -20,7 +20,8 @@ RSpec.describe BulkImports::Groups::Pipelines::MilestonesPipeline do
) )
end end
let(:context) { BulkImports::Pipeline::Context.new(entity) } let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
subject { described_class.new(context) } subject { described_class.new(context) }
...@@ -84,8 +85,6 @@ RSpec.describe BulkImports::Groups::Pipelines::MilestonesPipeline do ...@@ -84,8 +85,6 @@ RSpec.describe BulkImports::Groups::Pipelines::MilestonesPipeline do
subject.after_run(data) subject.after_run(data)
tracker = entity.trackers.find_by(relation: :milestones)
expect(tracker.has_next_page).to eq(true) expect(tracker.has_next_page).to eq(true)
expect(tracker.next_page).to eq(cursor) expect(tracker.next_page).to eq(cursor)
end end
...@@ -99,8 +98,6 @@ RSpec.describe BulkImports::Groups::Pipelines::MilestonesPipeline do ...@@ -99,8 +98,6 @@ RSpec.describe BulkImports::Groups::Pipelines::MilestonesPipeline do
subject.after_run(data) subject.after_run(data)
tracker = entity.trackers.find_by(relation: :milestones)
expect(tracker.has_next_page).to eq(false) expect(tracker.has_next_page).to eq(false)
expect(tracker.next_page).to be_nil expect(tracker.next_page).to be_nil
end end
......
...@@ -6,19 +6,13 @@ RSpec.describe BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline do ...@@ -6,19 +6,13 @@ RSpec.describe BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline do
let_it_be(:user) { create(:user) } let_it_be(:user) { create(:user) }
let_it_be(:group) { create(:group, path: 'group') } let_it_be(:group) { create(:group, path: 'group') }
let_it_be(:parent) { create(:group, name: 'imported-group', path: 'imported-group') } let_it_be(:parent) { create(:group, name: 'imported-group', path: 'imported-group') }
let(:context) { BulkImports::Pipeline::Context.new(parent_entity) } let_it_be(:parent_entity) { create(:bulk_import_entity, destination_namespace: parent.full_path, group: parent) }
let_it_be(:tracker) { create(:bulk_import_tracker, entity: parent_entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
subject { described_class.new(context) } subject { described_class.new(context) }
describe '#run' do describe '#run' do
let!(:parent_entity) do
create(
:bulk_import_entity,
destination_namespace: parent.full_path,
group: parent
)
end
let(:subgroup_data) do let(:subgroup_data) do
[ [
{ {
......
...@@ -4,11 +4,12 @@ require 'spec_helper' ...@@ -4,11 +4,12 @@ require 'spec_helper'
RSpec.describe BulkImports::Groups::Transformers::GroupAttributesTransformer do RSpec.describe BulkImports::Groups::Transformers::GroupAttributesTransformer do
describe '#transform' do describe '#transform' do
let(:user) { create(:user) } let_it_be(:user) { create(:user) }
let(:parent) { create(:group) } let_it_be(:parent) { create(:group) }
let(:group) { create(:group, name: 'My Source Group', parent: parent) } let_it_be(:group) { create(:group, name: 'My Source Group', parent: parent) }
let(:bulk_import) { create(:bulk_import, user: user) } let_it_be(:bulk_import) { create(:bulk_import, user: user) }
let(:entity) do
let_it_be(:entity) do
create( create(
:bulk_import_entity, :bulk_import_entity,
bulk_import: bulk_import, bulk_import: bulk_import,
...@@ -18,7 +19,8 @@ RSpec.describe BulkImports::Groups::Transformers::GroupAttributesTransformer do ...@@ -18,7 +19,8 @@ RSpec.describe BulkImports::Groups::Transformers::GroupAttributesTransformer do
) )
end end
let(:context) { BulkImports::Pipeline::Context.new(entity) } let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
let(:data) do let(:data) do
{ {
...@@ -82,14 +84,7 @@ RSpec.describe BulkImports::Groups::Transformers::GroupAttributesTransformer do ...@@ -82,14 +84,7 @@ RSpec.describe BulkImports::Groups::Transformers::GroupAttributesTransformer do
context 'when destination namespace is empty' do context 'when destination namespace is empty' do
it 'does not set parent id' do it 'does not set parent id' do
entity = create( entity.update!(destination_namespace: '')
:bulk_import_entity,
bulk_import: bulk_import,
source_full_path: 'source/full/path',
destination_name: group.name,
destination_namespace: ''
)
context = BulkImports::Pipeline::Context.new(entity)
transformed_data = subject.transform(context, data) transformed_data = subject.transform(context, data)
......
...@@ -8,7 +8,8 @@ RSpec.describe BulkImports::Groups::Transformers::MemberAttributesTransformer do ...@@ -8,7 +8,8 @@ RSpec.describe BulkImports::Groups::Transformers::MemberAttributesTransformer do
let_it_be(:group) { create(:group) } let_it_be(:group) { create(:group) }
let_it_be(:bulk_import) { create(:bulk_import, user: user) } let_it_be(:bulk_import) { create(:bulk_import, user: user) }
let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import, group: group) } let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import, group: group) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(entity) } let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
it 'returns nil when receives no data' do it 'returns nil when receives no data' do
expect(subject.transform(context, nil)).to eq(nil) expect(subject.transform(context, nil)).to eq(nil)
......
...@@ -3,18 +3,18 @@ ...@@ -3,18 +3,18 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe BulkImports::Importers::GroupImporter do RSpec.describe BulkImports::Importers::GroupImporter do
let(:user) { create(:user) } let_it_be(:user) { create(:user) }
let(:group) { create(:group) } let_it_be(:group) { create(:group) }
let(:bulk_import) { create(:bulk_import) } let_it_be(:bulk_import) { create(:bulk_import) }
let(:bulk_import_entity) { create(:bulk_import_entity, :started, bulk_import: bulk_import, group: group) } let_it_be(:entity) { create(:bulk_import_entity, :started, group: group) }
let(:bulk_import_configuration) { create(:bulk_import_configuration, bulk_import: bulk_import) } let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity, pipeline_name: described_class.name) }
let(:context) { BulkImports::Pipeline::Context.new(bulk_import_entity) } let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
before do before do
allow(BulkImports::Pipeline::Context).to receive(:new).and_return(context) allow(BulkImports::Pipeline::Context).to receive(:new).and_return(context)
end end
subject { described_class.new(bulk_import_entity) } subject { described_class.new(entity) }
describe '#execute' do describe '#execute' do
it 'starts the entity and run its pipelines' do it 'starts the entity and run its pipelines' do
...@@ -33,18 +33,18 @@ RSpec.describe BulkImports::Importers::GroupImporter do ...@@ -33,18 +33,18 @@ RSpec.describe BulkImports::Importers::GroupImporter do
subject.execute subject.execute
expect(bulk_import_entity.reload).to be_finished expect(entity).to be_finished
end end
context 'when failed' do context 'when failed' do
let(:bulk_import_entity) { create(:bulk_import_entity, :failed, bulk_import: bulk_import, group: group) } let(:entity) { create(:bulk_import_entity, :failed, bulk_import: bulk_import, group: group) }
it 'does not transition entity to finished state' do it 'does not transition entity to finished state' do
allow(bulk_import_entity).to receive(:start!) allow(entity).to receive(:start!)
subject.execute subject.execute
expect(bulk_import_entity.reload).to be_failed expect(entity.reload).to be_failed
end end
end end
end end
......
...@@ -3,29 +3,52 @@ ...@@ -3,29 +3,52 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe BulkImports::Pipeline::Context do RSpec.describe BulkImports::Pipeline::Context do
let(:group) { instance_double(Group) } let_it_be(:user) { create(:user) }
let(:user) { instance_double(User) } let_it_be(:group) { create(:group) }
let(:bulk_import) { instance_double(BulkImport, user: user, configuration: :config) } let_it_be(:bulk_import) { create(:bulk_import, user: user) }
let(:entity) do let_it_be(:entity) do
instance_double( create(
BulkImports::Entity, :bulk_import_entity,
bulk_import: bulk_import, source_full_path: 'source/full/path',
group: group destination_name: 'My Destination Group',
destination_namespace: group.full_path,
group: group,
bulk_import: bulk_import
)
end
let_it_be(:tracker) do
create(
:bulk_import_tracker,
entity: entity,
pipeline_name: described_class.name
) )
end end
subject { described_class.new(entity) } subject { described_class.new(tracker, extra: :data) }
describe '#entity' do
it { expect(subject.entity).to eq(entity) }
end
describe '#group' do describe '#group' do
it { expect(subject.group).to eq(group) } it { expect(subject.group).to eq(group) }
end end
describe '#bulk_import' do
it { expect(subject.bulk_import).to eq(bulk_import) }
end
describe '#current_user' do describe '#current_user' do
it { expect(subject.current_user).to eq(user) } it { expect(subject.current_user).to eq(user) }
end end
describe '#current_user' do describe '#configuration' do
it { expect(subject.configuration).to eq(bulk_import.configuration) } it { expect(subject.configuration).to eq(bulk_import.configuration) }
end end
describe '#extra' do
it { expect(subject.extra).to eq(extra: :data) }
end
end end
...@@ -45,8 +45,9 @@ RSpec.describe BulkImports::Pipeline::Runner do ...@@ -45,8 +45,9 @@ RSpec.describe BulkImports::Pipeline::Runner do
stub_const('BulkImports::MyPipeline', pipeline) stub_const('BulkImports::MyPipeline', pipeline)
end end
let_it_be_with_refind(:entity) { create(:bulk_import_entity) } let_it_be_with_reload(:entity) { create(:bulk_import_entity) }
let(:context) { BulkImports::Pipeline::Context.new(entity, extra: :data) } let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker, extra: :data) }
subject { BulkImports::MyPipeline.new(context) } subject { BulkImports::MyPipeline.new(context) }
...@@ -170,12 +171,7 @@ RSpec.describe BulkImports::Pipeline::Runner do ...@@ -170,12 +171,7 @@ RSpec.describe BulkImports::Pipeline::Runner do
BulkImports::MyPipeline.abort_on_failure! BulkImports::MyPipeline.abort_on_failure!
end end
it 'marks entity as failed' do it 'logs a warn message and marks entity as failed' do
expect { subject.run }
.to change(entity, :status_name).to(:failed)
end
it 'logs warn message' do
expect_next_instance_of(Gitlab::Import::Logger) do |logger| expect_next_instance_of(Gitlab::Import::Logger) do |logger|
expect(logger).to receive(:warn) expect(logger).to receive(:warn)
.with( .with(
...@@ -188,6 +184,9 @@ RSpec.describe BulkImports::Pipeline::Runner do ...@@ -188,6 +184,9 @@ RSpec.describe BulkImports::Pipeline::Runner do
end end
subject.run subject.run
expect(entity.status_name).to eq(:failed)
expect(tracker.status_name).to eq(:failed)
end end
end end
...@@ -206,11 +205,11 @@ RSpec.describe BulkImports::Pipeline::Runner do ...@@ -206,11 +205,11 @@ RSpec.describe BulkImports::Pipeline::Runner do
entity.fail_op! entity.fail_op!
expect_next_instance_of(Gitlab::Import::Logger) do |logger| expect_next_instance_of(Gitlab::Import::Logger) do |logger|
expect(logger).to receive(:info) expect(logger).to receive(:warn)
.with( .with(
log_params( log_params(
context, context,
message: 'Skipping due to failed pipeline status', message: 'Skipping pipeline due to failed entity',
pipeline_class: 'BulkImports::MyPipeline' pipeline_class: 'BulkImports::MyPipeline'
) )
) )
......
...@@ -3,6 +3,8 @@ ...@@ -3,6 +3,8 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe BulkImports::Pipeline do RSpec.describe BulkImports::Pipeline do
let(:context) { instance_double(BulkImports::Pipeline::Context, tracker: nil) }
before do before do
stub_const('BulkImports::Extractor', Class.new) stub_const('BulkImports::Extractor', Class.new)
stub_const('BulkImports::Transformer', Class.new) stub_const('BulkImports::Transformer', Class.new)
...@@ -44,7 +46,7 @@ RSpec.describe BulkImports::Pipeline do ...@@ -44,7 +46,7 @@ RSpec.describe BulkImports::Pipeline do
end end
it 'returns itself when retrieving extractor & loader' do it 'returns itself when retrieving extractor & loader' do
pipeline = BulkImports::AnotherPipeline.new(nil) pipeline = BulkImports::AnotherPipeline.new(context)
expect(pipeline.send(:extractor)).to eq(pipeline) expect(pipeline.send(:extractor)).to eq(pipeline)
expect(pipeline.send(:loader)).to eq(pipeline) expect(pipeline.send(:loader)).to eq(pipeline)
...@@ -83,7 +85,7 @@ RSpec.describe BulkImports::Pipeline do ...@@ -83,7 +85,7 @@ RSpec.describe BulkImports::Pipeline do
expect(BulkImports::Transformer).to receive(:new).with(foo: :bar) expect(BulkImports::Transformer).to receive(:new).with(foo: :bar)
expect(BulkImports::Loader).to receive(:new).with(foo: :bar) expect(BulkImports::Loader).to receive(:new).with(foo: :bar)
pipeline = BulkImports::MyPipeline.new(nil) pipeline = BulkImports::MyPipeline.new(context)
pipeline.send(:extractor) pipeline.send(:extractor)
pipeline.send(:transformers) pipeline.send(:transformers)
...@@ -109,7 +111,7 @@ RSpec.describe BulkImports::Pipeline do ...@@ -109,7 +111,7 @@ RSpec.describe BulkImports::Pipeline do
expect(BulkImports::Transformer).to receive(:new).with(no_args) expect(BulkImports::Transformer).to receive(:new).with(no_args)
expect(BulkImports::Loader).to receive(:new).with(no_args) expect(BulkImports::Loader).to receive(:new).with(no_args)
pipeline = BulkImports::NoOptionsPipeline.new(nil) pipeline = BulkImports::NoOptionsPipeline.new(context)
pipeline.send(:extractor) pipeline.send(:extractor)
pipeline.send(:transformers) pipeline.send(:transformers)
...@@ -135,7 +137,7 @@ RSpec.describe BulkImports::Pipeline do ...@@ -135,7 +137,7 @@ RSpec.describe BulkImports::Pipeline do
transformer = double transformer = double
allow(BulkImports::Transformer).to receive(:new).and_return(transformer) allow(BulkImports::Transformer).to receive(:new).and_return(transformer)
pipeline = BulkImports::TransformersPipeline.new(nil) pipeline = BulkImports::TransformersPipeline.new(context)
expect(pipeline.send(:transformers)).to eq([pipeline, transformer]) expect(pipeline.send(:transformers)).to eq([pipeline, transformer])
end end
......
...@@ -125,68 +125,4 @@ RSpec.describe BulkImports::Entity, type: :model do ...@@ -125,68 +125,4 @@ RSpec.describe BulkImports::Entity, type: :model do
end end
end end
end end
describe "#update_tracker_for" do
let(:entity) { create(:bulk_import_entity) }
it "inserts new tracker when it does not exist" do
expect do
entity.update_tracker_for(relation: :relation, has_next_page: false)
end.to change(BulkImports::Tracker, :count).by(1)
tracker = entity.trackers.last
expect(tracker.relation).to eq('relation')
expect(tracker.has_next_page).to eq(false)
expect(tracker.next_page).to eq(nil)
end
it "updates the tracker if it already exist" do
create(
:bulk_import_tracker,
relation: :relation,
has_next_page: false,
entity: entity
)
expect do
entity.update_tracker_for(relation: :relation, has_next_page: true, next_page: 'nextPage')
end.not_to change(BulkImports::Tracker, :count)
tracker = entity.trackers.last
expect(tracker.relation).to eq('relation')
expect(tracker.has_next_page).to eq(true)
expect(tracker.next_page).to eq('nextPage')
end
end
describe "#has_next_page?" do
it "queries for the given relation if it has more pages to be fetched" do
entity = create(:bulk_import_entity)
create(
:bulk_import_tracker,
relation: :relation,
has_next_page: false,
entity: entity
)
expect(entity.has_next_page?(:relation)).to eq(false)
end
end
describe "#next_page_for" do
it "queries for the next page of the given relation" do
entity = create(:bulk_import_entity)
create(
:bulk_import_tracker,
relation: :relation,
has_next_page: false,
next_page: 'nextPage',
entity: entity
)
expect(entity.next_page_for(:relation)).to eq('nextPage')
end
end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment