Commit 02d36848 authored by Vitali Tatarintev's avatar Vitali Tatarintev

Merge branch 'georgekoltsov/bulk-import-entity-worker' into 'master'

Move Group Migration entities import to individual sidekiq jobs

See merge request gitlab-org/gitlab!50781
parents bd28c107 21dcb48a
...@@ -1433,6 +1433,14 @@ ...@@ -1433,6 +1433,14 @@
:idempotent: :idempotent:
:tags: [] :tags: []
- :name: bulk_import - :name: bulk_import
:feature_category: :importers
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent:
:tags: []
- :name: bulk_imports_entity
:feature_category: :importers :feature_category: :importers
:has_external_dependencies: true :has_external_dependencies: true
:urgency: :low :urgency: :low
......
...@@ -7,9 +7,58 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker ...@@ -7,9 +7,58 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker
sidekiq_options retry: false, dead: false sidekiq_options retry: false, dead: false
worker_has_external_dependencies! PERFORM_DELAY = 5.seconds
DEFAULT_BATCH_SIZE = 5
def perform(bulk_import_id) def perform(bulk_import_id)
BulkImports::Importers::GroupsImporter.new(bulk_import_id).execute @bulk_import = BulkImport.find_by_id(bulk_import_id)
return unless @bulk_import
return if @bulk_import.finished?
return @bulk_import.finish! if all_entities_processed? && @bulk_import.started?
return re_enqueue if max_batch_size_exceeded? # Do not start more jobs if max allowed are already running
@bulk_import.start! if @bulk_import.created?
created_entities.first(next_batch_size).each do |entity|
entity.start!
BulkImports::EntityWorker.perform_async(entity.id)
end
re_enqueue
end
private
def entities
@entities ||= @bulk_import.entities
end
def started_entities
entities.with_status(:started)
end
def created_entities
entities.with_status(:created)
end
def all_entities_processed?
entities.all? { |entity| entity.finished? || entity.failed? }
end
def max_batch_size_exceeded?
started_entities.count >= DEFAULT_BATCH_SIZE
end
def next_batch_size
[DEFAULT_BATCH_SIZE - started_entities.count, 0].max
end
# A new BulkImportWorker job is enqueued to either
# - Process the new BulkImports::Entity created during import (e.g. for the subgroups)
# - Or to mark the `bulk_import` as finished
def re_enqueue
BulkImportWorker.perform_in(PERFORM_DELAY, @bulk_import.id)
end end
end end
# frozen_string_literal: true
module BulkImports
class EntityWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
feature_category :importers
sidekiq_options retry: false, dead: false
worker_has_external_dependencies!
def perform(entity_id)
entity = BulkImports::Entity.with_status(:started).find_by_id(entity_id)
if entity
entity.update!(jid: jid)
BulkImports::Importers::GroupImporter.new(entity).execute
end
end
end
end
---
title: Move Group Migration entities import to individual sidekiq jobs
merge_request: 50781
author:
type: changed
...@@ -50,6 +50,8 @@ ...@@ -50,6 +50,8 @@
- 1 - 1
- - bulk_import - - bulk_import
- 1 - 1
- - bulk_imports_entity
- 1
- - chaos - - chaos
- 2 - 2
- - chat_notification - - chat_notification
......
...@@ -5,7 +5,7 @@ require 'spec_helper' ...@@ -5,7 +5,7 @@ require 'spec_helper'
RSpec.describe BulkImports::Importers::GroupImporter do RSpec.describe BulkImports::Importers::GroupImporter do
let(:user) { create(:user) } let(:user) { create(:user) }
let(:bulk_import) { create(:bulk_import) } let(:bulk_import) { create(:bulk_import) }
let(:bulk_import_entity) { create(:bulk_import_entity, bulk_import: bulk_import) } let(:bulk_import_entity) { create(:bulk_import_entity, :started, bulk_import: bulk_import) }
let(:bulk_import_configuration) { create(:bulk_import_configuration, bulk_import: bulk_import) } let(:bulk_import_configuration) { create(:bulk_import_configuration, bulk_import: bulk_import) }
let(:context) do let(:context) do
BulkImports::Pipeline::Context.new( BulkImports::Pipeline::Context.new(
...@@ -23,7 +23,6 @@ RSpec.describe BulkImports::Importers::GroupImporter do ...@@ -23,7 +23,6 @@ RSpec.describe BulkImports::Importers::GroupImporter do
describe '#execute' do describe '#execute' do
it "starts the entity and run its pipelines" do it "starts the entity and run its pipelines" do
expect(bulk_import_entity).to receive(:start!).and_call_original
expect_to_run_pipeline BulkImports::Groups::Pipelines::GroupPipeline, context: context expect_to_run_pipeline BulkImports::Groups::Pipelines::GroupPipeline, context: context
expect_to_run_pipeline EE::BulkImports::Groups::Pipelines::EpicsPipeline, context: context expect_to_run_pipeline EE::BulkImports::Groups::Pipelines::EpicsPipeline, context: context
expect_to_run_pipeline BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline, context: context expect_to_run_pipeline BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline, context: context
......
...@@ -8,7 +8,6 @@ module BulkImports ...@@ -8,7 +8,6 @@ module BulkImports
end end
def execute def execute
entity.start!
bulk_import = entity.bulk_import bulk_import = entity.bulk_import
configuration = bulk_import.configuration configuration = bulk_import.configuration
......
# frozen_string_literal: true
module BulkImports
module Importers
class GroupsImporter
def initialize(bulk_import_id)
@bulk_import = BulkImport.find(bulk_import_id)
end
def execute
bulk_import.start! unless bulk_import.started?
if entities_to_import.empty?
bulk_import.finish!
else
entities_to_import.each do |entity|
BulkImports::Importers::GroupImporter.new(entity).execute
end
# A new BulkImportWorker job is enqueued to either
# - Process the new BulkImports::Entity created for the subgroups
# - Or to mark the `bulk_import` as finished.
BulkImportWorker.perform_async(bulk_import.id)
end
end
private
attr_reader :bulk_import
def entities_to_import
@entities_to_import ||= bulk_import.entities.with_status(:created)
end
end
end
end
...@@ -5,7 +5,7 @@ require 'spec_helper' ...@@ -5,7 +5,7 @@ require 'spec_helper'
RSpec.describe BulkImports::Importers::GroupImporter do RSpec.describe BulkImports::Importers::GroupImporter do
let(:user) { create(:user) } let(:user) { create(:user) }
let(:bulk_import) { create(:bulk_import) } let(:bulk_import) { create(:bulk_import) }
let(:bulk_import_entity) { create(:bulk_import_entity, bulk_import: bulk_import) } let(:bulk_import_entity) { create(:bulk_import_entity, :started, bulk_import: bulk_import) }
let(:bulk_import_configuration) { create(:bulk_import_configuration, bulk_import: bulk_import) } let(:bulk_import_configuration) { create(:bulk_import_configuration, bulk_import: bulk_import) }
let(:context) do let(:context) do
BulkImports::Pipeline::Context.new( BulkImports::Pipeline::Context.new(
...@@ -23,7 +23,6 @@ RSpec.describe BulkImports::Importers::GroupImporter do ...@@ -23,7 +23,6 @@ RSpec.describe BulkImports::Importers::GroupImporter do
describe '#execute' do describe '#execute' do
it 'starts the entity and run its pipelines' do it 'starts the entity and run its pipelines' do
expect(bulk_import_entity).to receive(:start!).and_call_original
expect_to_run_pipeline BulkImports::Groups::Pipelines::GroupPipeline, context: context expect_to_run_pipeline BulkImports::Groups::Pipelines::GroupPipeline, context: context
expect_to_run_pipeline('EE::BulkImports::Groups::Pipelines::EpicsPipeline'.constantize, context: context) if Gitlab.ee? expect_to_run_pipeline('EE::BulkImports::Groups::Pipelines::EpicsPipeline'.constantize, context: context) if Gitlab.ee?
expect_to_run_pipeline BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline, context: context expect_to_run_pipeline BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline, context: context
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe BulkImports::Importers::GroupsImporter do
let_it_be(:bulk_import) { create(:bulk_import) }
subject { described_class.new(bulk_import.id) }
describe '#execute' do
context "when there is entities to be imported" do
let!(:bulk_import_entity) { create(:bulk_import_entity, bulk_import: bulk_import) }
it "starts the bulk_import and imports its entities" do
expect(BulkImports::Importers::GroupImporter).to receive(:new)
.with(bulk_import_entity).and_return(double(execute: true))
expect(BulkImportWorker).to receive(:perform_async).with(bulk_import.id)
subject.execute
expect(bulk_import.reload).to be_started
end
end
context "when there is no entities to be imported" do
it "starts the bulk_import and imports its entities" do
expect(BulkImports::Importers::GroupImporter).not_to receive(:new)
expect(BulkImportWorker).not_to receive(:perform_async)
subject.execute
expect(bulk_import.reload).to be_finished
end
end
end
end
...@@ -4,13 +4,74 @@ require 'spec_helper' ...@@ -4,13 +4,74 @@ require 'spec_helper'
RSpec.describe BulkImportWorker do RSpec.describe BulkImportWorker do
describe '#perform' do describe '#perform' do
it 'executes Group Importer' do before do
bulk_import_id = 1 stub_const("#{described_class}::DEFAULT_BATCH_SIZE", 1)
end
context 'when no bulk import is found' do
it 'does nothing' do
expect(described_class).not_to receive(:perform_in)
subject.perform(non_existing_record_id)
end
end
context 'when bulk import is finished' do
it 'does nothing' do
bulk_import = create(:bulk_import, :finished)
expect(described_class).not_to receive(:perform_in)
subject.perform(bulk_import.id)
end
end
context 'when all entities are processed' do
it 'marks bulk import as finished' do
bulk_import = create(:bulk_import, :started)
create(:bulk_import_entity, :finished, bulk_import: bulk_import)
create(:bulk_import_entity, :failed, bulk_import: bulk_import)
subject.perform(bulk_import.id)
expect(bulk_import.reload.finished?).to eq(true)
end
end
context 'when maximum allowed number of import entities in progress' do
it 'reenqueues itself' do
bulk_import = create(:bulk_import, :started)
(described_class::DEFAULT_BATCH_SIZE + 1).times { |_| create(:bulk_import_entity, :started, bulk_import: bulk_import) }
expect(described_class).to receive(:perform_in).with(described_class::PERFORM_DELAY, bulk_import.id)
subject.perform(bulk_import.id)
end
end
context 'when bulk import is created' do
it 'marks bulk import as started' do
bulk_import = create(:bulk_import, :created)
create(:bulk_import_entity, :created, bulk_import: bulk_import)
expect(BulkImports::Importers::GroupsImporter) subject.perform(bulk_import.id)
.to receive(:new).with(bulk_import_id).and_return(double(execute: true))
described_class.new.perform(bulk_import_id) expect(bulk_import.reload.started?).to eq(true)
end
context 'when there are created entities to process' do
it 'marks a batch of entities as started, enqueues BulkImports::EntityWorker and reenqueues' do
bulk_import = create(:bulk_import, :created)
(described_class::DEFAULT_BATCH_SIZE + 1).times { |_| create(:bulk_import_entity, :created, bulk_import: bulk_import) }
expect(described_class).to receive(:perform_in).with(described_class::PERFORM_DELAY, bulk_import.id)
expect(BulkImports::EntityWorker).to receive(:perform_async)
subject.perform(bulk_import.id)
expect(bulk_import.entities.map(&:status_name)).to contain_exactly(:created, :started)
end
end
end end
end end
end end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe BulkImports::EntityWorker do
describe '#execute' do
let(:bulk_import) { create(:bulk_import) }
context 'when started entity exists' do
let(:entity) { create(:bulk_import_entity, :started, bulk_import: bulk_import) }
it 'executes BulkImports::Importers::GroupImporter' do
expect(BulkImports::Importers::GroupImporter).to receive(:new).with(entity).and_call_original
subject.perform(entity.id)
end
it 'sets jid' do
jid = 'jid'
allow(subject).to receive(:jid).and_return(jid)
subject.perform(entity.id)
expect(entity.reload.jid).to eq(jid)
end
end
context 'when started entity does not exist' do
it 'does not execute BulkImports::Importers::GroupImporter' do
entity = create(:bulk_import_entity, bulk_import: bulk_import)
expect(BulkImports::Importers::GroupImporter).not_to receive(:new)
subject.perform(entity.id)
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment