Commit 009950fb authored by Bob Van Landuyt

Merge branch 'georgekoltsov/graphql-extractor-yield-one-record' into 'master'

Process one record at a time in Bulk Import pipelines

See merge request gitlab-org/gitlab!52330
parents 78da8949 a381517d
---
title: Process one record at a time in Bulk Import pipelines
merge_request: 52330
author:
type: changed
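
In short: extractors now return a `BulkImports::Pipeline::ExtractedData` object, and the pipeline runner feeds its records to transformers and loaders one at a time, with pagination state handled in `after_run`. A minimal standalone sketch of that flow, for orientation before the diffs (`FakeLoader` is a hypothetical stand-in for the real EpicsLoader/LabelsLoader; `ExtractedData` mirrors the new class added further down; requires the activesupport gem):

# Minimal sketch of the per-record flow this MR introduces.
require 'active_support/core_ext/array/wrap'

class ExtractedData
  attr_reader :data

  def initialize(data: nil, page_info: {})
    @data = Array.wrap(data)
    @page_info = page_info
  end

  def has_next_page?
    @page_info['has_next_page']
  end

  def next_page
    @page_info['end_cursor']
  end

  def each(&block)
    data.each(&block)
  end
end

class FakeLoader
  # After this MR a loader receives one record at a time,
  # not a whole page hash with 'nodes' and 'page_info' keys.
  def load(_context, record)
    puts "creating record: #{record['title']}"
  end
end

extracted = ExtractedData.new(
  data: [{ 'title' => 'epic1' }, { 'title' => 'epic2' }],
  page_info: { 'has_next_page' => false, 'end_cursor' => nil }
)

loader = FakeLoader.new
extracted.each { |record| loader.load(nil, record) }
puts "more pages? #{extracted.has_next_page?.inspect}" # => false
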
@@ -16,22 +16,22 @@ module EE
first: 100,
after: $cursor
) {
pageInfo {
endCursor
hasNextPage
page_info: pageInfo {
end_cursor: endCursor
has_next_page: hasNextPage
}
nodes {
title
description
state
createdAt
closedAt
startDate
startDateFixed
startDateIsFixed
dueDateFixed
dueDateIsFixed
relativePosition
created_at: createdAt
closed_at: closedAt
start_date: startDate
start_date_fixed: startDateFixed
start_date_is_fixed: startDateIsFixed
due_date_fixed: dueDateFixed
due_date_is_fixed: dueDateIsFixed
relative_position: relativePosition
confidential
}
}
@@ -46,6 +46,18 @@ module EE
cursor: entity.next_page_for(:epics)
}
end
def base_path
%w[data group epics]
end
def data_path
base_path << 'nodes'
end
def page_info_path
base_path << 'page_info'
end
end
end
end
......
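
The snake_case aliases above (e.g. `created_at: createdAt`) make the GraphQL response arrive already underscored, which is what lets the pipelines below drop `UnderscorifyKeysTransformer`. For comparison, a small demo of what the dropped transformer used to do at runtime (requires the activesupport gem):

# Underscoring in Ruby, now handled by field aliases inside the query itself.
require 'active_support/core_ext/hash/keys'
require 'active_support/core_ext/string/inflections'

data = { 'createdAt' => '2021-01-01', 'pageInfo' => { 'endCursor' => 'abc' } }

p data.deep_transform_keys { |key| key.to_s.underscore }
# => {"created_at"=>"2021-01-01", "page_info"=>{"end_cursor"=>"abc"}}
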
@@ -10,19 +10,11 @@ module EE
end
def load(context, data)
Array.wrap(data['nodes']).each do |args|
::Epics::CreateService.new(
context.entity.group,
context.current_user,
args
).execute
end
context.entity.update_tracker_for(
relation: :epics,
has_next_page: data.dig('page_info', 'has_next_page'),
next_page: data.dig('page_info', 'end_cursor')
)
::Epics::CreateService.new(
context.entity.group,
context.current_user,
data
).execute
end
end
end
......
@@ -10,14 +10,18 @@ module EE
extractor ::BulkImports::Common::Extractors::GraphqlExtractor,
query: EE::BulkImports::Groups::Graphql::GetEpicsQuery
transformer ::BulkImports::Common::Transformers::HashKeyDigger, key_path: %w[data group epics]
transformer ::BulkImports::Common::Transformers::UnderscorifyKeysTransformer
transformer ::BulkImports::Common::Transformers::ProhibitedAttributesTransformer
loader EE::BulkImports::Groups::Loaders::EpicsLoader
def after_run(context)
if context.entity.has_next_page?(:epics)
def after_run(context, extracted_data)
context.entity.update_tracker_for(
relation: :epics,
has_next_page: extracted_data.has_next_page?,
next_page: extracted_data.next_page
)
if extracted_data.has_next_page?
run(context)
end
end
......
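
Pagination now happens in `after_run`: the tracker is updated from the extracted page info, and the pipeline calls `run` again while pages remain. A minimal sketch of that loop, using a hypothetical in-memory `Tracker` instead of the real entity tracker model:

# Sketch of the after_run pagination loop under stated assumptions.
Tracker = Struct.new(:has_next_page, :next_page)

class PaginatingPipeline
  def initialize(pages)
    @pages = pages # array of [records, page_info] pairs standing in for GraphQL pages
    @tracker = Tracker.new
  end

  def run(cursor = nil)
    records, page_info = @pages.shift
    records.each { |record| puts "load #{record} (cursor: #{cursor.inspect})" }
    after_run(page_info)
  end

  def after_run(page_info)
    # Persist pagination state first, then re-run while more pages remain.
    @tracker.has_next_page = page_info['has_next_page']
    @tracker.next_page = page_info['end_cursor']
    run(@tracker.next_page) if @tracker.has_next_page
  end
end

PaginatingPipeline.new([
  [%w[epic1], { 'has_next_page' => true, 'end_cursor' => 'c1' }],
  [%w[epic2], { 'has_next_page' => false, 'end_cursor' => nil }]
]).run
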
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe EE::BulkImports::Groups::Graphql::GetEpicsQuery do
describe '#variables' do
let(:entity) { double(source_full_path: 'test', next_page_for: 'next_page') }
it 'returns query variables based on entity information' do
expected = { full_path: entity.source_full_path, cursor: entity.next_page_for }
expect(described_class.variables(entity)).to eq(expected)
end
end
describe '#data_path' do
it 'returns data path' do
expected = %w[data group epics nodes]
expect(described_class.data_path).to eq(expected)
end
end
describe '#page_info_path' do
it 'returns pagination information path' do
expected = %w[data group epics page_info]
expect(described_class.page_info_path).to eq(expected)
end
end
end
@@ -16,35 +16,18 @@ RSpec.describe EE::BulkImports::Groups::Loaders::EpicsLoader do
let(:data) do
{
'page_info' => {
'end_cursor' => 'endCursorValue',
'has_next_page' => true
},
'nodes' => [
{
'title' => 'epic1',
'state' => 'opened',
'confidential' => false
},
{
'title' => 'epic2',
'state' => 'closed',
'confidential' => true
}
]
'title' => 'epic1',
'state' => 'opened',
'confidential' => false
}
end
subject { described_class.new }
it 'creates the epics and update the entity tracker' do
expect { subject.load(context, data) }.to change(::Epic, :count).by(2)
it 'creates the epic' do
expect { subject.load(context, data) }.to change(::Epic, :count).by(1)
tracker = entity.trackers.last
expect(group.epics.count).to eq(2)
expect(tracker.has_next_page).to eq(true)
expect(tracker.next_page).to eq('endCursorValue')
expect(group.epics.count).to eq(1)
end
end
end
@@ -3,30 +3,29 @@
require 'spec_helper'
RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline do
describe '#run' do
let(:user) { create(:user) }
let(:group) { create(:group) }
let(:entity) do
create(
:bulk_import_entity,
source_full_path: 'source/full/path',
destination_name: 'My Destination Group',
destination_namespace: group.full_path,
group: group
)
end
let(:context) do
BulkImports::Pipeline::Context.new(
current_user: user,
entity: entity
)
end
let(:user) { create(:user) }
let(:group) { create(:group) }
let(:cursor) { 'cursor' }
let(:entity) do
create(
:bulk_import_entity,
source_full_path: 'source/full/path',
destination_name: 'My Destination Group',
destination_namespace: group.full_path,
group: group
)
end
subject { described_class.new }
let(:context) do
BulkImports::Pipeline::Context.new(
current_user: user,
entity: entity
)
end
describe '#run' do
it 'imports group epics into destination group' do
first_page = extractor_data(has_next_page: true, cursor: 'nextPageCursor')
first_page = extractor_data(has_next_page: true, cursor: cursor)
last_page = extractor_data(has_next_page: false)
allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
@@ -39,6 +38,38 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline do
end
end
describe '#after_run' do
context 'when extracted data has next page' do
it 'updates tracker information and runs pipeline again' do
data = extractor_data(has_next_page: true, cursor: cursor)
expect(subject).to receive(:run).with(context)
subject.after_run(context, data)
tracker = entity.trackers.find_by(relation: :epics)
expect(tracker.has_next_page).to eq(true)
expect(tracker.next_page).to eq(cursor)
end
end
context 'when extracted data has no next page' do
it 'updates tracker information and does not run pipeline' do
data = extractor_data(has_next_page: false)
expect(subject).not_to receive(:run).with(context)
subject.after_run(context, data)
tracker = entity.trackers.find_by(relation: :epics)
expect(tracker.has_next_page).to eq(false)
expect(tracker.next_page).to be_nil
end
end
end
describe 'pipeline parts' do
it { expect(described_class).to include_module(BulkImports::Pipeline) }
it { expect(described_class).to include_module(BulkImports::Pipeline::Runner) }
@@ -56,8 +87,6 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline do
it 'has transformers' do
expect(described_class.transformers)
.to contain_exactly(
{ klass: BulkImports::Common::Transformers::HashKeyDigger, options: { key_path: %w[data group epics] } },
{ klass: BulkImports::Common::Transformers::UnderscorifyKeysTransformer, options: nil },
{ klass: BulkImports::Common::Transformers::ProhibitedAttributesTransformer, options: nil }
)
end
@@ -68,26 +97,19 @@
end
def extractor_data(has_next_page:, cursor: nil)
[
data = [
{
'data' => {
'group' => {
'epics' => {
'page_info' => {
'end_cursor' => cursor,
'has_next_page' => has_next_page
},
'nodes' => [
{
'title' => 'epic1',
'state' => 'closed',
'confidential' => true
}
]
}
}
}
'title' => 'epic1',
'state' => 'closed',
'confidential' => true
}
]
page_info = {
'end_cursor' => cursor,
'has_next_page' => has_next_page
}
BulkImports::Pipeline::ExtractedData.new(data: data, page_info: page_info)
end
end
@@ -4,17 +4,22 @@ module BulkImports
module Common
module Extractors
class GraphqlExtractor
def initialize(query)
@query = query[:query]
def initialize(options = {})
@query = options[:query]
end
def extract(context)
client = graphql_client(context)
client.execute(
response = client.execute(
client.parse(query.to_s),
query.variables(context.entity)
).original_hash.deep_dup
BulkImports::Pipeline::ExtractedData.new(
data: response.dig(*query.data_path),
page_info: response.dig(*query.page_info_path)
)
end
private
@@ -27,10 +32,6 @@ module BulkImports
token: context.configuration.access_token
)
end
def parsed_query
@parsed_query ||= graphql_client.parse(query.to_s)
end
end
end
end
......
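
The extractor now digs both paths out of the raw response hash and returns a single `ExtractedData`. A sketch of that contract with a hypothetical stub query object, reusing the `ExtractedData` sketch from the top of this page (the response hash below mimics the shape returned by the labels query):

# Sketch of the new extract contract; StubQuery is hypothetical.
StubQuery = Struct.new(:data_path, :page_info_path)

response = {
  'data' => {
    'group' => {
      'labels' => {
        'nodes' => [{ 'title' => 'bug' }],
        'page_info' => { 'has_next_page' => false, 'end_cursor' => nil }
      }
    }
  }
}

query = StubQuery.new(%w[data group labels nodes], %w[data group labels page_info])

extracted = ExtractedData.new(
  data: response.dig(*query.data_path),
  page_info: response.dig(*query.page_info_path)
)

p extracted.data           # => [{"title"=>"bug"}]
p extracted.has_next_page? # => false
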
# frozen_string_literal: true
module BulkImports
module Common
module Transformers
class HashKeyDigger
def initialize(options = {})
@key_path = options[:key_path]
end
def transform(_, data)
raise ArgumentError, "Given data must be a Hash" unless data.is_a?(Hash)
data.dig(*Array.wrap(key_path))
end
private
attr_reader :key_path
end
end
end
end
# frozen_string_literal: true
module BulkImports
module Common
module Transformers
class UnderscorifyKeysTransformer
def initialize(options = {})
@options = options
end
def transform(_, data)
data.deep_transform_keys do |key|
key.to_s.underscore
end
end
end
end
end
end
@@ -9,9 +9,11 @@ module BulkImports
def extract(context)
encoded_parent_path = ERB::Util.url_encode(context.entity.source_full_path)
http_client(context.entity.bulk_import.configuration)
response = http_client(context.entity.bulk_import.configuration)
.each_page(:get, "groups/#{encoded_parent_path}/subgroups")
.flat_map(&:itself)
BulkImports::Pipeline::ExtractedData.new(data: response)
end
private
......
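
The REST-based subgroups extractor wraps its flattened page list in `ExtractedData` with no `page_info` at all, so the pagination predicates stay falsy and no `after_run` loop fires. An illustration, again assuming the `ExtractedData` sketch above:

# With the default empty page_info, pagination predicates return nil (falsy).
extracted = ExtractedData.new(data: [{ 'test' => 'group' }])

p extracted.has_next_page? # => nil
p extracted.next_page      # => nil
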
@@ -12,18 +12,18 @@ module BulkImports
group(fullPath: $full_path) {
name
path
fullPath
full_path: fullPath
description
visibility
emailsDisabled
lfsEnabled
mentionsDisabled
projectCreationLevel
requestAccessEnabled
requireTwoFactorAuthentication
shareWithGroupLock
subgroupCreationLevel
twoFactorGracePeriod
emails_disabled: emailsDisabled
lfs_enabled: lfsEnabled
mentions_disabled: mentionsDisabled
project_creation_level: projectCreationLevel
request_access_enabled: requestAccessEnabled
require_two_factor_authentication: requireTwoFactorAuthentication
share_with_group_lock: shareWithGroupLock
subgroup_creation_level: subgroupCreationLevel
two_factor_grace_period: twoFactorGracePeriod
}
}
GRAPHQL
@@ -32,6 +32,18 @@ module BulkImports
def variables(entity)
{ full_path: entity.source_full_path }
end
def base_path
%w[data group]
end
def data_path
base_path
end
def page_info_path
base_path << 'page_info'
end
end
end
end
......
@@ -32,6 +32,18 @@ module BulkImports
cursor: entity.next_page_for(:labels)
}
end
def base_path
%w[data group labels]
end
def data_path
base_path << 'nodes'
end
def page_info_path
base_path << 'page_info'
end
end
end
end
......
@@ -7,16 +7,7 @@ module BulkImports
def initialize(*); end
def load(context, data)
Array.wrap(data['nodes']).each do |entry|
Labels::CreateService.new(entry)
.execute(group: context.entity.group)
end
context.entity.update_tracker_for(
relation: :labels,
has_next_page: data.dig('page_info', 'has_next_page'),
next_page: data.dig('page_info', 'end_cursor')
)
Labels::CreateService.new(data).execute(group: context.entity.group)
end
end
end
......
@@ -10,8 +10,6 @@ module BulkImports
extractor Common::Extractors::GraphqlExtractor, query: Graphql::GetGroupQuery
transformer Common::Transformers::HashKeyDigger, key_path: %w[data group]
transformer Common::Transformers::UnderscorifyKeysTransformer
transformer Common::Transformers::ProhibitedAttributesTransformer
transformer Groups::Transformers::GroupAttributesTransformer
......
@@ -9,13 +9,18 @@ module BulkImports
extractor BulkImports::Common::Extractors::GraphqlExtractor,
query: BulkImports::Groups::Graphql::GetLabelsQuery
transformer BulkImports::Common::Transformers::HashKeyDigger, key_path: %w[data group labels]
transformer Common::Transformers::ProhibitedAttributesTransformer
loader BulkImports::Groups::Loaders::LabelsLoader
def after_run(context)
if context.entity.has_next_page?(:labels)
def after_run(context, extracted_data)
context.entity.update_tracker_for(
relation: :labels,
has_next_page: extracted_data.has_next_page?,
next_page: extracted_data.next_page
)
if extracted_data.has_next_page?
run(context)
end
end
......
# frozen_string_literal: true
module BulkImports
module Pipeline
class ExtractedData
attr_reader :data
def initialize(data: nil, page_info: {})
@data = Array.wrap(data)
@page_info = page_info
end
def has_next_page?
@page_info['has_next_page']
end
def next_page
@page_info['end_cursor']
end
def each(&block)
data.each(&block)
end
end
end
end
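
Because `each` delegates to `Array#each`, calling it without a block returns an `Enumerator`, which the spec further down asserts. A quick illustration, assuming the class above is loaded:

# ExtractedData#each with and without a block.
extracted = ExtractedData.new(data: { 'title' => 'one' }) # single hash gets Array.wrap'ed

extracted.each { |record| p record } # yields {"title"=>"one"}

enum = extracted.each                # no block: Array#each returns an Enumerator
p enum.class                         # => Enumerator
p enum.to_a                          # => [{"title"=>"one"}]
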
@@ -12,7 +12,9 @@ module BulkImports
info(context, message: 'Pipeline started', pipeline_class: pipeline)
Array.wrap(extracted_data_from(context)).each do |entry|
extracted_data = extracted_data_from(context)
extracted_data&.each do |entry|
transformers.each do |transformer|
entry = run_pipeline_step(:transformer, transformer.class.name, context) do
transformer.transform(context, entry)
@@ -24,7 +26,7 @@
end
end
after_run(context) if respond_to?(:after_run)
after_run(context, extracted_data) if respond_to?(:after_run)
rescue MarkedAsFailedError
log_skip(context)
end
@@ -43,6 +45,8 @@
log_import_failure(e, step, context)
mark_as_failed(context) if abort_on_failure?
nil
end
def extracted_data_from(context)
......
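
Because `run_pipeline_step` now returns `nil` when a step raises, the runner iterates with `extracted_data&.each`, so a failed extraction skips the transform/load loop instead of crashing. A condensed sketch of that control flow (method bodies are illustrative, not the real runner):

# A failed extractor step yields nil, and &.each turns the loop into a no-op.
def run_pipeline_step(step)
  yield
rescue StandardError => e
  puts "step #{step} failed: #{e.message}"
  nil # this explicit nil is what the change above adds to the real runner
end

def run(records)
  extracted_data = run_pipeline_step(:extractor) { records || raise('boom') }

  extracted_data&.each do |entry|
    puts "transform + load #{entry}"
  end

  puts 'after_run still reached'
end

run(%w[a b]) # happy path: each record is transformed and loaded
run(nil)     # failed extraction: loop skipped, no NoMethodError on nil
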
@@ -5,8 +5,18 @@ require 'spec_helper'
RSpec.describe BulkImports::Common::Extractors::GraphqlExtractor do
let(:graphql_client) { instance_double(BulkImports::Clients::Graphql) }
let(:import_entity) { create(:bulk_import_entity) }
let(:response) { double(original_hash: { foo: :bar }) }
let(:query) { { query: double(to_s: 'test', variables: {}) } }
let(:response) { double(original_hash: { 'data' => { 'foo' => 'bar' }, 'page_info' => {} }) }
let(:options) do
{
query: double(
to_s: 'test',
variables: {},
data_path: %w[data foo],
page_info_path: %w[data page_info]
)
}
end
let(:context) do
instance_double(
BulkImports::Pipeline::Context,
@@ -14,58 +24,20 @@ RSpec.describe BulkImports::Common::Extractors::GraphqlExtractor do
)
end
subject { described_class.new(query) }
before do
allow(subject).to receive(:graphql_client).and_return(graphql_client)
allow(graphql_client).to receive(:parse)
end
subject { described_class.new(options) }
describe '#extract' do
before do
allow(subject).to receive(:query_variables).and_return({})
allow(graphql_client).to receive(:execute).and_return(response)
end
it 'returns original hash' do
expect(subject.extract(context)).to eq({ foo: :bar })
end
end
describe 'query variables' do
before do
allow(subject).to receive(:graphql_client).and_return(graphql_client)
allow(graphql_client).to receive(:parse)
allow(graphql_client).to receive(:execute).and_return(response)
end
context 'when variables are present' do
let(:variables) { { foo: :bar } }
let(:query) { { query: double(to_s: 'test', variables: variables) } }
it 'builds graphql query variables for import entity' do
expect(graphql_client).to receive(:execute).with(anything, variables)
subject.extract(context).first
end
end
context 'when no variables are present' do
let(:query) { { query: double(to_s: 'test', variables: nil) } }
it 'returns empty hash' do
expect(graphql_client).to receive(:execute).with(anything, nil)
subject.extract(context).first
end
end
context 'when variables are empty hash' do
let(:query) { { query: double(to_s: 'test', variables: {}) } }
it 'makes graphql request with empty hash' do
expect(graphql_client).to receive(:execute).with(anything, {})
it 'returns ExtractedData' do
extracted_data = subject.extract(context)
subject.extract(context).first
end
expect(extracted_data).to be_instance_of(BulkImports::Pipeline::ExtractedData)
expect(extracted_data.data).to contain_exactly('bar')
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe BulkImports::Common::Transformers::HashKeyDigger do
describe '#transform' do
it 'when the key_path is an array' do
data = { foo: { bar: :value } }
key_path = %i[foo bar]
transformed = described_class.new(key_path: key_path).transform(nil, data)
expect(transformed).to eq(:value)
end
it 'when the key_path is not an array' do
data = { foo: { bar: :value } }
key_path = :foo
transformed = described_class.new(key_path: key_path).transform(nil, data)
expect(transformed).to eq({ bar: :value })
end
it "when the data is not a hash" do
expect { described_class.new(key_path: nil).transform(nil, nil) }
.to raise_error(ArgumentError, "Given data must be a Hash")
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe BulkImports::Common::Transformers::UnderscorifyKeysTransformer do
describe '#transform' do
it 'deep underscorifies hash keys' do
data = {
'fullPath' => 'Foo',
'snakeKeys' => {
'snakeCaseKey' => 'Bar',
'moreKeys' => {
'anotherSnakeCaseKey' => 'Test'
}
}
}
transformed_data = described_class.new.transform(nil, data)
expect(transformed_data).to have_key('full_path')
expect(transformed_data).to have_key('snake_keys')
expect(transformed_data['snake_keys']).to have_key('snake_case_key')
expect(transformed_data['snake_keys']).to have_key('more_keys')
expect(transformed_data.dig('snake_keys', 'more_keys')).to have_key('another_snake_case_key')
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe BulkImports::Groups::Extractors::SubgroupsExtractor do
describe '#extract' do
it 'returns ExtractedData response' do
user = create(:user)
bulk_import = create(:bulk_import)
entity = create(:bulk_import_entity, bulk_import: bulk_import)
configuration = create(:bulk_import_configuration, bulk_import: bulk_import)
response = [{ 'test' => 'group' }]
context = BulkImports::Pipeline::Context.new(
current_user: user,
entity: entity,
configuration: configuration
)
allow_next_instance_of(BulkImports::Clients::Http) do |client|
allow(client).to receive(:each_page).and_return(response)
end
extracted_data = subject.extract(context)
expect(extracted_data).to be_instance_of(BulkImports::Pipeline::ExtractedData)
expect(extracted_data.data).to eq(response)
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe BulkImports::Groups::Graphql::GetGroupQuery do
describe '#variables' do
let(:entity) { double(source_full_path: 'test') }
it 'returns query variables based on entity information' do
expected = { full_path: entity.source_full_path }
expect(described_class.variables(entity)).to eq(expected)
end
end
describe '#data_path' do
it 'returns data path' do
expected = %w[data group]
expect(described_class.data_path).to eq(expected)
end
end
describe '#page_info_path' do
it 'returns pagination information path' do
expected = %w[data group page_info]
expect(described_class.page_info_path).to eq(expected)
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe BulkImports::Groups::Graphql::GetLabelsQuery do
describe '#variables' do
let(:entity) { double(source_full_path: 'test', next_page_for: 'next_page') }
it 'returns query variables based on entity information' do
expected = { full_path: entity.source_full_path, cursor: entity.next_page_for }
expect(described_class.variables(entity)).to eq(expected)
end
end
describe '#data_path' do
it 'returns data path' do
expected = %w[data group labels nodes]
expect(described_class.data_path).to eq(expected)
end
end
describe '#page_info_path' do
it 'returns pagination information path' do
expected = %w[data group labels page_info]
expect(described_class.page_info_path).to eq(expected)
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe BulkImports::Groups::Loaders::LabelsLoader do
describe '#load' do
let(:user) { create(:user) }
let(:group) { create(:group) }
let(:entity) { create(:bulk_import_entity, group: group) }
let(:context) do
BulkImports::Pipeline::Context.new(
entity: entity,
current_user: user
)
end
let(:data) do
{
'title' => 'label',
'description' => 'description',
'color' => '#FFFFFF'
}
end
it 'creates the label' do
expect { subject.load(context, data) }.to change(Label, :count).by(1)
label = group.labels.first
expect(label.title).to eq(data['title'])
expect(label.description).to eq(data['description'])
expect(label.color).to eq(data['color'])
end
end
end
@@ -24,19 +24,15 @@ RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do
let(:group_data) do
{
'data' => {
'group' => {
'name' => 'source_name',
'fullPath' => 'source/full/path',
'visibility' => 'private',
'projectCreationLevel' => 'developer',
'subgroupCreationLevel' => 'maintainer',
'description' => 'Group Description',
'emailsDisabled' => true,
'lfsEnabled' => false,
'mentionsDisabled' => true
}
}
'name' => 'source_name',
'full_path' => 'source/full/path',
'visibility' => 'private',
'project_creation_level' => 'developer',
'subgroup_creation_level' => 'maintainer',
'description' => 'Group Description',
'emails_disabled' => true,
'lfs_enabled' => false,
'mentions_disabled' => true
}
end
@@ -60,13 +56,13 @@ RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do
expect(imported_group).not_to be_nil
expect(imported_group.parent).to eq(parent)
expect(imported_group.path).to eq(group_path)
expect(imported_group.description).to eq(group_data.dig('data', 'group', 'description'))
expect(imported_group.visibility).to eq(group_data.dig('data', 'group', 'visibility'))
expect(imported_group.project_creation_level).to eq(Gitlab::Access.project_creation_string_options[group_data.dig('data', 'group', 'projectCreationLevel')])
expect(imported_group.subgroup_creation_level).to eq(Gitlab::Access.subgroup_creation_string_options[group_data.dig('data', 'group', 'subgroupCreationLevel')])
expect(imported_group.lfs_enabled?).to eq(group_data.dig('data', 'group', 'lfsEnabled'))
expect(imported_group.emails_disabled?).to eq(group_data.dig('data', 'group', 'emailsDisabled'))
expect(imported_group.mentions_disabled?).to eq(group_data.dig('data', 'group', 'mentionsDisabled'))
expect(imported_group.description).to eq(group_data['description'])
expect(imported_group.visibility).to eq(group_data['visibility'])
expect(imported_group.project_creation_level).to eq(Gitlab::Access.project_creation_string_options[group_data['project_creation_level']])
expect(imported_group.subgroup_creation_level).to eq(Gitlab::Access.subgroup_creation_string_options[group_data['subgroup_creation_level']])
expect(imported_group.lfs_enabled?).to eq(group_data['lfs_enabled'])
expect(imported_group.emails_disabled?).to eq(group_data['emails_disabled'])
expect(imported_group.mentions_disabled?).to eq(group_data['mentions_disabled'])
end
end
@@ -87,8 +83,6 @@ RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do
it 'has transformers' do
expect(described_class.transformers)
.to contain_exactly(
{ klass: BulkImports::Common::Transformers::HashKeyDigger, options: { key_path: %w[data group] } },
{ klass: BulkImports::Common::Transformers::UnderscorifyKeysTransformer, options: nil },
{ klass: BulkImports::Common::Transformers::ProhibitedAttributesTransformer, options: nil },
{ klass: BulkImports::Groups::Transformers::GroupAttributesTransformer, options: nil }
)
......
@@ -5,6 +5,7 @@ require 'spec_helper'
RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do
let(:user) { create(:user) }
let(:group) { create(:group) }
let(:cursor) { 'cursor' }
let(:entity) do
create(
:bulk_import_entity,
@@ -22,31 +23,26 @@ RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do
)
end
def extractor_data(title:, has_next_page:, cursor: "")
{
"data" => {
"group" => {
"labels" => {
"page_info" => {
"end_cursor" => cursor,
"has_next_page" => has_next_page
},
"nodes" => [
{
"title" => title,
"description" => "desc",
"color" => "#428BCA"
}
]
}
}
def extractor_data(title:, has_next_page:, cursor: nil)
data = [
{
'title' => title,
'description' => 'desc',
'color' => '#428BCA'
}
]
page_info = {
'end_cursor' => cursor,
'has_next_page' => has_next_page
}
BulkImports::Pipeline::ExtractedData.new(data: data, page_info: page_info)
end
describe '#run' do
it 'imports a group labels' do
first_page = extractor_data(title: 'label1', has_next_page: true, cursor: 'nextPageCursor')
first_page = extractor_data(title: 'label1', has_next_page: true, cursor: cursor)
last_page = extractor_data(title: 'label2', has_next_page: false)
allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
@@ -65,6 +61,38 @@ RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do
end
end
describe '#after_run' do
context 'when extracted data has next page' do
it 'updates tracker information and runs pipeline again' do
data = extractor_data(title: 'label', has_next_page: true, cursor: cursor)
expect(subject).to receive(:run).with(context)
subject.after_run(context, data)
tracker = entity.trackers.find_by(relation: :labels)
expect(tracker.has_next_page).to eq(true)
expect(tracker.next_page).to eq(cursor)
end
end
context 'when extracted data has no next page' do
it 'updates tracker information and does not run pipeline' do
data = extractor_data(title: 'label', has_next_page: false)
expect(subject).not_to receive(:run).with(context)
subject.after_run(context, data)
tracker = entity.trackers.find_by(relation: :labels)
expect(tracker.has_next_page).to eq(false)
expect(tracker.next_page).to be_nil
end
end
end
describe 'pipeline parts' do
it { expect(described_class).to include_module(BulkImports::Pipeline) }
it { expect(described_class).to include_module(BulkImports::Pipeline::Runner) }
@@ -82,7 +110,6 @@ RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do
it 'has transformers' do
expect(described_class.transformers)
.to contain_exactly(
{ klass: BulkImports::Common::Transformers::HashKeyDigger, options: { key_path: %w[data group labels] } },
{ klass: BulkImports::Common::Transformers::ProhibitedAttributesTransformer, options: nil }
)
end
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe BulkImports::Pipeline::ExtractedData do
let(:data) { 'data' }
let(:has_next_page) { true }
let(:cursor) { 'cursor' }
let(:page_info) do
{
'has_next_page' => has_next_page,
'end_cursor' => cursor
}
end
subject { described_class.new(data: data, page_info: page_info) }
describe '#has_next_page?' do
context 'when next page is present' do
it 'returns true' do
expect(subject.has_next_page?).to eq(true)
end
end
context 'when next page is not present' do
let(:has_next_page) { false }
it 'returns false' do
expect(subject.has_next_page?).to eq(false)
end
end
end
describe '#next_page' do
it 'returns next page cursor information' do
expect(subject.next_page).to eq(cursor)
end
end
describe '#each' do
context 'when block is present' do
it 'yields each data item' do
expect { |b| subject.each(&b) }.to yield_control
end
end
context 'when block is not present' do
it 'returns enumerator' do
expect(subject.each).to be_instance_of(Enumerator)
end
end
end
end
@@ -53,18 +53,26 @@ RSpec.describe BulkImports::Pipeline::Runner do
end
it 'runs pipeline extractor, transformer, loader' do
entries = [{ foo: :bar }]
extracted_data = BulkImports::Pipeline::ExtractedData.new(data: { foo: :bar })
expect_next_instance_of(BulkImports::Extractor) do |extractor|
expect(extractor).to receive(:extract).with(context).and_return(entries)
expect(extractor)
.to receive(:extract)
.with(context)
.and_return(extracted_data)
end
expect_next_instance_of(BulkImports::Transformer) do |transformer|
expect(transformer).to receive(:transform).with(context, entries.first).and_return(entries.first)
expect(transformer)
.to receive(:transform)
.with(context, extracted_data.data.first)
.and_return(extracted_data.data.first)
end
expect_next_instance_of(BulkImports::Loader) do |loader|
expect(loader).to receive(:load).with(context, entries.first)
expect(loader)
.to receive(:load)
.with(context, extracted_data.data.first)
end
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
......