Commit e52757dc authored by Terri Chu's avatar Terri Chu Committed by Dylan Griffith

Global Search data migrations: add storage requirements

parent 082c6ccd
...@@ -241,6 +241,10 @@ cron worker runs. Default value is 5 minutes. ...@@ -241,6 +241,10 @@ cron worker runs. Default value is 5 minutes.
- `pause_indexing!` - Pause indexing while the migration runs. This setting will record the indexing setting before - `pause_indexing!` - Pause indexing while the migration runs. This setting will record the indexing setting before
the migration runs and set it back to that value when the migration is completed. the migration runs and set it back to that value when the migration is completed.
- `space_requirements!` - Verify that enough free space is available in the cluster when the migration runs. This setting
will halt the migration if the storage required is not available when the migration runs. The migration must provide
the space required in bytes by defining a `space_required_bytes` method.
```ruby ```ruby
# frozen_string_literal: true # frozen_string_literal: true
...@@ -248,7 +252,9 @@ class BatchedMigrationName < Elastic::Migration ...@@ -248,7 +252,9 @@ class BatchedMigrationName < Elastic::Migration
# Declares a migration should be run in batches # Declares a migration should be run in batches
batched! batched!
throttle_delay 10.minutes throttle_delay 10.minutes
pause_indexing!
space_requirements!
# ... # ...
end end
``` ```
......
...@@ -4,7 +4,7 @@ module Elastic ...@@ -4,7 +4,7 @@ module Elastic
class MigrationRecord class MigrationRecord
attr_reader :version, :name, :filename attr_reader :version, :name, :filename
delegate :migrate, :skip_migration?, :completed?, :batched?, :throttle_delay, :pause_indexing?, to: :migration delegate :migrate, :skip_migration?, :completed?, :batched?, :throttle_delay, :pause_indexing?, :space_requirements?, :space_required_bytes, to: :migration
def initialize(version:, name:, filename:) def initialize(version:, name:, filename:)
@version = version @version = version
...@@ -45,6 +45,11 @@ module Elastic ...@@ -45,6 +45,11 @@ module Elastic
!!load_state&.dig('halted') !!load_state&.dig('halted')
end end
def halt!(additional_options = {})
state = { halted: true, halted_indexing_unpaused: false }.merge(additional_options)
save_state!(state)
end
def name_for_key def name_for_key
name.underscore name.underscore
end end
......
...@@ -19,7 +19,19 @@ module Elastic ...@@ -19,7 +19,19 @@ module Elastic
self.class.get_pause_indexing self.class.get_pause_indexing
end end
def space_requirements?
self.class.get_space_requirements
end
class_methods do class_methods do
def space_requirements!
class_attributes[:space_requirements] = true
end
def get_space_requirements
class_attributes[:space_requirements]
end
def batched! def batched!
class_attributes[:batched] = true class_attributes[:batched] = true
end end
......
...@@ -7,6 +7,7 @@ module Elastic ...@@ -7,6 +7,7 @@ module Elastic
# There is no onward scheduling and this cron handles work from across the # There is no onward scheduling and this cron handles work from across the
# application, so there's no useful context to add. # application, so there's no useful context to add.
include CronjobQueue # rubocop:disable Scalability/CronWorkerContext include CronjobQueue # rubocop:disable Scalability/CronWorkerContext
include ActionView::Helpers::NumberHelper
feature_category :global_search feature_category :global_search
idempotent! idempotent!
...@@ -36,6 +37,20 @@ module Elastic ...@@ -36,6 +37,20 @@ module Elastic
break false break false
end end
if !migration.started? && migration.space_requirements?
free_size_bytes = helper.cluster_free_size_bytes
space_required_bytes = migration.space_required_bytes
logger.info "MigrationWorker: migration[#{migration.name}] checking free space in cluster. Required space #{number_to_human_size(space_required_bytes)}. Free space #{number_to_human_size(free_size_bytes)}."
if free_size_bytes < space_required_bytes
logger.warn "MigrationWorker: migration[#{migration.name}] You should have at least #{number_to_human_size(space_required_bytes)} of free space in the cluster to run this migration. Please increase the storage in your Elasticsearch cluster."
logger.info "MigrationWorker: migration[#{migration.name}] updating with halted: true"
migration.halt!
break false
end
end
execute_migration(migration) execute_migration(migration)
completed = migration.completed? completed = migration.completed?
......
---
title: Add storage requirement option to Advanced Search notes migration
merge_request: 55423
author:
type: changed
...@@ -5,6 +5,7 @@ class MigrateNotesToSeparateIndex < Elastic::Migration ...@@ -5,6 +5,7 @@ class MigrateNotesToSeparateIndex < Elastic::Migration
pause_indexing! pause_indexing!
batched! batched!
space_requirements!
throttle_delay 1.minute throttle_delay 1.minute
MAX_ATTEMPTS_PER_SLICE = 30 MAX_ATTEMPTS_PER_SLICE = 30
...@@ -92,6 +93,12 @@ class MigrateNotesToSeparateIndex < Elastic::Migration ...@@ -92,6 +93,12 @@ class MigrateNotesToSeparateIndex < Elastic::Migration
original_count == new_count original_count == new_count
end end
def space_required_bytes
# notes index on GitLab.com takes 0.31% of the main index storage
# this migration will require 5 times that value to give a buffer
(helper.index_size_bytes * 0.0155).ceil
end
private private
def document_type def document_type
......
...@@ -19,6 +19,10 @@ module Elastic ...@@ -19,6 +19,10 @@ module Elastic
raise NotImplementedError, 'Please extend Elastic::Migration' raise NotImplementedError, 'Please extend Elastic::Migration'
end end
def space_required_bytes
raise NotImplementedError, 'Please extend Elastic::Migration'
end
private private
def helper def helper
...@@ -34,11 +38,9 @@ module Elastic ...@@ -34,11 +38,9 @@ module Elastic
end end
def fail_migration_halt_error!(retry_attempt: 0) def fail_migration_halt_error!(retry_attempt: 0)
set_migration_state( log "Halting migration on retry_attempt #{retry_attempt}"
retry_attempt: retry_attempt,
halted: true, migration_record.halt!(retry_attempt: retry_attempt)
halted_indexing_unpaused: false
)
end end
def log(message) def log(message)
......
...@@ -19,6 +19,7 @@ RSpec.describe MigrateNotesToSeparateIndex do ...@@ -19,6 +19,7 @@ RSpec.describe MigrateNotesToSeparateIndex do
expect(migration.batched?).to be_truthy expect(migration.batched?).to be_truthy
expect(migration.throttle_delay).to eq(1.minute) expect(migration.throttle_delay).to eq(1.minute)
expect(migration.pause_indexing?).to be_truthy expect(migration.pause_indexing?).to be_truthy
expect(migration.space_requirements?).to be_truthy
end end
end end
...@@ -194,4 +195,14 @@ RSpec.describe MigrateNotesToSeparateIndex do ...@@ -194,4 +195,14 @@ RSpec.describe MigrateNotesToSeparateIndex do
end end
end end
end end
describe 'space_required_bytes' do
subject { migration.space_required_bytes }
before do
allow(helper).to receive(:index_size_bytes).and_return(300)
end
it { is_expected.to eq(5) }
end
end end
...@@ -32,4 +32,16 @@ RSpec.describe Elastic::Migration, :elastic do ...@@ -32,4 +32,16 @@ RSpec.describe Elastic::Migration, :elastic do
expect { bare_migration.migrate }.to raise_error(NotImplementedError) expect { bare_migration.migrate }.to raise_error(NotImplementedError)
end end
end end
describe '#completed?' do
it 'raises exception for original class' do
expect { bare_migration.completed? }.to raise_error(NotImplementedError)
end
end
describe '#space_required_bytes' do
it 'raises exception for original class' do
expect { bare_migration.space_required_bytes }.to raise_error(NotImplementedError)
end
end
end end
...@@ -41,6 +41,22 @@ RSpec.describe Elastic::MigrationRecord, :elastic do ...@@ -41,6 +41,22 @@ RSpec.describe Elastic::MigrationRecord, :elastic do
end end
end end
describe '#halt!' do
it 'sets state for halted and halted_indexing_unpaused' do
record.halt!
expect(record.load_from_index.dig('_source', 'state', 'halted')).to be_truthy
expect(record.load_from_index.dig('_source', 'state', 'halted_indexing_unpaused')).to be_falsey
end
it 'sets state with additional options if passed' do
record.halt!(hello: 'world', good: 'bye')
expect(record.load_from_index.dig('_source', 'state', 'hello')).to eq('world')
expect(record.load_from_index.dig('_source', 'state', 'good')).to eq('bye')
end
end
describe '#started?' do describe '#started?' do
it 'changes on object save' do it 'changes on object save' do
expect { record.save!(completed: true) }.to change { record.started? }.from(false).to(true) expect { record.save!(completed: true) }.to change { record.started? }.from(false).to(true)
......
...@@ -9,20 +9,32 @@ RSpec.describe Elastic::MigrationOptions do ...@@ -9,20 +9,32 @@ RSpec.describe Elastic::MigrationOptions do
end end
end end
describe '#batched?' do shared_examples_for 'a boolean option' do |option|
subject { migration_class.new.batched? } subject { migration_class.new.public_send("#{option}?") }
it 'defaults to false' do it 'defaults to false' do
expect(subject).to be_falsey expect(subject).to be_falsey
end end
it 'respects when batched! is set for the class' do it "respects when #{option} is set for the class" do
migration_class.batched! migration_class.public_send("#{option}!")
expect(subject).to be_truthy expect(subject).to be_truthy
end end
end end
describe '#batched?' do
it_behaves_like 'a boolean option', :batched
end
describe '#pause_indexing?' do
it_behaves_like 'a boolean option', :pause_indexing
end
describe '#space_requirements?' do
it_behaves_like 'a boolean option', :space_requirements
end
describe '#throttle_delay' do describe '#throttle_delay' do
subject { migration_class.new.throttle_delay } subject { migration_class.new.throttle_delay }
......
...@@ -145,6 +145,46 @@ RSpec.describe Elastic::MigrationWorker, :elastic do ...@@ -145,6 +145,46 @@ RSpec.describe Elastic::MigrationWorker, :elastic do
end end
end end
end end
context 'checks space required' do
let(:helper) { Gitlab::Elastic::Helper.new }
let(:started) { false }
let(:completed) { false }
let(:batched) { false }
before do
allow(Gitlab::Elastic::Helper).to receive(:default).and_return(helper)
allow(migration).to receive(:space_requirements?).and_return(true)
allow(migration).to receive(:space_required_bytes).and_return(10)
end
it 'halts the migration if there is not enough space' do
allow(helper).to receive(:cluster_free_size_bytes).and_return(5)
expect(migration).to receive(:halt!)
expect(migration).not_to receive(:migrate)
subject.perform
end
it 'runs the migration if there is enough space' do
allow(helper).to receive(:cluster_free_size_bytes).and_return(20)
expect(migration).not_to receive(:halt!)
expect(migration).to receive(:migrate).once
subject.perform
end
context 'when migration is already started' do
let(:started) { true }
it 'does not check space requirements' do
expect(helper).not_to receive(:cluster_free_size_bytes)
expect(migration).not_to receive(:space_required_bytes)
subject.perform
end
end
end
end end
end end
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment