Commit fe9bc7b1 authored by Kassio Borges's avatar Kassio Borges

BulkImports: Track pipeline work with BulkImports::Tracker#status

= Context

Create a database representation, with the `BulkImports::Tracker`, for
each BulkImport Pipeline (`include BulkImports::Pipeline`). This way,
the pipeline progress can be tracked using the `status` column (state
machine).

With this change we can run each pipeline on its own background job
(https://gitlab.com/gitlab-org/gitlab/-/issues/323384), which will bring
the power of retrying the pipeline in case of failures, like rate
limiting.

Besides that, having each pipeline progress in the database will be
handy to create a better UI to give an accurate sense of progress to the
user in the future.

- Merge request that introduced the `status` column to
  `BulkImports::tracker`:
  https://gitlab.com/gitlab-org/gitlab/-/merge_requests/5568

= The change

Before this change, `BulkImports::Tracker` was tracking only the
pagination status, when required. So, a record was only created when the
pipeline required pagination handling.

Now, before running a pipeline a `BulkImports::Tracker` record is
created, and when the pipeline is finished, failed or skipped, these
status are also updated/tracked in the pipeline's `BulkImports::Tracker`
record.

The pipeline status is required to run them in individual background
jobs because the pipelines have an order to run. For instance, we cannot
import Epics before importing the Group or the Group labels.

- Related to: https://gitlab.com/gitlab-org/gitlab/-/issues/324109

= Next step

Create the `BulkImports::PipelineWorker` to run each pipeline on its own
job.

- https://gitlab.com/gitlab-org/gitlab/-/issues/323384

= References

- Epic: https://gitlab.com/groups/gitlab-org/-/epics/5544
- Spike where the idea was tested:
  https://gitlab.com/gitlab-org/gitlab/-/merge_requests/54970
parent f254fe87
......@@ -68,25 +68,6 @@ class BulkImports::Entity < ApplicationRecord
end
end
def update_tracker_for(relation:, has_next_page:, next_page: nil)
attributes = {
relation: relation,
has_next_page: has_next_page,
next_page: next_page,
bulk_import_entity_id: id
}
trackers.upsert(attributes, unique_by: %i[bulk_import_entity_id relation])
end
def has_next_page?(relation)
trackers.find_by(relation: relation)&.has_next_page
end
def next_page_for(relation)
trackers.find_by(relation: relation)&.next_page
end
private
def validate_parent_is_a_group
......
......@@ -3,6 +3,8 @@
class BulkImports::Tracker < ApplicationRecord
self.table_name = 'bulk_import_trackers'
alias_attribute :pipeline_name, :relation
belongs_to :entity,
class_name: 'BulkImports::Entity',
foreign_key: :bulk_import_entity_id,
......@@ -28,6 +30,10 @@ class BulkImports::Tracker < ApplicationRecord
end
event :finish do
# When applying the concurrent model,
# remove the created => finished transaction
# https://gitlab.com/gitlab-org/gitlab/-/issues/323384
transition created: :finished
transition started: :finished
transition failed: :failed
transition skipped: :skipped
......
---
title: 'BulkImports: Track pipeline worker with BulkImports::Tracker#status'
merge_request: 56242
author:
type: changed
......@@ -32,11 +32,10 @@ module EE
def variables(context)
iid = context.extra[:epic_iid]
tracker = "epic_#{iid}_award_emoji"
{
full_path: context.entity.source_full_path,
cursor: context.entity.next_page_for(tracker),
cursor: context.tracker.next_page,
epic_iid: iid
}
end
......
......@@ -34,11 +34,10 @@ module EE
def variables(context)
iid = context.extra[:epic_iid]
tracker = "epic_#{iid}_events"
{
full_path: context.entity.source_full_path,
cursor: context.entity.next_page_for(tracker),
cursor: context.tracker.next_page,
epic_iid: iid
}
end
......
......@@ -61,7 +61,7 @@ module EE
def variables(context)
{
full_path: context.entity.source_full_path,
cursor: context.entity.next_page_for(:epics)
cursor: context.tracker.next_page
}
end
......
......@@ -35,7 +35,7 @@ module EE
def variables(context)
{
full_path: context.entity.source_full_path,
cursor: context.entity.next_page_for(:iterations)
cursor: context.tracker.next_page
}
end
......
......@@ -25,11 +25,7 @@ module EE
end
def after_run(extracted_data)
iid = context.extra[:epic_iid]
tracker = "epic_#{iid}_award_emoji"
context.entity.update_tracker_for(
relation: tracker,
tracker.update(
has_next_page: extracted_data.has_next_page?,
next_page: extracted_data.next_page
)
......
......@@ -48,11 +48,7 @@ module EE
end
def after_run(extracted_data)
iid = context.extra[:epic_iid]
tracker = "epic_#{iid}_events"
context.entity.update_tracker_for(
relation: tracker,
tracker.update(
has_next_page: extracted_data.has_next_page?,
next_page: extracted_data.next_page
)
......
......@@ -24,8 +24,7 @@ module EE
end
def after_run(extracted_data)
context.entity.update_tracker_for(
relation: :epics,
tracker.update(
has_next_page: extracted_data.has_next_page?,
next_page: extracted_data.next_page
)
......
......@@ -21,8 +21,7 @@ module EE
end
def after_run(extracted_data)
context.entity.update_tracker_for(
relation: :iterations,
tracker.update(
has_next_page: extracted_data.has_next_page?,
next_page: extracted_data.next_page
)
......
......@@ -4,7 +4,7 @@ require 'spec_helper'
RSpec.describe EE::BulkImports::Groups::Graphql::GetEpicAwardEmojiQuery do
it 'has a valid query' do
context = BulkImports::Pipeline::Context.new(create(:bulk_import_entity), epic_iid: 1)
context = BulkImports::Pipeline::Context.new(create(:bulk_import_tracker), epic_iid: 1)
result = GitlabSchema.execute(
described_class.to_s,
......
......@@ -4,7 +4,7 @@ require 'spec_helper'
RSpec.describe EE::BulkImports::Groups::Graphql::GetEpicEventsQuery do
it 'has a valid query' do
context = BulkImports::Pipeline::Context.new(create(:bulk_import_entity), epic_iid: 1)
context = BulkImports::Pipeline::Context.new(create(:bulk_import_tracker), epic_iid: 1)
result = GitlabSchema.execute(
described_class.to_s,
......
......@@ -4,7 +4,7 @@ require 'spec_helper'
RSpec.describe EE::BulkImports::Groups::Graphql::GetEpicsQuery do
it 'has a valid query' do
context = BulkImports::Pipeline::Context.new(create(:bulk_import_entity))
context = BulkImports::Pipeline::Context.new(create(:bulk_import_tracker))
result = GitlabSchema.execute(
described_class.to_s,
......
......@@ -4,8 +4,8 @@ require 'spec_helper'
RSpec.describe EE::BulkImports::Groups::Graphql::GetIterationsQuery do
it 'has a valid query' do
entity = create(:bulk_import_entity)
context = BulkImports::Pipeline::Context.new(entity)
tracker = create(:bulk_import_tracker)
context = BulkImports::Pipeline::Context.new(tracker)
query = GraphQL::Query.new(
GitlabSchema,
......
......@@ -9,7 +9,8 @@ RSpec.describe EE::BulkImports::Groups::Loaders::EpicAwardEmojiLoader do
let_it_be(:epic) { create(:epic, group: group, iid: 1) }
let_it_be(:bulk_import) { create(:bulk_import, user: user) }
let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import, group: group) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(entity) }
let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
let_it_be(:data) do
{
......
......@@ -7,8 +7,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline do
let_it_be(:user) { create(:user) }
let_it_be(:group) { create(:group) }
let_it_be(:epic) { create(:epic, group: group) }
let_it_be(:tracker) { "epic_#{epic.iid}_award_emoji" }
let_it_be(:bulk_import) { create(:bulk_import, user: user) }
let_it_be(:entity) do
create(
:bulk_import_entity,
......@@ -20,7 +20,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline do
)
end
let_it_be(:context) { BulkImports::Pipeline::Context.new(entity) }
let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
before do
stub_licensed_features(epics: true)
......@@ -39,7 +40,7 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline do
describe '#run' do
it 'imports epic award emoji' do
data = extractor_data(has_next_page: false, cursor: cursor)
data = extractor_data(has_next_page: false)
allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
allow(extractor)
......@@ -61,10 +62,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline do
subject.after_run(data)
page_tracker = entity.trackers.find_by(relation: tracker)
expect(page_tracker.has_next_page).to eq(true)
expect(page_tracker.next_page).to eq(cursor)
expect(tracker.has_next_page).to eq(true)
expect(tracker.next_page).to eq(cursor)
end
end
......@@ -76,10 +75,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline do
subject.after_run(data)
page_tracker = entity.trackers.find_by(relation: tracker)
expect(page_tracker.has_next_page).to eq(false)
expect(page_tracker.next_page).to be_nil
expect(tracker.has_next_page).to eq(false)
expect(tracker.next_page).to be_nil
end
it 'updates context with next epic iid' do
......
......@@ -7,8 +7,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicEventsPipeline do
let_it_be(:user) { create(:user) }
let_it_be(:group) { create(:group) }
let_it_be(:epic) { create(:epic, group: group) }
let_it_be(:tracker) { "epic_#{epic.iid}_events" }
let_it_be(:bulk_import) { create(:bulk_import, user: user) }
let_it_be(:entity) do
create(
:bulk_import_entity,
......@@ -20,7 +20,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicEventsPipeline do
)
end
let_it_be(:context) { BulkImports::Pipeline::Context.new(entity) }
let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
before do
stub_licensed_features(epics: true)
......@@ -39,7 +40,7 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicEventsPipeline do
describe '#run' do
it 'imports epic events and resource state events' do
data = extractor_data(has_next_page: false, cursor: cursor)
data = extractor_data(has_next_page: false)
allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
allow(extractor)
......@@ -102,10 +103,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicEventsPipeline do
subject.after_run(data)
page_tracker = entity.trackers.find_by(relation: tracker)
expect(page_tracker.has_next_page).to eq(true)
expect(page_tracker.next_page).to eq(cursor)
expect(tracker.has_next_page).to eq(true)
expect(tracker.next_page).to eq(cursor)
end
end
......@@ -117,10 +116,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicEventsPipeline do
subject.after_run(data)
page_tracker = entity.trackers.find_by(relation: tracker)
expect(page_tracker.has_next_page).to eq(false)
expect(page_tracker.next_page).to be_nil
expect(tracker.has_next_page).to eq(false)
expect(tracker.next_page).to be_nil
end
it 'updates context with next epic iid' do
......
......@@ -6,8 +6,9 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline, :clean_gitlab_
let_it_be(:cursor) { 'cursor' }
let_it_be(:user) { create(:user) }
let_it_be(:group) { create(:group) }
let(:bulk_import) { create(:bulk_import, user: user) }
let(:entity) do
let_it_be(:bulk_import) { create(:bulk_import, user: user) }
let_it_be(:entity) do
create(
:bulk_import_entity,
group: group,
......@@ -18,7 +19,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline, :clean_gitlab_
)
end
let(:context) { BulkImports::Pipeline::Context.new(entity) }
let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
before do
stub_licensed_features(epics: true)
......@@ -116,8 +118,6 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline, :clean_gitlab_
subject.after_run(data)
tracker = entity.trackers.find_by(relation: :epics)
expect(tracker.has_next_page).to eq(true)
expect(tracker.next_page).to eq(cursor)
end
......@@ -131,8 +131,6 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline, :clean_gitlab_
subject.after_run(data)
tracker = entity.trackers.find_by(relation: :epics)
expect(tracker.has_next_page).to eq(false)
expect(tracker.next_page).to be_nil
end
......
......@@ -9,7 +9,7 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::IterationsPipeline do
let_it_be(:timestamp) { Time.new(2020, 01, 01).utc }
let_it_be(:bulk_import) { create(:bulk_import, user: user) }
let(:entity) do
let_it_be(:entity) do
create(
:bulk_import_entity,
bulk_import: bulk_import,
......@@ -20,7 +20,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::IterationsPipeline do
)
end
let(:context) { BulkImports::Pipeline::Context.new(entity) }
let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
subject { described_class.new(context) }
......@@ -85,8 +86,6 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::IterationsPipeline do
subject.after_run(data)
tracker = entity.trackers.find_by(relation: :iterations)
expect(tracker.has_next_page).to eq(true)
expect(tracker.next_page).to eq(cursor)
end
......@@ -100,8 +99,6 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::IterationsPipeline do
subject.after_run(data)
tracker = entity.trackers.find_by(relation: :iterations)
expect(tracker.has_next_page).to eq(false)
expect(tracker.next_page).to be_nil
end
......
......@@ -7,7 +7,8 @@ RSpec.describe EE::BulkImports::Groups::Transformers::EpicAttributesTransformer
let_it_be(:group) { create(:group) }
let_it_be(:bulk_import) { create(:bulk_import, user: importer_user) }
let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import, group: group) }
let_it_be(:context) { ::BulkImports::Pipeline::Context.new(entity) }
let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
describe '#transform' do
it 'transforms the epic attributes' do
......
......@@ -3,14 +3,14 @@
require 'spec_helper'
RSpec.describe BulkImports::Importers::GroupImporter do
let(:user) { create(:user) }
let(:group) { create(:group) }
let(:bulk_import) { create(:bulk_import, user: user) }
let(:bulk_import_entity) { create(:bulk_import_entity, :started, bulk_import: bulk_import, group: group) }
let(:bulk_import_configuration) { create(:bulk_import_configuration, bulk_import: bulk_import) }
let(:context) { BulkImports::Pipeline::Context.new(bulk_import_entity) }
let_it_be(:user) { create(:user) }
let_it_be(:group) { create(:group) }
let_it_be(:bulk_import) { create(:bulk_import, user: user) }
let_it_be(:entity) { create(:bulk_import_entity, :started, bulk_import: bulk_import, group: group) }
let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
subject { described_class.new(bulk_import_entity) }
subject { described_class.new(entity) }
before do
allow(BulkImports::Pipeline::Context).to receive(:new).and_return(context)
......@@ -25,10 +25,12 @@ RSpec.describe BulkImports::Importers::GroupImporter do
expect_to_run_pipeline BulkImports::Groups::Pipelines::MilestonesPipeline, context: context
expect_to_run_pipeline EE::BulkImports::Groups::Pipelines::EpicsPipeline, context: context
expect_to_run_pipeline EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline, context: context
expect_to_run_pipeline EE::BulkImports::Groups::Pipelines::EpicEventsPipeline, context: context
expect_to_run_pipeline EE::BulkImports::Groups::Pipelines::IterationsPipeline, context: context
subject.execute
expect(bulk_import_entity.reload).to be_finished
expect(entity.reload).to be_finished
end
end
......
......@@ -31,7 +31,7 @@ module BulkImports
def variables(context)
{
full_path: context.entity.source_full_path,
cursor: context.entity.next_page_for(:labels)
cursor: context.tracker.next_page
}
end
......
......@@ -34,7 +34,7 @@ module BulkImports
def variables(context)
{
full_path: context.entity.source_full_path,
cursor: context.entity.next_page_for(:group_members)
cursor: context.tracker.next_page
}
end
......
......@@ -33,7 +33,7 @@ module BulkImports
def variables(context)
{
full_path: context.entity.source_full_path,
cursor: context.entity.next_page_for(:milestones)
cursor: context.tracker.next_page
}
end
......
......@@ -16,8 +16,7 @@ module BulkImports
end
def after_run(extracted_data)
context.entity.update_tracker_for(
relation: :labels,
tracker.update(
has_next_page: extracted_data.has_next_page?,
next_page: extracted_data.next_page
)
......
......@@ -19,8 +19,7 @@ module BulkImports
end
def after_run(extracted_data)
context.entity.update_tracker_for(
relation: :group_members,
tracker.update(
has_next_page: extracted_data.has_next_page?,
next_page: extracted_data.next_page
)
......
......@@ -20,8 +20,7 @@ module BulkImports
end
def after_run(extracted_data)
context.entity.update_tracker_for(
relation: :milestones,
tracker.update(
has_next_page: extracted_data.has_next_page?,
next_page: extracted_data.next_page
)
......
......@@ -8,9 +8,18 @@ module BulkImports
end
def execute
context = BulkImports::Pipeline::Context.new(entity)
pipelines.each.with_index do |pipeline, stage|
pipeline_tracker = entity.trackers.create!(
pipeline_name: pipeline,
stage: stage
)
pipelines.each { |pipeline| pipeline.new(context).run }
context = BulkImports::Pipeline::Context.new(pipeline_tracker)
pipeline.new(context).run
pipeline_tracker.finish!
end
entity.finish!
end
......
......@@ -15,6 +15,10 @@ module BulkImports
@context = context
end
def tracker
@tracker ||= context.tracker
end
included do
private
......
......@@ -3,25 +3,33 @@
module BulkImports
module Pipeline
class Context
attr_reader :entity, :bulk_import
attr_accessor :extra
def initialize(entity, extra = {})
@entity = entity
@bulk_import = entity.bulk_import
attr_reader :tracker
def initialize(tracker, extra = {})
@tracker = tracker
@extra = extra
end
def entity
@entity ||= tracker.entity
end
def group
entity.group
@group ||= entity.group
end
def bulk_import
@bulk_import ||= entity.bulk_import
end
def current_user
bulk_import.user
@current_user ||= bulk_import.user
end
def configuration
bulk_import.configuration
@configuration ||= bulk_import.configuration
end
end
end
......
......@@ -11,11 +11,14 @@ module BulkImports
end
def has_next_page?
@page_info['has_next_page']
Gitlab::Utils.to_boolean(
@page_info&.dig('has_next_page'),
default: false
)
end
def next_page
@page_info['end_cursor']
@page_info&.dig('end_cursor')
end
def each(&block)
......
......@@ -26,7 +26,7 @@ module BulkImports
end
end
if respond_to?(:after_run)
if extracted_data && respond_to?(:after_run)
run_pipeline_step(:after_run) do
after_run(extracted_data)
end
......@@ -34,7 +34,7 @@ module BulkImports
info(message: 'Pipeline finished')
rescue MarkedAsFailedError
log_skip
skip!('Skipping pipeline due to failed entity')
end
private # rubocop:disable Lint/UselessAccessModifier
......@@ -46,7 +46,11 @@ module BulkImports
yield
rescue MarkedAsFailedError
log_skip(step => class_name)
skip!(
'Skipping pipeline due to failed entity',
pipeline_step: step,
step_class: class_name
)
rescue => e
log_import_failure(e, step)
......@@ -65,10 +69,13 @@ module BulkImports
warn(message: 'Pipeline failed')
context.entity.fail_op!
tracker.fail_op!
end
def log_skip(extra = {})
info({ message: 'Skipping due to failed pipeline status' }.merge(extra))
def skip!(message, extra = {})
warn({ message: message }.merge(extra))
tracker.skip!
end
def log_import_failure(exception, step)
......
......@@ -8,7 +8,8 @@ RSpec.describe BulkImports::Common::Transformers::UserReferenceTransformer do
let_it_be(:group) { create(:group) }
let_it_be(:bulk_import) { create(:bulk_import) }
let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import, group: group) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(entity) }
let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
let(:hash) do
{
......
......@@ -8,8 +8,9 @@ RSpec.describe BulkImports::Groups::Extractors::SubgroupsExtractor do
bulk_import = create(:bulk_import)
create(:bulk_import_configuration, bulk_import: bulk_import)
entity = create(:bulk_import_entity, bulk_import: bulk_import)
tracker = create(:bulk_import_tracker, entity: entity)
response = [{ 'test' => 'group' }]
context = BulkImports::Pipeline::Context.new(entity)
context = BulkImports::Pipeline::Context.new(tracker)
allow_next_instance_of(BulkImports::Clients::Http) do |client|
allow(client).to receive(:each_page).and_return(response)
......
......@@ -4,10 +4,10 @@ require 'spec_helper'
RSpec.describe BulkImports::Groups::Graphql::GetGroupQuery do
describe '#variables' do
let(:entity) { double(source_full_path: 'test', bulk_import: nil) }
let(:context) { BulkImports::Pipeline::Context.new(entity) }
it 'returns query variables based on entity information' do
entity = double(source_full_path: 'test', bulk_import: nil)
tracker = double(entity: entity)
context = BulkImports::Pipeline::Context.new(tracker)
expected = { full_path: entity.source_full_path }
expect(described_class.variables(context)).to eq(expected)
......
......@@ -4,8 +4,8 @@ require 'spec_helper'
RSpec.describe BulkImports::Groups::Graphql::GetLabelsQuery do
it 'has a valid query' do
entity = create(:bulk_import_entity)
context = BulkImports::Pipeline::Context.new(entity)
tracker = create(:bulk_import_tracker)
context = BulkImports::Pipeline::Context.new(tracker)
query = GraphQL::Query.new(
GitlabSchema,
......
......@@ -4,8 +4,8 @@ require 'spec_helper'
RSpec.describe BulkImports::Groups::Graphql::GetMembersQuery do
it 'has a valid query' do
entity = create(:bulk_import_entity)
context = BulkImports::Pipeline::Context.new(entity)
tracker = create(:bulk_import_tracker)
context = BulkImports::Pipeline::Context.new(tracker)
query = GraphQL::Query.new(
GitlabSchema,
......
......@@ -4,8 +4,8 @@ require 'spec_helper'
RSpec.describe BulkImports::Groups::Graphql::GetMilestonesQuery do
it 'has a valid query' do
entity = create(:bulk_import_entity)
context = BulkImports::Pipeline::Context.new(entity)
tracker = create(:bulk_import_tracker)
context = BulkImports::Pipeline::Context.new(tracker)
query = GraphQL::Query.new(
GitlabSchema,
......
......@@ -4,12 +4,13 @@ require 'spec_helper'
RSpec.describe BulkImports::Groups::Loaders::GroupLoader do
describe '#load' do
let(:user) { create(:user) }
let(:data) { { foo: :bar } }
let_it_be(:user) { create(:user) }
let_it_be(:bulk_import) { create(:bulk_import, user: user) }
let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import) }
let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
let(:service_double) { instance_double(::Groups::CreateService) }
let(:bulk_import) { create(:bulk_import, user: user) }
let(:entity) { create(:bulk_import_entity, bulk_import: bulk_import) }
let(:context) { BulkImports::Pipeline::Context.new(entity) }
let(:data) { { foo: :bar } }
subject { described_class.new }
......
......@@ -4,10 +4,11 @@ require 'spec_helper'
RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do
describe '#run' do
let(:user) { create(:user) }
let(:parent) { create(:group) }
let(:bulk_import) { create(:bulk_import, user: user) }
let(:entity) do
let_it_be(:user) { create(:user) }
let_it_be(:parent) { create(:group) }
let_it_be(:bulk_import) { create(:bulk_import, user: user) }
let_it_be(:entity) do
create(
:bulk_import_entity,
bulk_import: bulk_import,
......@@ -17,7 +18,8 @@ RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do
)
end
let(:context) { BulkImports::Pipeline::Context.new(entity) }
let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
let(:group_data) do
{
......@@ -37,7 +39,7 @@ RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do
before do
allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
allow(extractor).to receive(:extract).and_return([group_data])
allow(extractor).to receive(:extract).and_return(BulkImports::Pipeline::ExtractedData.new(data: group_data))
end
parent.add_owner(user)
......
......@@ -3,11 +3,12 @@
require 'spec_helper'
RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do
let(:user) { create(:user) }
let(:group) { create(:group) }
let(:cursor) { 'cursor' }
let(:timestamp) { Time.new(2020, 01, 01).utc }
let(:entity) do
let_it_be(:user) { create(:user) }
let_it_be(:group) { create(:group) }
let_it_be(:cursor) { 'cursor' }
let_it_be(:timestamp) { Time.new(2020, 01, 01).utc }
let_it_be(:entity) do
create(
:bulk_import_entity,
source_full_path: 'source/full/path',
......@@ -17,7 +18,8 @@ RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do
)
end
let(:context) { BulkImports::Pipeline::Context.new(entity) }
let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
subject { described_class.new(context) }
......@@ -72,8 +74,6 @@ RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do
subject.after_run(data)
tracker = entity.trackers.find_by(relation: :labels)
expect(tracker.has_next_page).to eq(true)
expect(tracker.next_page).to eq(cursor)
end
......@@ -87,8 +87,6 @@ RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do
subject.after_run(data)
tracker = entity.trackers.find_by(relation: :labels)
expect(tracker.has_next_page).to eq(false)
expect(tracker.next_page).to be_nil
end
......
......@@ -11,7 +11,8 @@ RSpec.describe BulkImports::Groups::Pipelines::MembersPipeline do
let_it_be(:cursor) { 'cursor' }
let_it_be(:bulk_import) { create(:bulk_import, user: user) }
let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import, group: group) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(entity) }
let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
subject { described_class.new(context) }
......
......@@ -9,7 +9,7 @@ RSpec.describe BulkImports::Groups::Pipelines::MilestonesPipeline do
let_it_be(:timestamp) { Time.new(2020, 01, 01).utc }
let_it_be(:bulk_import) { create(:bulk_import, user: user) }
let(:entity) do
let_it_be(:entity) do
create(
:bulk_import_entity,
bulk_import: bulk_import,
......@@ -20,7 +20,8 @@ RSpec.describe BulkImports::Groups::Pipelines::MilestonesPipeline do
)
end
let(:context) { BulkImports::Pipeline::Context.new(entity) }
let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
subject { described_class.new(context) }
......@@ -84,8 +85,6 @@ RSpec.describe BulkImports::Groups::Pipelines::MilestonesPipeline do
subject.after_run(data)
tracker = entity.trackers.find_by(relation: :milestones)
expect(tracker.has_next_page).to eq(true)
expect(tracker.next_page).to eq(cursor)
end
......@@ -99,8 +98,6 @@ RSpec.describe BulkImports::Groups::Pipelines::MilestonesPipeline do
subject.after_run(data)
tracker = entity.trackers.find_by(relation: :milestones)
expect(tracker.has_next_page).to eq(false)
expect(tracker.next_page).to be_nil
end
......
......@@ -6,19 +6,13 @@ RSpec.describe BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline do
let_it_be(:user) { create(:user) }
let_it_be(:group) { create(:group, path: 'group') }
let_it_be(:parent) { create(:group, name: 'imported-group', path: 'imported-group') }
let(:context) { BulkImports::Pipeline::Context.new(parent_entity) }
let_it_be(:parent_entity) { create(:bulk_import_entity, destination_namespace: parent.full_path, group: parent) }
let_it_be(:tracker) { create(:bulk_import_tracker, entity: parent_entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
subject { described_class.new(context) }
describe '#run' do
let!(:parent_entity) do
create(
:bulk_import_entity,
destination_namespace: parent.full_path,
group: parent
)
end
let(:subgroup_data) do
[
{
......
......@@ -4,11 +4,12 @@ require 'spec_helper'
RSpec.describe BulkImports::Groups::Transformers::GroupAttributesTransformer do
describe '#transform' do
let(:user) { create(:user) }
let(:parent) { create(:group) }
let(:group) { create(:group, name: 'My Source Group', parent: parent) }
let(:bulk_import) { create(:bulk_import, user: user) }
let(:entity) do
let_it_be(:user) { create(:user) }
let_it_be(:parent) { create(:group) }
let_it_be(:group) { create(:group, name: 'My Source Group', parent: parent) }
let_it_be(:bulk_import) { create(:bulk_import, user: user) }
let_it_be(:entity) do
create(
:bulk_import_entity,
bulk_import: bulk_import,
......@@ -18,7 +19,8 @@ RSpec.describe BulkImports::Groups::Transformers::GroupAttributesTransformer do
)
end
let(:context) { BulkImports::Pipeline::Context.new(entity) }
let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
let(:data) do
{
......@@ -82,14 +84,7 @@ RSpec.describe BulkImports::Groups::Transformers::GroupAttributesTransformer do
context 'when destination namespace is empty' do
it 'does not set parent id' do
entity = create(
:bulk_import_entity,
bulk_import: bulk_import,
source_full_path: 'source/full/path',
destination_name: group.name,
destination_namespace: ''
)
context = BulkImports::Pipeline::Context.new(entity)
entity.update!(destination_namespace: '')
transformed_data = subject.transform(context, data)
......
......@@ -8,7 +8,8 @@ RSpec.describe BulkImports::Groups::Transformers::MemberAttributesTransformer do
let_it_be(:group) { create(:group) }
let_it_be(:bulk_import) { create(:bulk_import, user: user) }
let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import, group: group) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(entity) }
let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
it 'returns nil when receives no data' do
expect(subject.transform(context, nil)).to eq(nil)
......
......@@ -3,18 +3,18 @@
require 'spec_helper'
RSpec.describe BulkImports::Importers::GroupImporter do
let(:user) { create(:user) }
let(:group) { create(:group) }
let(:bulk_import) { create(:bulk_import) }
let(:bulk_import_entity) { create(:bulk_import_entity, :started, bulk_import: bulk_import, group: group) }
let(:bulk_import_configuration) { create(:bulk_import_configuration, bulk_import: bulk_import) }
let(:context) { BulkImports::Pipeline::Context.new(bulk_import_entity) }
let_it_be(:user) { create(:user) }
let_it_be(:group) { create(:group) }
let_it_be(:bulk_import) { create(:bulk_import) }
let_it_be(:entity) { create(:bulk_import_entity, :started, group: group) }
let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity, pipeline_name: described_class.name) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
before do
allow(BulkImports::Pipeline::Context).to receive(:new).and_return(context)
end
subject { described_class.new(bulk_import_entity) }
subject { described_class.new(entity) }
describe '#execute' do
it 'starts the entity and run its pipelines' do
......@@ -33,18 +33,18 @@ RSpec.describe BulkImports::Importers::GroupImporter do
subject.execute
expect(bulk_import_entity.reload).to be_finished
expect(entity).to be_finished
end
context 'when failed' do
let(:bulk_import_entity) { create(:bulk_import_entity, :failed, bulk_import: bulk_import, group: group) }
let(:entity) { create(:bulk_import_entity, :failed, bulk_import: bulk_import, group: group) }
it 'does not transition entity to finished state' do
allow(bulk_import_entity).to receive(:start!)
allow(entity).to receive(:start!)
subject.execute
expect(bulk_import_entity.reload).to be_failed
expect(entity.reload).to be_failed
end
end
end
......
......@@ -3,29 +3,52 @@
require 'spec_helper'
RSpec.describe BulkImports::Pipeline::Context do
let(:group) { instance_double(Group) }
let(:user) { instance_double(User) }
let(:bulk_import) { instance_double(BulkImport, user: user, configuration: :config) }
let(:entity) do
instance_double(
BulkImports::Entity,
bulk_import: bulk_import,
group: group
let_it_be(:user) { create(:user) }
let_it_be(:group) { create(:group) }
let_it_be(:bulk_import) { create(:bulk_import, user: user) }
let_it_be(:entity) do
create(
:bulk_import_entity,
source_full_path: 'source/full/path',
destination_name: 'My Destination Group',
destination_namespace: group.full_path,
group: group,
bulk_import: bulk_import
)
end
let_it_be(:tracker) do
create(
:bulk_import_tracker,
entity: entity,
pipeline_name: described_class.name
)
end
subject { described_class.new(entity) }
subject { described_class.new(tracker, extra: :data) }
describe '#entity' do
it { expect(subject.entity).to eq(entity) }
end
describe '#group' do
it { expect(subject.group).to eq(group) }
end
describe '#bulk_import' do
it { expect(subject.bulk_import).to eq(bulk_import) }
end
describe '#current_user' do
it { expect(subject.current_user).to eq(user) }
end
describe '#current_user' do
describe '#configuration' do
it { expect(subject.configuration).to eq(bulk_import.configuration) }
end
describe '#extra' do
it { expect(subject.extra).to eq(extra: :data) }
end
end
......@@ -45,8 +45,9 @@ RSpec.describe BulkImports::Pipeline::Runner do
stub_const('BulkImports::MyPipeline', pipeline)
end
let_it_be_with_refind(:entity) { create(:bulk_import_entity) }
let(:context) { BulkImports::Pipeline::Context.new(entity, extra: :data) }
let_it_be_with_reload(:entity) { create(:bulk_import_entity) }
let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker, extra: :data) }
subject { BulkImports::MyPipeline.new(context) }
......@@ -170,12 +171,7 @@ RSpec.describe BulkImports::Pipeline::Runner do
BulkImports::MyPipeline.abort_on_failure!
end
it 'marks entity as failed' do
expect { subject.run }
.to change(entity, :status_name).to(:failed)
end
it 'logs warn message' do
it 'logs a warn message and marks entity as failed' do
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
expect(logger).to receive(:warn)
.with(
......@@ -188,6 +184,9 @@ RSpec.describe BulkImports::Pipeline::Runner do
end
subject.run
expect(entity.status_name).to eq(:failed)
expect(tracker.status_name).to eq(:failed)
end
end
......@@ -206,11 +205,11 @@ RSpec.describe BulkImports::Pipeline::Runner do
entity.fail_op!
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
expect(logger).to receive(:info)
expect(logger).to receive(:warn)
.with(
log_params(
context,
message: 'Skipping due to failed pipeline status',
message: 'Skipping pipeline due to failed entity',
pipeline_class: 'BulkImports::MyPipeline'
)
)
......
......@@ -3,6 +3,8 @@
require 'spec_helper'
RSpec.describe BulkImports::Pipeline do
let(:context) { instance_double(BulkImports::Pipeline::Context, tracker: nil) }
before do
stub_const('BulkImports::Extractor', Class.new)
stub_const('BulkImports::Transformer', Class.new)
......@@ -44,7 +46,7 @@ RSpec.describe BulkImports::Pipeline do
end
it 'returns itself when retrieving extractor & loader' do
pipeline = BulkImports::AnotherPipeline.new(nil)
pipeline = BulkImports::AnotherPipeline.new(context)
expect(pipeline.send(:extractor)).to eq(pipeline)
expect(pipeline.send(:loader)).to eq(pipeline)
......@@ -83,7 +85,7 @@ RSpec.describe BulkImports::Pipeline do
expect(BulkImports::Transformer).to receive(:new).with(foo: :bar)
expect(BulkImports::Loader).to receive(:new).with(foo: :bar)
pipeline = BulkImports::MyPipeline.new(nil)
pipeline = BulkImports::MyPipeline.new(context)
pipeline.send(:extractor)
pipeline.send(:transformers)
......@@ -109,7 +111,7 @@ RSpec.describe BulkImports::Pipeline do
expect(BulkImports::Transformer).to receive(:new).with(no_args)
expect(BulkImports::Loader).to receive(:new).with(no_args)
pipeline = BulkImports::NoOptionsPipeline.new(nil)
pipeline = BulkImports::NoOptionsPipeline.new(context)
pipeline.send(:extractor)
pipeline.send(:transformers)
......@@ -135,7 +137,7 @@ RSpec.describe BulkImports::Pipeline do
transformer = double
allow(BulkImports::Transformer).to receive(:new).and_return(transformer)
pipeline = BulkImports::TransformersPipeline.new(nil)
pipeline = BulkImports::TransformersPipeline.new(context)
expect(pipeline.send(:transformers)).to eq([pipeline, transformer])
end
......
......@@ -125,68 +125,4 @@ RSpec.describe BulkImports::Entity, type: :model do
end
end
end
describe "#update_tracker_for" do
let(:entity) { create(:bulk_import_entity) }
it "inserts new tracker when it does not exist" do
expect do
entity.update_tracker_for(relation: :relation, has_next_page: false)
end.to change(BulkImports::Tracker, :count).by(1)
tracker = entity.trackers.last
expect(tracker.relation).to eq('relation')
expect(tracker.has_next_page).to eq(false)
expect(tracker.next_page).to eq(nil)
end
it "updates the tracker if it already exist" do
create(
:bulk_import_tracker,
relation: :relation,
has_next_page: false,
entity: entity
)
expect do
entity.update_tracker_for(relation: :relation, has_next_page: true, next_page: 'nextPage')
end.not_to change(BulkImports::Tracker, :count)
tracker = entity.trackers.last
expect(tracker.relation).to eq('relation')
expect(tracker.has_next_page).to eq(true)
expect(tracker.next_page).to eq('nextPage')
end
end
describe "#has_next_page?" do
it "queries for the given relation if it has more pages to be fetched" do
entity = create(:bulk_import_entity)
create(
:bulk_import_tracker,
relation: :relation,
has_next_page: false,
entity: entity
)
expect(entity.has_next_page?(:relation)).to eq(false)
end
end
describe "#next_page_for" do
it "queries for the next page of the given relation" do
entity = create(:bulk_import_entity)
create(
:bulk_import_tracker,
relation: :relation,
has_next_page: false,
next_page: 'nextPage',
entity: entity
)
expect(entity.next_page_for(:relation)).to eq('nextPage')
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment