Commit eae71702 authored by Ash McKenzie's avatar Ash McKenzie

Merge branch 'georgekoltsov/limit-extractors-loaders-to-one' into 'master'

Limit Group Migration extractors and loaders to 1 per pipeline

See merge request gitlab-org/gitlab!50951
parents 00fba093 8f50a5b2
---
title: Limit Group Migration extractors and loaders to 1 per pipeline
merge_request: 50951
author:
type: changed
...@@ -46,13 +46,11 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline do ...@@ -46,13 +46,11 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline do
it { expect(described_class).to include_module(BulkImports::Pipeline::Runner) } it { expect(described_class).to include_module(BulkImports::Pipeline::Runner) }
it 'has extractors' do it 'has extractors' do
expect(described_class.extractors) expect(described_class.get_extractor)
.to contain_exactly( .to eq(
{ klass: BulkImports::Common::Extractors::GraphqlExtractor,
klass: BulkImports::Common::Extractors::GraphqlExtractor, options: {
options: { query: EE::BulkImports::Groups::Graphql::GetEpicsQuery
query: EE::BulkImports::Groups::Graphql::GetEpicsQuery
}
} }
) )
end end
...@@ -67,9 +65,7 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline do ...@@ -67,9 +65,7 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline do
end end
it 'has loaders' do it 'has loaders' do
expect(described_class.loaders).to contain_exactly({ expect(described_class.get_loader).to eq(klass: EE::BulkImports::Groups::Loaders::EpicsLoader, options: nil)
klass: EE::BulkImports::Groups::Loaders::EpicsLoader, options: nil
})
end end
end end
......
...@@ -10,16 +10,16 @@ module BulkImports ...@@ -10,16 +10,16 @@ module BulkImports
private private
def extractors def extractor
@extractors ||= self.class.extractors.map(&method(:instantiate)) @extractor ||= instantiate(self.class.get_extractor)
end end
def transformers def transformers
@transformers ||= self.class.transformers.map(&method(:instantiate)) @transformers ||= self.class.transformers.map(&method(:instantiate))
end end
def loaders def loader
@loaders ||= self.class.loaders.map(&method(:instantiate)) @loaders ||= instantiate(self.class.get_loader)
end end
def after_run def after_run
...@@ -41,7 +41,7 @@ module BulkImports ...@@ -41,7 +41,7 @@ module BulkImports
class_methods do class_methods do
def extractor(klass, options = nil) def extractor(klass, options = nil)
add_attribute(:extractors, klass, options) class_attributes[:extractor] = { klass: klass, options: options }
end end
def transformer(klass, options = nil) def transformer(klass, options = nil)
...@@ -49,23 +49,23 @@ module BulkImports ...@@ -49,23 +49,23 @@ module BulkImports
end end
def loader(klass, options = nil) def loader(klass, options = nil)
add_attribute(:loaders, klass, options) class_attributes[:loader] = { klass: klass, options: options }
end end
def after_run(&block) def after_run(&block)
class_attributes[:after_run] = block class_attributes[:after_run] = block
end end
def extractors def get_extractor
class_attributes[:extractors] class_attributes[:extractor]
end end
def transformers def transformers
class_attributes[:transformers] class_attributes[:transformers]
end end
def loaders def get_loader
class_attributes[:loaders] class_attributes[:loader]
end end
def after_run_callback def after_run_callback
......
...@@ -12,25 +12,15 @@ module BulkImports ...@@ -12,25 +12,15 @@ module BulkImports
info(context, message: 'Pipeline started', pipeline_class: pipeline) info(context, message: 'Pipeline started', pipeline_class: pipeline)
extractors.each do |extractor| Array.wrap(extracted_data_from(context)).each do |entry|
data = run_pipeline_step(:extractor, extractor.class.name, context) do transformers.each do |transformer|
extractor.extract(context) entry = run_pipeline_step(:transformer, transformer.class.name, context) do
transformer.transform(context, entry)
end
end end
if data && data.respond_to?(:each) run_pipeline_step(:loader, loader.class.name, context) do
data.each do |entry| loader.load(context, entry)
transformers.each do |transformer|
entry = run_pipeline_step(:transformer, transformer.class.name, context) do
transformer.transform(context, entry)
end
end
loaders.each do |loader|
run_pipeline_step(:loader, loader.class.name, context) do
loader.load(context, entry)
end
end
end
end end
end end
...@@ -55,6 +45,12 @@ module BulkImports ...@@ -55,6 +45,12 @@ module BulkImports
mark_as_failed(context) if abort_on_failure? mark_as_failed(context) if abort_on_failure?
end end
def extracted_data_from(context)
run_pipeline_step(:extractor, extractor.class.name, context) do
extractor.extract(context)
end
end
def mark_as_failed(context) def mark_as_failed(context)
warn(context, message: 'Pipeline failed', pipeline_class: pipeline) warn(context, message: 'Pipeline failed', pipeline_class: pipeline)
......
...@@ -75,13 +75,11 @@ RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do ...@@ -75,13 +75,11 @@ RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do
it { expect(described_class).to include_module(BulkImports::Pipeline::Runner) } it { expect(described_class).to include_module(BulkImports::Pipeline::Runner) }
it 'has extractors' do it 'has extractors' do
expect(described_class.extractors) expect(described_class.get_extractor)
.to contain_exactly( .to eq(
{ klass: BulkImports::Common::Extractors::GraphqlExtractor,
klass: BulkImports::Common::Extractors::GraphqlExtractor, options: {
options: { query: BulkImports::Groups::Graphql::GetGroupQuery
query: BulkImports::Groups::Graphql::GetGroupQuery
}
} }
) )
end end
...@@ -97,9 +95,7 @@ RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do ...@@ -97,9 +95,7 @@ RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do
end end
it 'has loaders' do it 'has loaders' do
expect(described_class.loaders).to contain_exactly({ expect(described_class.get_loader).to eq(klass: BulkImports::Groups::Loaders::GroupLoader, options: nil)
klass: BulkImports::Groups::Loaders::GroupLoader, options: nil
})
end end
end end
end end
...@@ -58,10 +58,7 @@ RSpec.describe BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline do ...@@ -58,10 +58,7 @@ RSpec.describe BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline do
it { expect(described_class).to include_module(BulkImports::Pipeline::Runner) } it { expect(described_class).to include_module(BulkImports::Pipeline::Runner) }
it 'has extractors' do it 'has extractors' do
expect(described_class.extractors).to contain_exactly( expect(described_class.get_extractor).to eq(klass: BulkImports::Groups::Extractors::SubgroupsExtractor, options: nil)
klass: BulkImports::Groups::Extractors::SubgroupsExtractor,
options: nil
)
end end
it 'has transformers' do it 'has transformers' do
...@@ -72,10 +69,7 @@ RSpec.describe BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline do ...@@ -72,10 +69,7 @@ RSpec.describe BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline do
end end
it 'has loaders' do it 'has loaders' do
expect(described_class.loaders).to contain_exactly( expect(described_class.get_loader).to eq(klass: BulkImports::Common::Loaders::EntityLoader, options: nil)
klass: BulkImports::Common::Loaders::EntityLoader,
options: nil
)
end end
end end
end end
...@@ -24,9 +24,9 @@ RSpec.describe BulkImports::Pipeline do ...@@ -24,9 +24,9 @@ RSpec.describe BulkImports::Pipeline do
describe 'getters' do describe 'getters' do
it 'retrieves class attributes' do it 'retrieves class attributes' do
expect(BulkImports::MyPipeline.extractors).to contain_exactly({ klass: BulkImports::Extractor, options: { foo: :bar } }) expect(BulkImports::MyPipeline.get_extractor).to eq({ klass: BulkImports::Extractor, options: { foo: :bar } })
expect(BulkImports::MyPipeline.transformers).to contain_exactly({ klass: BulkImports::Transformer, options: { foo: :bar } }) expect(BulkImports::MyPipeline.transformers).to contain_exactly({ klass: BulkImports::Transformer, options: { foo: :bar } })
expect(BulkImports::MyPipeline.loaders).to contain_exactly({ klass: BulkImports::Loader, options: { foo: :bar } }) expect(BulkImports::MyPipeline.get_loader).to eq({ klass: BulkImports::Loader, options: { foo: :bar } })
expect(BulkImports::MyPipeline.abort_on_failure?).to eq(true) expect(BulkImports::MyPipeline.abort_on_failure?).to eq(true)
end end
end end
...@@ -41,20 +41,14 @@ RSpec.describe BulkImports::Pipeline do ...@@ -41,20 +41,14 @@ RSpec.describe BulkImports::Pipeline do
BulkImports::MyPipeline.loader(klass, options) BulkImports::MyPipeline.loader(klass, options)
BulkImports::MyPipeline.abort_on_failure! BulkImports::MyPipeline.abort_on_failure!
expect(BulkImports::MyPipeline.extractors) expect(BulkImports::MyPipeline.get_extractor).to eq({ klass: klass, options: options })
.to contain_exactly(
{ klass: BulkImports::Extractor, options: { foo: :bar } },
{ klass: klass, options: options })
expect(BulkImports::MyPipeline.transformers) expect(BulkImports::MyPipeline.transformers)
.to contain_exactly( .to contain_exactly(
{ klass: BulkImports::Transformer, options: { foo: :bar } }, { klass: BulkImports::Transformer, options: { foo: :bar } },
{ klass: klass, options: options }) { klass: klass, options: options })
expect(BulkImports::MyPipeline.loaders) expect(BulkImports::MyPipeline.get_loader).to eq({ klass: klass, options: options })
.to contain_exactly(
{ klass: BulkImports::Loader, options: { foo: :bar } },
{ klass: klass, options: options })
expect(BulkImports::MyPipeline.abort_on_failure?).to eq(true) expect(BulkImports::MyPipeline.abort_on_failure?).to eq(true)
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment