Commit a381517d authored by George Koltsov's avatar George Koltsov

Process one record at a time in Bulk Import pipelines

  - Update BulkImports pipelines to process one record
  at a time, instead of operating on a whole collection
  - It is useful when during a transformation of a record
  we need to check if a previously processed record is
  persisted in db or not
parent ed29856f
---
title: Process one record at a time in Bulk Import pipelines
merge_request: 52330
author:
type: changed
...@@ -16,22 +16,22 @@ module EE ...@@ -16,22 +16,22 @@ module EE
first: 100, first: 100,
after: $cursor after: $cursor
) { ) {
pageInfo { page_info: pageInfo {
endCursor end_cursor: endCursor
hasNextPage has_next_page: hasNextPage
} }
nodes { nodes {
title title
description description
state state
createdAt created_at: createdAt
closedAt closed_at: closedAt
startDate start_date: startDate
startDateFixed start_date_fixed: startDateFixed
startDateIsFixed start_date_is_fixed: startDateIsFixed
dueDateFixed due_date_fixed: dueDateFixed
dueDateIsFixed due_date_is_fixed: dueDateIsFixed
relativePosition relative_position: relativePosition
confidential confidential
} }
} }
...@@ -46,6 +46,18 @@ module EE ...@@ -46,6 +46,18 @@ module EE
cursor: entity.next_page_for(:epics) cursor: entity.next_page_for(:epics)
} }
end end
def base_path
%w[data group epics]
end
def data_path
base_path << 'nodes'
end
def page_info_path
base_path << 'page_info'
end
end end
end end
end end
......
...@@ -10,19 +10,11 @@ module EE ...@@ -10,19 +10,11 @@ module EE
end end
def load(context, data) def load(context, data)
Array.wrap(data['nodes']).each do |args| ::Epics::CreateService.new(
::Epics::CreateService.new( context.entity.group,
context.entity.group, context.current_user,
context.current_user, data
args ).execute
).execute
end
context.entity.update_tracker_for(
relation: :epics,
has_next_page: data.dig('page_info', 'has_next_page'),
next_page: data.dig('page_info', 'end_cursor')
)
end end
end end
end end
......
...@@ -10,14 +10,18 @@ module EE ...@@ -10,14 +10,18 @@ module EE
extractor ::BulkImports::Common::Extractors::GraphqlExtractor, extractor ::BulkImports::Common::Extractors::GraphqlExtractor,
query: EE::BulkImports::Groups::Graphql::GetEpicsQuery query: EE::BulkImports::Groups::Graphql::GetEpicsQuery
transformer ::BulkImports::Common::Transformers::HashKeyDigger, key_path: %w[data group epics]
transformer ::BulkImports::Common::Transformers::UnderscorifyKeysTransformer
transformer ::BulkImports::Common::Transformers::ProhibitedAttributesTransformer transformer ::BulkImports::Common::Transformers::ProhibitedAttributesTransformer
loader EE::BulkImports::Groups::Loaders::EpicsLoader loader EE::BulkImports::Groups::Loaders::EpicsLoader
def after_run(context) def after_run(context, extracted_data)
if context.entity.has_next_page?(:epics) context.entity.update_tracker_for(
relation: :epics,
has_next_page: extracted_data.has_next_page?,
next_page: extracted_data.next_page
)
if extracted_data.has_next_page?
run(context) run(context)
end end
end end
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe EE::BulkImports::Groups::Graphql::GetEpicsQuery do
describe '#variables' do
let(:entity) { double(source_full_path: 'test', next_page_for: 'next_page') }
it 'returns query variables based on entity information' do
expected = { full_path: entity.source_full_path, cursor: entity.next_page_for }
expect(described_class.variables(entity)).to eq(expected)
end
end
describe '#data_path' do
it 'returns data path' do
expected = %w[data group epics nodes]
expect(described_class.data_path).to eq(expected)
end
end
describe '#page_info_path' do
it 'returns pagination information path' do
expected = %w[data group epics page_info]
expect(described_class.page_info_path).to eq(expected)
end
end
end
...@@ -16,35 +16,18 @@ RSpec.describe EE::BulkImports::Groups::Loaders::EpicsLoader do ...@@ -16,35 +16,18 @@ RSpec.describe EE::BulkImports::Groups::Loaders::EpicsLoader do
let(:data) do let(:data) do
{ {
'page_info' => { 'title' => 'epic1',
'end_cursor' => 'endCursorValue', 'state' => 'opened',
'has_next_page' => true 'confidential' => false
},
'nodes' => [
{
'title' => 'epic1',
'state' => 'opened',
'confidential' => false
},
{
'title' => 'epic2',
'state' => 'closed',
'confidential' => true
}
]
} }
end end
subject { described_class.new } subject { described_class.new }
it 'creates the epics and update the entity tracker' do it 'creates the epic' do
expect { subject.load(context, data) }.to change(::Epic, :count).by(2) expect { subject.load(context, data) }.to change(::Epic, :count).by(1)
tracker = entity.trackers.last expect(group.epics.count).to eq(1)
expect(group.epics.count).to eq(2)
expect(tracker.has_next_page).to eq(true)
expect(tracker.next_page).to eq('endCursorValue')
end end
end end
end end
...@@ -3,30 +3,29 @@ ...@@ -3,30 +3,29 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline do RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline do
describe '#run' do let(:user) { create(:user) }
let(:user) { create(:user) } let(:group) { create(:group) }
let(:group) { create(:group) } let(:cursor) { 'cursor' }
let(:entity) do let(:entity) do
create( create(
:bulk_import_entity, :bulk_import_entity,
source_full_path: 'source/full/path', source_full_path: 'source/full/path',
destination_name: 'My Destination Group', destination_name: 'My Destination Group',
destination_namespace: group.full_path, destination_namespace: group.full_path,
group: group group: group
) )
end end
let(:context) do
BulkImports::Pipeline::Context.new(
current_user: user,
entity: entity
)
end
subject { described_class.new } let(:context) do
BulkImports::Pipeline::Context.new(
current_user: user,
entity: entity
)
end
describe '#run' do
it 'imports group epics into destination group' do it 'imports group epics into destination group' do
first_page = extractor_data(has_next_page: true, cursor: 'nextPageCursor') first_page = extractor_data(has_next_page: true, cursor: cursor)
last_page = extractor_data(has_next_page: false) last_page = extractor_data(has_next_page: false)
allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor| allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
...@@ -39,6 +38,38 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline do ...@@ -39,6 +38,38 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline do
end end
end end
describe '#after_run' do
context 'when extracted data has next page' do
it 'updates tracker information and runs pipeline again' do
data = extractor_data(has_next_page: true, cursor: cursor)
expect(subject).to receive(:run).with(context)
subject.after_run(context, data)
tracker = entity.trackers.find_by(relation: :epics)
expect(tracker.has_next_page).to eq(true)
expect(tracker.next_page).to eq(cursor)
end
end
context 'when extracted data has no next page' do
it 'updates tracker information and does not run pipeline' do
data = extractor_data(has_next_page: false)
expect(subject).not_to receive(:run).with(context)
subject.after_run(context, data)
tracker = entity.trackers.find_by(relation: :epics)
expect(tracker.has_next_page).to eq(false)
expect(tracker.next_page).to be_nil
end
end
end
describe 'pipeline parts' do describe 'pipeline parts' do
it { expect(described_class).to include_module(BulkImports::Pipeline) } it { expect(described_class).to include_module(BulkImports::Pipeline) }
it { expect(described_class).to include_module(BulkImports::Pipeline::Runner) } it { expect(described_class).to include_module(BulkImports::Pipeline::Runner) }
...@@ -56,8 +87,6 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline do ...@@ -56,8 +87,6 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline do
it 'has transformers' do it 'has transformers' do
expect(described_class.transformers) expect(described_class.transformers)
.to contain_exactly( .to contain_exactly(
{ klass: BulkImports::Common::Transformers::HashKeyDigger, options: { key_path: %w[data group epics] } },
{ klass: BulkImports::Common::Transformers::UnderscorifyKeysTransformer, options: nil },
{ klass: BulkImports::Common::Transformers::ProhibitedAttributesTransformer, options: nil } { klass: BulkImports::Common::Transformers::ProhibitedAttributesTransformer, options: nil }
) )
end end
...@@ -68,26 +97,19 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline do ...@@ -68,26 +97,19 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline do
end end
def extractor_data(has_next_page:, cursor: nil) def extractor_data(has_next_page:, cursor: nil)
[ data = [
{ {
'data' => { 'title' => 'epic1',
'group' => { 'state' => 'closed',
'epics' => { 'confidential' => true
'page_info' => {
'end_cursor' => cursor,
'has_next_page' => has_next_page
},
'nodes' => [
{
'title' => 'epic1',
'state' => 'closed',
'confidential' => true
}
]
}
}
}
} }
] ]
page_info = {
'end_cursor' => cursor,
'has_next_page' => has_next_page
}
BulkImports::Pipeline::ExtractedData.new(data: data, page_info: page_info)
end end
end end
...@@ -4,17 +4,22 @@ module BulkImports ...@@ -4,17 +4,22 @@ module BulkImports
module Common module Common
module Extractors module Extractors
class GraphqlExtractor class GraphqlExtractor
def initialize(query) def initialize(options = {})
@query = query[:query] @query = options[:query]
end end
def extract(context) def extract(context)
client = graphql_client(context) client = graphql_client(context)
client.execute( response = client.execute(
client.parse(query.to_s), client.parse(query.to_s),
query.variables(context.entity) query.variables(context.entity)
).original_hash.deep_dup ).original_hash.deep_dup
BulkImports::Pipeline::ExtractedData.new(
data: response.dig(*query.data_path),
page_info: response.dig(*query.page_info_path)
)
end end
private private
...@@ -27,10 +32,6 @@ module BulkImports ...@@ -27,10 +32,6 @@ module BulkImports
token: context.configuration.access_token token: context.configuration.access_token
) )
end end
def parsed_query
@parsed_query ||= graphql_client.parse(query.to_s)
end
end end
end end
end end
......
# frozen_string_literal: true
module BulkImports
module Common
module Transformers
class HashKeyDigger
def initialize(options = {})
@key_path = options[:key_path]
end
def transform(_, data)
raise ArgumentError, "Given data must be a Hash" unless data.is_a?(Hash)
data.dig(*Array.wrap(key_path))
end
private
attr_reader :key_path
end
end
end
end
# frozen_string_literal: true
module BulkImports
module Common
module Transformers
class UnderscorifyKeysTransformer
def initialize(options = {})
@options = options
end
def transform(_, data)
data.deep_transform_keys do |key|
key.to_s.underscore
end
end
end
end
end
end
...@@ -9,9 +9,11 @@ module BulkImports ...@@ -9,9 +9,11 @@ module BulkImports
def extract(context) def extract(context)
encoded_parent_path = ERB::Util.url_encode(context.entity.source_full_path) encoded_parent_path = ERB::Util.url_encode(context.entity.source_full_path)
http_client(context.entity.bulk_import.configuration) response = http_client(context.entity.bulk_import.configuration)
.each_page(:get, "groups/#{encoded_parent_path}/subgroups") .each_page(:get, "groups/#{encoded_parent_path}/subgroups")
.flat_map(&:itself) .flat_map(&:itself)
BulkImports::Pipeline::ExtractedData.new(data: response)
end end
private private
......
...@@ -12,18 +12,18 @@ module BulkImports ...@@ -12,18 +12,18 @@ module BulkImports
group(fullPath: $full_path) { group(fullPath: $full_path) {
name name
path path
fullPath full_path: fullPath
description description
visibility visibility
emailsDisabled emails_disabled: emailsDisabled
lfsEnabled lfs_enabled: lfsEnabled
mentionsDisabled mentions_disabled: mentionsDisabled
projectCreationLevel project_creation_level: projectCreationLevel
requestAccessEnabled request_access_enabled: requestAccessEnabled
requireTwoFactorAuthentication require_two_factor_authentication: requireTwoFactorAuthentication
shareWithGroupLock share_with_group_lock: shareWithGroupLock
subgroupCreationLevel subgroup_creation_level: subgroupCreationLevel
twoFactorGracePeriod two_factor_grace_period: twoFactorGracePeriod
} }
} }
GRAPHQL GRAPHQL
...@@ -32,6 +32,18 @@ module BulkImports ...@@ -32,6 +32,18 @@ module BulkImports
def variables(entity) def variables(entity)
{ full_path: entity.source_full_path } { full_path: entity.source_full_path }
end end
def base_path
%w[data group]
end
def data_path
base_path
end
def page_info_path
base_path << 'page_info'
end
end end
end end
end end
......
...@@ -32,6 +32,18 @@ module BulkImports ...@@ -32,6 +32,18 @@ module BulkImports
cursor: entity.next_page_for(:labels) cursor: entity.next_page_for(:labels)
} }
end end
def base_path
%w[data group labels]
end
def data_path
base_path << 'nodes'
end
def page_info_path
base_path << 'page_info'
end
end end
end end
end end
......
...@@ -7,16 +7,7 @@ module BulkImports ...@@ -7,16 +7,7 @@ module BulkImports
def initialize(*); end def initialize(*); end
def load(context, data) def load(context, data)
Array.wrap(data['nodes']).each do |entry| Labels::CreateService.new(data).execute(group: context.entity.group)
Labels::CreateService.new(entry)
.execute(group: context.entity.group)
end
context.entity.update_tracker_for(
relation: :labels,
has_next_page: data.dig('page_info', 'has_next_page'),
next_page: data.dig('page_info', 'end_cursor')
)
end end
end end
end end
......
...@@ -10,8 +10,6 @@ module BulkImports ...@@ -10,8 +10,6 @@ module BulkImports
extractor Common::Extractors::GraphqlExtractor, query: Graphql::GetGroupQuery extractor Common::Extractors::GraphqlExtractor, query: Graphql::GetGroupQuery
transformer Common::Transformers::HashKeyDigger, key_path: %w[data group]
transformer Common::Transformers::UnderscorifyKeysTransformer
transformer Common::Transformers::ProhibitedAttributesTransformer transformer Common::Transformers::ProhibitedAttributesTransformer
transformer Groups::Transformers::GroupAttributesTransformer transformer Groups::Transformers::GroupAttributesTransformer
......
...@@ -9,13 +9,18 @@ module BulkImports ...@@ -9,13 +9,18 @@ module BulkImports
extractor BulkImports::Common::Extractors::GraphqlExtractor, extractor BulkImports::Common::Extractors::GraphqlExtractor,
query: BulkImports::Groups::Graphql::GetLabelsQuery query: BulkImports::Groups::Graphql::GetLabelsQuery
transformer BulkImports::Common::Transformers::HashKeyDigger, key_path: %w[data group labels]
transformer Common::Transformers::ProhibitedAttributesTransformer transformer Common::Transformers::ProhibitedAttributesTransformer
loader BulkImports::Groups::Loaders::LabelsLoader loader BulkImports::Groups::Loaders::LabelsLoader
def after_run(context) def after_run(context, extracted_data)
if context.entity.has_next_page?(:labels) context.entity.update_tracker_for(
relation: :labels,
has_next_page: extracted_data.has_next_page?,
next_page: extracted_data.next_page
)
if extracted_data.has_next_page?
run(context) run(context)
end end
end end
......
# frozen_string_literal: true
module BulkImports
module Pipeline
class ExtractedData
attr_reader :data
def initialize(data: nil, page_info: {})
@data = Array.wrap(data)
@page_info = page_info
end
def has_next_page?
@page_info['has_next_page']
end
def next_page
@page_info['end_cursor']
end
def each(&block)
data.each(&block)
end
end
end
end
...@@ -12,7 +12,9 @@ module BulkImports ...@@ -12,7 +12,9 @@ module BulkImports
info(context, message: 'Pipeline started', pipeline_class: pipeline) info(context, message: 'Pipeline started', pipeline_class: pipeline)
Array.wrap(extracted_data_from(context)).each do |entry| extracted_data = extracted_data_from(context)
extracted_data&.each do |entry|
transformers.each do |transformer| transformers.each do |transformer|
entry = run_pipeline_step(:transformer, transformer.class.name, context) do entry = run_pipeline_step(:transformer, transformer.class.name, context) do
transformer.transform(context, entry) transformer.transform(context, entry)
...@@ -24,7 +26,7 @@ module BulkImports ...@@ -24,7 +26,7 @@ module BulkImports
end end
end end
after_run(context) if respond_to?(:after_run) after_run(context, extracted_data) if respond_to?(:after_run)
rescue MarkedAsFailedError rescue MarkedAsFailedError
log_skip(context) log_skip(context)
end end
...@@ -43,6 +45,8 @@ module BulkImports ...@@ -43,6 +45,8 @@ module BulkImports
log_import_failure(e, step, context) log_import_failure(e, step, context)
mark_as_failed(context) if abort_on_failure? mark_as_failed(context) if abort_on_failure?
nil
end end
def extracted_data_from(context) def extracted_data_from(context)
......
...@@ -5,8 +5,18 @@ require 'spec_helper' ...@@ -5,8 +5,18 @@ require 'spec_helper'
RSpec.describe BulkImports::Common::Extractors::GraphqlExtractor do RSpec.describe BulkImports::Common::Extractors::GraphqlExtractor do
let(:graphql_client) { instance_double(BulkImports::Clients::Graphql) } let(:graphql_client) { instance_double(BulkImports::Clients::Graphql) }
let(:import_entity) { create(:bulk_import_entity) } let(:import_entity) { create(:bulk_import_entity) }
let(:response) { double(original_hash: { foo: :bar }) } let(:response) { double(original_hash: { 'data' => { 'foo' => 'bar' }, 'page_info' => {} }) }
let(:query) { { query: double(to_s: 'test', variables: {}) } } let(:options) do
{
query: double(
to_s: 'test',
variables: {},
data_path: %w[data foo],
page_info_path: %w[data page_info]
)
}
end
let(:context) do let(:context) do
instance_double( instance_double(
BulkImports::Pipeline::Context, BulkImports::Pipeline::Context,
...@@ -14,58 +24,20 @@ RSpec.describe BulkImports::Common::Extractors::GraphqlExtractor do ...@@ -14,58 +24,20 @@ RSpec.describe BulkImports::Common::Extractors::GraphqlExtractor do
) )
end end
subject { described_class.new(query) } subject { described_class.new(options) }
before do
allow(subject).to receive(:graphql_client).and_return(graphql_client)
allow(graphql_client).to receive(:parse)
end
describe '#extract' do describe '#extract' do
before do before do
allow(subject).to receive(:query_variables).and_return({}) allow(subject).to receive(:graphql_client).and_return(graphql_client)
allow(graphql_client).to receive(:execute).and_return(response) allow(graphql_client).to receive(:parse)
end
it 'returns original hash' do
expect(subject.extract(context)).to eq({ foo: :bar })
end
end
describe 'query variables' do
before do
allow(graphql_client).to receive(:execute).and_return(response) allow(graphql_client).to receive(:execute).and_return(response)
end end
context 'when variables are present' do it 'returns ExtractedData' do
let(:variables) { { foo: :bar } } extracted_data = subject.extract(context)
let(:query) { { query: double(to_s: 'test', variables: variables) } }
it 'builds graphql query variables for import entity' do
expect(graphql_client).to receive(:execute).with(anything, variables)
subject.extract(context).first
end
end
context 'when no variables are present' do
let(:query) { { query: double(to_s: 'test', variables: nil) } }
it 'returns empty hash' do
expect(graphql_client).to receive(:execute).with(anything, nil)
subject.extract(context).first
end
end
context 'when variables are empty hash' do
let(:query) { { query: double(to_s: 'test', variables: {}) } }
it 'makes graphql request with empty hash' do
expect(graphql_client).to receive(:execute).with(anything, {})
subject.extract(context).first expect(extracted_data).to be_instance_of(BulkImports::Pipeline::ExtractedData)
end expect(extracted_data.data).to contain_exactly('bar')
end end
end end
end end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe BulkImports::Common::Transformers::HashKeyDigger do
describe '#transform' do
it 'when the key_path is an array' do
data = { foo: { bar: :value } }
key_path = %i[foo bar]
transformed = described_class.new(key_path: key_path).transform(nil, data)
expect(transformed).to eq(:value)
end
it 'when the key_path is not an array' do
data = { foo: { bar: :value } }
key_path = :foo
transformed = described_class.new(key_path: key_path).transform(nil, data)
expect(transformed).to eq({ bar: :value })
end
it "when the data is not a hash" do
expect { described_class.new(key_path: nil).transform(nil, nil) }
.to raise_error(ArgumentError, "Given data must be a Hash")
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe BulkImports::Common::Transformers::UnderscorifyKeysTransformer do
describe '#transform' do
it 'deep underscorifies hash keys' do
data = {
'fullPath' => 'Foo',
'snakeKeys' => {
'snakeCaseKey' => 'Bar',
'moreKeys' => {
'anotherSnakeCaseKey' => 'Test'
}
}
}
transformed_data = described_class.new.transform(nil, data)
expect(transformed_data).to have_key('full_path')
expect(transformed_data).to have_key('snake_keys')
expect(transformed_data['snake_keys']).to have_key('snake_case_key')
expect(transformed_data['snake_keys']).to have_key('more_keys')
expect(transformed_data.dig('snake_keys', 'more_keys')).to have_key('another_snake_case_key')
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe BulkImports::Groups::Extractors::SubgroupsExtractor do
describe '#extract' do
it 'returns ExtractedData response' do
user = create(:user)
bulk_import = create(:bulk_import)
entity = create(:bulk_import_entity, bulk_import: bulk_import)
configuration = create(:bulk_import_configuration, bulk_import: bulk_import)
response = [{ 'test' => 'group' }]
context = BulkImports::Pipeline::Context.new(
current_user: user,
entity: entity,
configuration: configuration
)
allow_next_instance_of(BulkImports::Clients::Http) do |client|
allow(client).to receive(:each_page).and_return(response)
end
extracted_data = subject.extract(context)
expect(extracted_data).to be_instance_of(BulkImports::Pipeline::ExtractedData)
expect(extracted_data.data).to eq(response)
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe BulkImports::Groups::Graphql::GetGroupQuery do
describe '#variables' do
let(:entity) { double(source_full_path: 'test') }
it 'returns query variables based on entity information' do
expected = { full_path: entity.source_full_path }
expect(described_class.variables(entity)).to eq(expected)
end
end
describe '#data_path' do
it 'returns data path' do
expected = %w[data group]
expect(described_class.data_path).to eq(expected)
end
end
describe '#page_info_path' do
it 'returns pagination information path' do
expected = %w[data group page_info]
expect(described_class.page_info_path).to eq(expected)
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe BulkImports::Groups::Graphql::GetLabelsQuery do
describe '#variables' do
let(:entity) { double(source_full_path: 'test', next_page_for: 'next_page') }
it 'returns query variables based on entity information' do
expected = { full_path: entity.source_full_path, cursor: entity.next_page_for }
expect(described_class.variables(entity)).to eq(expected)
end
end
describe '#data_path' do
it 'returns data path' do
expected = %w[data group labels nodes]
expect(described_class.data_path).to eq(expected)
end
end
describe '#page_info_path' do
it 'returns pagination information path' do
expected = %w[data group labels page_info]
expect(described_class.page_info_path).to eq(expected)
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe BulkImports::Groups::Loaders::LabelsLoader do
describe '#load' do
let(:user) { create(:user) }
let(:group) { create(:group) }
let(:entity) { create(:bulk_import_entity, group: group) }
let(:context) do
BulkImports::Pipeline::Context.new(
entity: entity,
current_user: user
)
end
let(:data) do
{
'title' => 'label',
'description' => 'description',
'color' => '#FFFFFF'
}
end
it 'creates the label' do
expect { subject.load(context, data) }.to change(Label, :count).by(1)
label = group.labels.first
expect(label.title).to eq(data['title'])
expect(label.description).to eq(data['description'])
expect(label.color).to eq(data['color'])
end
end
end
...@@ -24,19 +24,15 @@ RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do ...@@ -24,19 +24,15 @@ RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do
let(:group_data) do let(:group_data) do
{ {
'data' => { 'name' => 'source_name',
'group' => { 'full_path' => 'source/full/path',
'name' => 'source_name', 'visibility' => 'private',
'fullPath' => 'source/full/path', 'project_creation_level' => 'developer',
'visibility' => 'private', 'subgroup_creation_level' => 'maintainer',
'projectCreationLevel' => 'developer', 'description' => 'Group Description',
'subgroupCreationLevel' => 'maintainer', 'emails_disabled' => true,
'description' => 'Group Description', 'lfs_enabled' => false,
'emailsDisabled' => true, 'mentions_disabled' => true
'lfsEnabled' => false,
'mentionsDisabled' => true
}
}
} }
end end
...@@ -60,13 +56,13 @@ RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do ...@@ -60,13 +56,13 @@ RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do
expect(imported_group).not_to be_nil expect(imported_group).not_to be_nil
expect(imported_group.parent).to eq(parent) expect(imported_group.parent).to eq(parent)
expect(imported_group.path).to eq(group_path) expect(imported_group.path).to eq(group_path)
expect(imported_group.description).to eq(group_data.dig('data', 'group', 'description')) expect(imported_group.description).to eq(group_data['description'])
expect(imported_group.visibility).to eq(group_data.dig('data', 'group', 'visibility')) expect(imported_group.visibility).to eq(group_data['visibility'])
expect(imported_group.project_creation_level).to eq(Gitlab::Access.project_creation_string_options[group_data.dig('data', 'group', 'projectCreationLevel')]) expect(imported_group.project_creation_level).to eq(Gitlab::Access.project_creation_string_options[group_data['project_creation_level']])
expect(imported_group.subgroup_creation_level).to eq(Gitlab::Access.subgroup_creation_string_options[group_data.dig('data', 'group', 'subgroupCreationLevel')]) expect(imported_group.subgroup_creation_level).to eq(Gitlab::Access.subgroup_creation_string_options[group_data['subgroup_creation_level']])
expect(imported_group.lfs_enabled?).to eq(group_data.dig('data', 'group', 'lfsEnabled')) expect(imported_group.lfs_enabled?).to eq(group_data['lfs_enabled'])
expect(imported_group.emails_disabled?).to eq(group_data.dig('data', 'group', 'emailsDisabled')) expect(imported_group.emails_disabled?).to eq(group_data['emails_disabled'])
expect(imported_group.mentions_disabled?).to eq(group_data.dig('data', 'group', 'mentionsDisabled')) expect(imported_group.mentions_disabled?).to eq(group_data['mentions_disabled'])
end end
end end
...@@ -87,8 +83,6 @@ RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do ...@@ -87,8 +83,6 @@ RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do
it 'has transformers' do it 'has transformers' do
expect(described_class.transformers) expect(described_class.transformers)
.to contain_exactly( .to contain_exactly(
{ klass: BulkImports::Common::Transformers::HashKeyDigger, options: { key_path: %w[data group] } },
{ klass: BulkImports::Common::Transformers::UnderscorifyKeysTransformer, options: nil },
{ klass: BulkImports::Common::Transformers::ProhibitedAttributesTransformer, options: nil }, { klass: BulkImports::Common::Transformers::ProhibitedAttributesTransformer, options: nil },
{ klass: BulkImports::Groups::Transformers::GroupAttributesTransformer, options: nil } { klass: BulkImports::Groups::Transformers::GroupAttributesTransformer, options: nil }
) )
......
...@@ -5,6 +5,7 @@ require 'spec_helper' ...@@ -5,6 +5,7 @@ require 'spec_helper'
RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do
let(:user) { create(:user) } let(:user) { create(:user) }
let(:group) { create(:group) } let(:group) { create(:group) }
let(:cursor) { 'cursor' }
let(:entity) do let(:entity) do
create( create(
:bulk_import_entity, :bulk_import_entity,
...@@ -22,31 +23,26 @@ RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do ...@@ -22,31 +23,26 @@ RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do
) )
end end
def extractor_data(title:, has_next_page:, cursor: "") def extractor_data(title:, has_next_page:, cursor: nil)
{ data = [
"data" => { {
"group" => { 'title' => title,
"labels" => { 'description' => 'desc',
"page_info" => { 'color' => '#428BCA'
"end_cursor" => cursor,
"has_next_page" => has_next_page
},
"nodes" => [
{
"title" => title,
"description" => "desc",
"color" => "#428BCA"
}
]
}
}
} }
]
page_info = {
'end_cursor' => cursor,
'has_next_page' => has_next_page
} }
BulkImports::Pipeline::ExtractedData.new(data: data, page_info: page_info)
end end
describe '#run' do describe '#run' do
it 'imports a group labels' do it 'imports a group labels' do
first_page = extractor_data(title: 'label1', has_next_page: true, cursor: 'nextPageCursor') first_page = extractor_data(title: 'label1', has_next_page: true, cursor: cursor)
last_page = extractor_data(title: 'label2', has_next_page: false) last_page = extractor_data(title: 'label2', has_next_page: false)
allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor| allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
...@@ -65,6 +61,38 @@ RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do ...@@ -65,6 +61,38 @@ RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do
end end
end end
describe '#after_run' do
context 'when extracted data has next page' do
it 'updates tracker information and runs pipeline again' do
data = extractor_data(title: 'label', has_next_page: true, cursor: cursor)
expect(subject).to receive(:run).with(context)
subject.after_run(context, data)
tracker = entity.trackers.find_by(relation: :labels)
expect(tracker.has_next_page).to eq(true)
expect(tracker.next_page).to eq(cursor)
end
end
context 'when extracted data has no next page' do
it 'updates tracker information and does not run pipeline' do
data = extractor_data(title: 'label', has_next_page: false)
expect(subject).not_to receive(:run).with(context)
subject.after_run(context, data)
tracker = entity.trackers.find_by(relation: :labels)
expect(tracker.has_next_page).to eq(false)
expect(tracker.next_page).to be_nil
end
end
end
describe 'pipeline parts' do describe 'pipeline parts' do
it { expect(described_class).to include_module(BulkImports::Pipeline) } it { expect(described_class).to include_module(BulkImports::Pipeline) }
it { expect(described_class).to include_module(BulkImports::Pipeline::Runner) } it { expect(described_class).to include_module(BulkImports::Pipeline::Runner) }
...@@ -82,7 +110,6 @@ RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do ...@@ -82,7 +110,6 @@ RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do
it 'has transformers' do it 'has transformers' do
expect(described_class.transformers) expect(described_class.transformers)
.to contain_exactly( .to contain_exactly(
{ klass: BulkImports::Common::Transformers::HashKeyDigger, options: { key_path: %w[data group labels] } },
{ klass: BulkImports::Common::Transformers::ProhibitedAttributesTransformer, options: nil } { klass: BulkImports::Common::Transformers::ProhibitedAttributesTransformer, options: nil }
) )
end end
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe BulkImports::Pipeline::ExtractedData do
let(:data) { 'data' }
let(:has_next_page) { true }
let(:cursor) { 'cursor' }
let(:page_info) do
{
'has_next_page' => has_next_page,
'end_cursor' => cursor
}
end
subject { described_class.new(data: data, page_info: page_info) }
describe '#has_next_page?' do
context 'when next page is present' do
it 'returns true' do
expect(subject.has_next_page?).to eq(true)
end
end
context 'when next page is not present' do
let(:has_next_page) { false }
it 'returns false' do
expect(subject.has_next_page?).to eq(false)
end
end
end
describe '#next_page' do
it 'returns next page cursor information' do
expect(subject.next_page).to eq(cursor)
end
end
describe '#each' do
context 'when block is present' do
it 'yields each data item' do
expect { |b| subject.each(&b) }.to yield_control
end
end
context 'when block is not present' do
it 'returns enumerator' do
expect(subject.each).to be_instance_of(Enumerator)
end
end
end
end
...@@ -53,18 +53,26 @@ RSpec.describe BulkImports::Pipeline::Runner do ...@@ -53,18 +53,26 @@ RSpec.describe BulkImports::Pipeline::Runner do
end end
it 'runs pipeline extractor, transformer, loader' do it 'runs pipeline extractor, transformer, loader' do
entries = [{ foo: :bar }] extracted_data = BulkImports::Pipeline::ExtractedData.new(data: { foo: :bar })
expect_next_instance_of(BulkImports::Extractor) do |extractor| expect_next_instance_of(BulkImports::Extractor) do |extractor|
expect(extractor).to receive(:extract).with(context).and_return(entries) expect(extractor)
.to receive(:extract)
.with(context)
.and_return(extracted_data)
end end
expect_next_instance_of(BulkImports::Transformer) do |transformer| expect_next_instance_of(BulkImports::Transformer) do |transformer|
expect(transformer).to receive(:transform).with(context, entries.first).and_return(entries.first) expect(transformer)
.to receive(:transform)
.with(context, extracted_data.data.first)
.and_return(extracted_data.data.first)
end end
expect_next_instance_of(BulkImports::Loader) do |loader| expect_next_instance_of(BulkImports::Loader) do |loader|
expect(loader).to receive(:load).with(context, entries.first) expect(loader)
.to receive(:load)
.with(context, extracted_data.data.first)
end end
expect_next_instance_of(Gitlab::Import::Logger) do |logger| expect_next_instance_of(Gitlab::Import::Logger) do |logger|
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment