Commit 0c14911b authored by Max Woolf's avatar Max Woolf Committed by Heinrich Lee Yu

Add timeout status to bulk importer

Adds a new status, timeout, to BulkImport
and BulkImports::Entity models.

Adds a new worker to be used as a daily cron
to set stale objects as timed out.

Add daily cron to run new worker

Changelog: added
parent 5dad1cf7
...@@ -16,10 +16,13 @@ class BulkImport < ApplicationRecord ...@@ -16,10 +16,13 @@ class BulkImport < ApplicationRecord
enum source_type: { gitlab: 0 } enum source_type: { gitlab: 0 }
scope :stale, -> { where('created_at < ?', 8.hours.ago).where(status: [0, 1]) }
state_machine :status, initial: :created do state_machine :status, initial: :created do
state :created, value: 0 state :created, value: 0
state :started, value: 1 state :started, value: 1
state :finished, value: 2 state :finished, value: 2
state :timeout, value: 3
state :failed, value: -1 state :failed, value: -1
event :start do event :start do
...@@ -30,6 +33,11 @@ class BulkImport < ApplicationRecord ...@@ -30,6 +33,11 @@ class BulkImport < ApplicationRecord
transition started: :finished transition started: :finished
end end
event :cleanup_stale do
transition created: :timeout
transition started: :timeout
end
event :fail_op do event :fail_op do
transition any => :failed transition any => :failed
end end
......
...@@ -51,11 +51,13 @@ class BulkImports::Entity < ApplicationRecord ...@@ -51,11 +51,13 @@ class BulkImports::Entity < ApplicationRecord
enum source_type: { group_entity: 0, project_entity: 1 } enum source_type: { group_entity: 0, project_entity: 1 }
scope :by_user_id, ->(user_id) { joins(:bulk_import).where(bulk_imports: { user_id: user_id }) } scope :by_user_id, ->(user_id) { joins(:bulk_import).where(bulk_imports: { user_id: user_id }) }
scope :stale, -> { where('created_at < ?', 8.hours.ago).where(status: [0, 1]) }
state_machine :status, initial: :created do state_machine :status, initial: :created do
state :created, value: 0 state :created, value: 0
state :started, value: 1 state :started, value: 1
state :finished, value: 2 state :finished, value: 2
state :timeout, value: 3
state :failed, value: -1 state :failed, value: -1
event :start do event :start do
...@@ -70,6 +72,11 @@ class BulkImports::Entity < ApplicationRecord ...@@ -70,6 +72,11 @@ class BulkImports::Entity < ApplicationRecord
event :fail_op do event :fail_op do
transition any => :failed transition any => :failed
end end
event :cleanup_stale do
transition created: :timeout
transition started: :timeout
end
end end
def self.all_human_statuses def self.all_human_statuses
......
...@@ -192,6 +192,15 @@ ...@@ -192,6 +192,15 @@
:weight: 1 :weight: 1
:idempotent: true :idempotent: true
:tags: [] :tags: []
- :name: cronjob:bulk_imports_stuck_import
:worker_name: BulkImports::StuckImportWorker
:feature_category: :importers
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: cronjob:ci_archive_traces_cron - :name: cronjob:ci_archive_traces_cron
:worker_name: Ci::ArchiveTracesCronWorker :worker_name: Ci::ArchiveTracesCronWorker
:feature_category: :continuous_integration :feature_category: :continuous_integration
......
# frozen_string_literal: true
module BulkImports
class StuckImportWorker
include ApplicationWorker
# This worker does not schedule other workers that require context.
include CronjobQueue # rubocop:disable Scalability/CronWorkerContext
idempotent!
data_consistency :always
feature_category :importers
def perform
BulkImport.stale.find_each do |import|
import.cleanup_stale
end
BulkImports::Entity.stale.find_each do |import|
import.cleanup_stale
end
end
end
end
...@@ -500,6 +500,9 @@ Settings.cron_jobs['trending_projects_worker']['job_class'] = 'TrendingProjectsW ...@@ -500,6 +500,9 @@ Settings.cron_jobs['trending_projects_worker']['job_class'] = 'TrendingProjectsW
Settings.cron_jobs['remove_unreferenced_lfs_objects_worker'] ||= Settingslogic.new({}) Settings.cron_jobs['remove_unreferenced_lfs_objects_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['remove_unreferenced_lfs_objects_worker']['cron'] ||= '20 0 * * *' Settings.cron_jobs['remove_unreferenced_lfs_objects_worker']['cron'] ||= '20 0 * * *'
Settings.cron_jobs['remove_unreferenced_lfs_objects_worker']['job_class'] = 'RemoveUnreferencedLfsObjectsWorker' Settings.cron_jobs['remove_unreferenced_lfs_objects_worker']['job_class'] = 'RemoveUnreferencedLfsObjectsWorker'
Settings.cron_jobs['bulk_imports_stuck_import_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['bulk_imports_stuck_import_worker']['cron'] ||= '0 */4 * * *'
Settings.cron_jobs['bulk_imports_stuck_import_worker']['job_class'] = 'BulkImports::StuckImportWorker'
Settings.cron_jobs['import_stuck_project_import_jobs'] ||= Settingslogic.new({}) Settings.cron_jobs['import_stuck_project_import_jobs'] ||= Settingslogic.new({})
Settings.cron_jobs['import_stuck_project_import_jobs']['cron'] ||= '15 * * * *' Settings.cron_jobs['import_stuck_project_import_jobs']['cron'] ||= '15 * * * *'
Settings.cron_jobs['import_stuck_project_import_jobs']['job_class'] = 'Gitlab::Import::StuckProjectImportJobsWorker' Settings.cron_jobs['import_stuck_project_import_jobs']['job_class'] = 'Gitlab::Import::StuckProjectImportJobsWorker'
......
...@@ -138,9 +138,9 @@ migrated: ...@@ -138,9 +138,9 @@ migrated:
In a [rails console session](../../../administration/operations/rails_console.md#starting-a-rails-console-session), In a [rails console session](../../../administration/operations/rails_console.md#starting-a-rails-console-session),
you can find the failure or error messages for the group import attempt using: you can find the failure or error messages for the group import attempt using:
```shell ```ruby
# Get relevant import records # Get relevant import records
import = BulkImports::Entity.where(namespace_id: Group.id).bulk_import import = BulkImports::Entity.where(namespace_id: Group.id).map(&:bulk_import)
# Alternative lookup by user # Alternative lookup by user
import = BulkImport.where(user_id: User.find(...)).last import = BulkImport.where(user_id: User.find(...)).last
...@@ -154,3 +154,18 @@ entities.map(&:failures).flatten ...@@ -154,3 +154,18 @@ entities.map(&:failures).flatten
# Alternative failure lookup by status # Alternative failure lookup by status
entities.where(status: [-1]).pluck(:destination_name, :destination_namespace, :status) entities.where(status: [-1]).pluck(:destination_name, :destination_namespace, :status)
``` ```
### Stale imports
> [Introduced](https://gitlab.com/gitlab-org/gitlab/-/issues/352985) in GitLab 14.10.
When troubleshooting group migration, an import may not complete because the import workers took
longer than 8 hours to execute. In this case, the `status` of either a `BulkImport` or
`BulkImport::Entity` is `3` (`timeout`):
```ruby
# Get relevant import records
import = BulkImports::Entity.where(namespace_id: Group.id).map(&:bulk_import)
import.status #=> 3 means that the import timed out.
```
...@@ -3,6 +3,13 @@ ...@@ -3,6 +3,13 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe BulkImport, type: :model do RSpec.describe BulkImport, type: :model do
let_it_be(:created_bulk_import) { create(:bulk_import, :created) }
let_it_be(:started_bulk_import) { create(:bulk_import, :started) }
let_it_be(:finished_bulk_import) { create(:bulk_import, :finished) }
let_it_be(:failed_bulk_import) { create(:bulk_import, :failed) }
let_it_be(:stale_created_bulk_import) { create(:bulk_import, :created, created_at: 3.days.ago) }
let_it_be(:stale_started_bulk_import) { create(:bulk_import, :started, created_at: 3.days.ago) }
describe 'associations' do describe 'associations' do
it { is_expected.to belong_to(:user).required } it { is_expected.to belong_to(:user).required }
it { is_expected.to have_one(:configuration) } it { is_expected.to have_one(:configuration) }
...@@ -16,9 +23,15 @@ RSpec.describe BulkImport, type: :model do ...@@ -16,9 +23,15 @@ RSpec.describe BulkImport, type: :model do
it { is_expected.to define_enum_for(:source_type).with_values(%i[gitlab]) } it { is_expected.to define_enum_for(:source_type).with_values(%i[gitlab]) }
end end
describe '.stale scope' do
subject { described_class.stale }
it { is_expected.to contain_exactly(stale_created_bulk_import, stale_started_bulk_import) }
end
describe '.all_human_statuses' do describe '.all_human_statuses' do
it 'returns all human readable entity statuses' do it 'returns all human readable entity statuses' do
expect(described_class.all_human_statuses).to contain_exactly('created', 'started', 'finished', 'failed') expect(described_class.all_human_statuses).to contain_exactly('created', 'started', 'finished', 'failed', 'timeout')
end end
end end
......
...@@ -151,7 +151,7 @@ RSpec.describe BulkImports::Entity, type: :model do ...@@ -151,7 +151,7 @@ RSpec.describe BulkImports::Entity, type: :model do
describe '.all_human_statuses' do describe '.all_human_statuses' do
it 'returns all human readable entity statuses' do it 'returns all human readable entity statuses' do
expect(described_class.all_human_statuses).to contain_exactly('created', 'started', 'finished', 'failed') expect(described_class.all_human_statuses).to contain_exactly('created', 'started', 'finished', 'failed', 'timeout')
end end
end end
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe BulkImports::StuckImportWorker do
let_it_be(:created_bulk_import) { create(:bulk_import, :created) }
let_it_be(:started_bulk_import) { create(:bulk_import, :started) }
let_it_be(:stale_created_bulk_import) { create(:bulk_import, :created, created_at: 3.days.ago) }
let_it_be(:stale_started_bulk_import) { create(:bulk_import, :started, created_at: 3.days.ago) }
let_it_be(:stale_created_bulk_import_entity) { create(:bulk_import_entity, :created, created_at: 3.days.ago) }
let_it_be(:stale_started_bulk_import_entity) { create(:bulk_import_entity, :started, created_at: 3.days.ago) }
subject { described_class.new.perform }
describe 'perform' do
it 'updates the status of bulk imports to timeout' do
expect { subject }.to change { stale_created_bulk_import.reload.status }.from(0).to(3)
.and change { stale_started_bulk_import.reload.status }.from(1).to(3)
end
it 'updates the status of bulk import entities to timeout' do
expect { subject }.to change { stale_created_bulk_import_entity.reload.status }.from(0).to(3)
.and change { stale_started_bulk_import_entity.reload.status }.from(1).to(3)
end
it 'does not update the status of non-stale records' do
expect { subject }.to not_change { created_bulk_import.reload.status }
.and not_change { started_bulk_import.reload.status }
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment