Commit 1c5b9011 authored by Dmitry Gruzd's avatar Dmitry Gruzd Committed by Dylan Griffith

Add background migrations for Elasticsearch

This MR implements elastic migration framework, which looks and behaves
similar to rails database migrations.
Migrations are stored in ee/elastic/migrate/ with
YYYYMMDDHHMMSS_migration_name.rb file name format
parent 10e72ff5
......@@ -605,6 +605,9 @@ Gitlab.ee do
Settings.cron_jobs['elastic_remove_expired_namespace_subscriptions_from_index_cron_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['elastic_remove_expired_namespace_subscriptions_from_index_cron_worker']['cron'] ||= '10 3 * * *'
Settings.cron_jobs['elastic_remove_expired_namespace_subscriptions_from_index_cron_worker']['job_class'] ||= 'ElasticRemoveExpiredNamespaceSubscriptionsFromIndexCronWorker'
Settings.cron_jobs['elastic_migration_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['elastic_migration_worker']['cron'] ||= '*/30 * * * *'
Settings.cron_jobs['elastic_migration_worker']['job_class'] ||= 'Elastic::MigrationWorker'
Settings.cron_jobs['sync_seat_link_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['sync_seat_link_worker']['cron'] ||= "#{rand(60)} 0 * * *"
Settings.cron_jobs['sync_seat_link_worker']['job_class'] = 'SyncSeatLinkWorker'
......
......@@ -184,6 +184,38 @@ If the current version is `v12p1`, and we need to create a new version for `v12p
1. Change the namespace for files under `v12p1` folder from `Latest` to `V12p1`
1. Make changes to files under the `latest` folder as needed
## Creating a new Global Search migration
> This functionality was introduced by [#234046](https://gitlab.com/gitlab-org/gitlab/-/issues/234046).
NOTE: **Note:**
This only supported for indices created with GitLab 13.0 or greater.
Migrations are stored in the [`ee/elastic/migrate/`](https://gitlab.com/gitlab-org/gitlab/-/tree/master/ee/elastic/migrate) folder with `YYYYMMDDHHMMSS_migration_name.rb`
file name format, which is similar to Rails database migrations:
```ruby
# frozen_string_literal: true
class MigrationName < Elastic::Migration
# Important: Any update to the Elastic index mappings should be replicated in Elastic::Latest::Config
def migrate
end
# Check if the migration has completed
# Return true if completed, otherwise return false
def completed?
end
end
```
Applied migrations are stored in `gitlab-#{RAILS_ENV}-migrations` index. All unexecuted migrations
are applied by the [`Elastic::MigrationWorker`](https://gitlab.com/gitlab-org/gitlab/blob/master/ee/app/workers/elastic/migration_worker.rb)
cron worker sequentially.
Any update to the Elastic index mappings should be replicated in [`Elastic::Latest::Config`](https://gitlab.com/gitlab-org/gitlab/-/blob/master/ee/lib/elastic/latest/config.rb).
## Performance Monitoring
### Prometheus
......
# frozen_string_literal: true
module Elastic
class MigrationRecord
attr_reader :version, :name, :filename
delegate :migrate, :skip_migration?, :completed?, to: :migration
def initialize(version:, name:, filename:)
@version = version
@name = name
@filename = filename
@migration = nil
end
def save!(completed:)
raise 'Migrations index is not found' unless helper.index_exists?(index_name: index_name)
client.index index: index_name, type: '_doc', id: version, body: { completed: completed }
end
def persisted?
load_from_index.present?
end
def load_from_index
client.get(index: index_name, id: version)
rescue Elasticsearch::Transport::Transport::Errors::NotFound
nil
end
def self.persisted_versions(completed:)
helper = Gitlab::Elastic::Helper.default
helper.client
.search(index: helper.migrations_index_name, body: { query: { term: { completed: completed } } })
.dig('hits', 'hits')
.map { |v| v['_id'].to_i }
rescue Elasticsearch::Transport::Transport::Errors::NotFound
[]
end
private
def migration
@migration ||= load_migration
end
def load_migration
require(File.expand_path(filename))
name.constantize.new version
end
def index_name
helper.migrations_index_name
end
def client
helper.client
end
def helper
Gitlab::Elastic::Helper.default
end
end
end
# frozen_string_literal: true
module Elastic
class DataMigrationService
MIGRATIONS_PATH = 'ee/elastic/migrate'
MIGRATION_REGEXP = /\A([0-9]+)_([_a-z0-9]*)\.rb\z/.freeze
class << self
def migration_files
Dir[migrations_full_path]
end
def migrations
migrations = migration_files.map do |file|
version, name = parse_migration_filename(file)
Elastic::MigrationRecord.new(version: version.to_i, name: name.camelize, filename: file)
end
migrations.sort_by(&:version)
end
def migration_has_finished?(name)
migration = migrations.find { |migration| migration.name == name.camelize }
!!migration&.load_from_index&.dig('_source', 'completed')
end
def mark_all_as_completed!
migrations.each { |migration| migration.save!(completed: true) }
end
private
def parse_migration_filename(filename)
File.basename(filename).scan(MIGRATION_REGEXP).first
end
def migrations_full_path
Rails.root.join(MIGRATIONS_PATH, '**', '[0-9]*_*.rb').to_s
end
end
end
end
......@@ -59,6 +59,14 @@
:weight: 1
:idempotent: true
:tags: []
- :name: cronjob:elastic_migration
:feature_category: :global_search
:has_external_dependencies:
:urgency: :throttled
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: cronjob:elastic_remove_expired_namespace_subscriptions_from_index_cron
:feature_category: :global_search
:has_external_dependencies:
......
# frozen_string_literal: true
module Elastic
class MigrationWorker
include ApplicationWorker
include Gitlab::ExclusiveLeaseHelpers
# There is no onward scheduling and this cron handles work from across the
# application, so there's no useful context to add.
include CronjobQueue # rubocop:disable Scalability/CronWorkerContext
feature_category :global_search
idempotent!
urgency :throttled
def perform
return false unless Gitlab::CurrentSettings.elasticsearch_indexing?
return false unless helper.alias_exists?
in_lock(self.class.name.underscore, ttl: 1.day, retries: 10, sleep_sec: 1) do
migration = current_migration
unless migration
logger.info 'MigrationWorker: no migration available'
break false
end
unless helper.index_exists?(index_name: helper.migrations_index_name)
logger.info 'MigrationWorker: creating migrations index'
helper.create_migrations_index
end
execute_migration(migration)
completed = migration.completed?
logger.info "MigrationWorker: migration[#{migration.name}] updating with completed: #{completed}"
migration.save!(completed: completed)
end
end
private
def execute_migration(migration)
if migration.persisted?
logger.info "MigrationWorker: migration[#{migration.name}] did not execute migrate method since it was already executed. Waiting for migration to complete"
else
logger.info "MigrationWorker: migration[#{migration.name}] executing migrate method"
migration.migrate
end
end
def current_migration
completed_migrations = Elastic::MigrationRecord.persisted_versions(completed: true)
Elastic::DataMigrationService.migrations.find { |migration| !completed_migrations.include?(migration.version) }
end
def helper
Gitlab::Elastic::Helper.default
end
def logger
@logger ||= ::Gitlab::Elasticsearch::Logger.build
end
end
end
---
title: Add background migrations for Elasticsearch
merge_request: 46672
author:
type: added
# frozen_string_literal: true
class ApplyMaxAnalyzedOffset < Elastic::Migration
# Important: Any update to the Elastic index mappings should be replicated in Elastic::Latest::Config
def migrate
if max_analyzed_offset_setting == current_max_analyzed_offset
log "Skipping highlight.max_analyzed_offset migration since it is already applied"
return
end
log "Setting highlight.max_analyzed_offset to #{max_analyzed_offset_setting}kb"
helper.update_settings(settings: { index: { 'highlight.max_analyzed_offset': max_analyzed_offset_setting } })
log 'Update of highlight.max_analyzed_offset is completed'
end
# Check if the migration has completed
# Return true if completed, otherwise return false
def completed?
max_analyzed_offset_setting == current_max_analyzed_offset
end
private
def max_analyzed_offset_setting
Gitlab::CurrentSettings.elasticsearch_indexed_file_size_limit_kb.kilobytes
end
def current_max_analyzed_offset
Gitlab::Elastic::Helper.default.get_settings.dig('highlight', 'max_analyzed_offset').to_i
end
end
# frozen_string_literal: true
module Elastic
class Migration
attr_reader :version
def initialize(version)
@version = version
end
def migrate
raise NotImplementedError, 'Please extend Elastic::Migration'
end
def completed?
raise NotImplementedError, 'Please extend Elastic::Migration'
end
private
def helper
@helper ||= Gitlab::Elastic::Helper.default
end
def client
helper.client
end
def log(message)
logger.info "[Elastic::Migration: #{self.version}] #{message}"
end
def logger
@logger ||= ::Gitlab::Elasticsearch::Logger.build
end
end
end
......@@ -52,6 +52,33 @@ module Gitlab
mappings.deep_merge(::Elastic::Latest::CustomLanguageAnalyzers.custom_analyzers_mappings)
end
def migrations_index_name
"#{target_name}-migrations"
end
def create_migrations_index
settings = { number_of_shards: 1 }
mappings = {
properties: {
completed: {
type: 'boolean'
}
}
}
create_index_options = {
index: migrations_index_name,
body: {
settings: settings.to_hash,
mappings: mappings.to_hash
}
}
client.indices.create create_index_options
migrations_index_name
end
def create_empty_index(with_alias: true, options: {})
new_index_name = options[:index_name] || "#{target_name}-#{Time.now.strftime("%Y%m%d-%H%M")}"
......@@ -149,6 +176,12 @@ module Gitlab
client.tasks.get(task_id: task_id)
end
def get_settings(index_name: nil)
index = index_name || target_index_name
settings = client.indices.get_settings(index: index)
settings.dig(index, 'settings', 'index')
end
def update_settings(index_name: nil, settings:)
client.indices.put_settings(index: index_name || target_index_name, body: settings)
end
......
......@@ -68,6 +68,9 @@ namespace :gitlab do
helper = Gitlab::Elastic::Helper.new(target_name: args[:target_name])
index_name = helper.create_empty_index(with_alias: with_alias, options: options)
helper.create_migrations_index unless helper.index_exists?(index_name: migrations_index_name)
::Elastic::DataMigrationService.mark_all_as_completed!
puts "Index '#{index_name}' has been created.".color(:green)
puts "Alias '#{helper.target_name}' → '#{index_name}' has been created".color(:green) if with_alias
end
......
......@@ -53,6 +53,18 @@ RSpec.describe Gitlab::Elastic::Helper do
end
end
describe '#create_migrations_index' do
after do
helper.delete_index(index_name: helper.migrations_index_name)
end
it 'creates the index' do
expect { helper.create_migrations_index }
.to change { helper.index_exists?(index_name: helper.migrations_index_name) }
.from(false).to(true)
end
end
describe '#create_empty_index' do
context 'with an empty cluster' do
context 'with alias and index' do
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Elastic::Migration, :elastic do
let(:logger) { double('Gitlab::Elasticsearch::Logger') }
let(:migration_class) do
Class.new(described_class) do
def migrate
log "number_of_nodes: #{client.cluster.health['number_of_nodes']}"
raise 'Index does not exist' unless helper.index_exists?(index_name: helper.migrations_index_name)
end
end
end
let(:version) { 20201105181100 }
let(:migration) { migration_class.new(version) }
let(:bare_migration) { described_class.new(version) }
before do
allow(::Gitlab::Elasticsearch::Logger).to receive(:build).and_return(logger)
end
describe '#migrate' do
it 'executes method' do
expect(logger).to receive(:info).with(/number_of_nodes/)
expect { migration.migrate }.not_to raise_error
end
it 'raises exception for original class' do
expect { bare_migration.migrate }.to raise_error(NotImplementedError)
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Elastic::MigrationRecord, :elastic do
let(:record) { described_class.new(version: Time.now.to_i, name: 'ExampleMigration', filename: nil) }
describe '#save!' do
it 'creates an index if it is not found' do
es_helper.delete_index(index_name: @migrations_index_name)
expect { record.save!(completed: true) }.to raise_error(/index is not found/)
end
end
describe '#persisted?' do
it 'changes on object save' do
expect { record.save!(completed: true) }.to change { record.persisted? }.from(false).to(true)
end
end
describe '.persisted_versions' do
let(:completed_versions) { 1.upto(5).map { |i| described_class.new(version: i, name: i, filename: nil) } }
let(:in_progress_migration) { described_class.new(version: 10, name: 10, filename: nil) }
before do
completed_versions.each { |migration| migration.save!(completed: true) }
in_progress_migration.save!(completed: false)
es_helper.refresh_index(index_name: @migrations_index_name)
end
it 'loads all records' do
expect(described_class.persisted_versions(completed: true)).to match_array(completed_versions.map(&:version))
expect(described_class.persisted_versions(completed: false)).to contain_exactly(in_progress_migration.version)
end
it 'returns empty array if no index present' do
es_helper.delete_index(index_name: @migrations_index_name)
expect(described_class.persisted_versions(completed: true)).to eq([])
expect(described_class.persisted_versions(completed: false)).to eq([])
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Elastic::DataMigrationService, :elastic do
subject { described_class }
describe '.migrations' do
it 'all migration names are unique' do
expect(subject.migrations.count).to eq(subject.migrations.map(&:name).uniq.count)
end
context 'migration_files stubbed' do
let(:migration_files) { %w(ee/elastic/migrate/20201105180000_example_migration.rb ee/elastic/migrate/20201201130000_example_migration.rb) }
before do
allow(subject).to receive(:migration_files).and_return(migration_files)
end
it 'creates migration records' do
migrations = subject.migrations
migration = migrations.first
expect(migrations.count).to eq(2)
expect(migration.version).to eq(20201105180000)
expect(migration.name).to eq('ExampleMigration')
expect(migration.filename).to eq(migration_files.first)
end
end
end
describe '.migration_has_finished?' do
let(:migration) { subject.migrations.first }
let(:migration_name) { migration.name.underscore }
it 'returns true if migration has finished' do
expect(subject.migration_has_finished?(migration_name)).to eq(false)
migration.save!(completed: false)
refresh_index!
expect(subject.migration_has_finished?(migration_name)).to eq(false)
migration.save!(completed: true)
refresh_index!
expect(subject.migration_has_finished?(migration_name)).to eq(true)
end
end
describe 'mark_all_as_completed!' do
it 'creates all migration versions' do
expect(Elastic::MigrationRecord.persisted_versions(completed: true).count).to eq(0)
subject.mark_all_as_completed!
refresh_index!
expect(Elastic::MigrationRecord.persisted_versions(completed: true).count).to eq(subject.migrations.count)
end
end
end
......@@ -5,11 +5,13 @@ RSpec.configure do |config|
Elastic::ProcessBookkeepingService.clear_tracking!
Gitlab::Elastic::Helper.default.delete_index
Gitlab::Elastic::Helper.default.create_empty_index(options: { settings: { number_of_replicas: 0 } })
@migrations_index_name = Gitlab::Elastic::Helper.default.create_migrations_index
end
config.after(:each, :elastic) do
Gitlab::Elastic::Helper.default.delete_index
Elastic::ProcessBookkeepingService.clear_tracking!
Gitlab::Elastic::Helper.default.delete_index(index_name: @migrations_index_name)
end
config.include ElasticsearchHelpers, :elastic
......
......@@ -35,6 +35,11 @@ module ElasticsearchHelpers
end
def refresh_index!
::Gitlab::Elastic::Helper.default.refresh_index
es_helper.refresh_index
es_helper.refresh_index(index_name: @migrations_index_name) # rubocop:disable Gitlab/ModuleWithInstanceVariables
end
def es_helper
Gitlab::Elastic::Helper.default
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Elastic::MigrationWorker, :elastic do
subject { described_class.new }
describe '#perform' do
context 'indexing is disabled' do
before do
stub_ee_application_setting(elasticsearch_indexing: false)
end
it 'returns without execution' do
expect(subject).not_to receive(:execute_migration)
expect(subject.perform).to be_falsey
end
end
context 'indexing is enabled' do
before do
stub_ee_application_setting(elasticsearch_indexing: true)
end
it 'creates an index if it does not exist' do
Gitlab::Elastic::Helper.default.delete_index(index_name: @migrations_index_name)
expect { subject.perform }.to change { Gitlab::Elastic::Helper.default.index_exists?(index_name: @migrations_index_name) }.from(false).to(true)
end
context 'no unexecuted migrations' do
before do
allow(subject).to receive(:current_migration).and_return(nil)
end
it 'skips execution' do
expect(subject).not_to receive(:execute_migration)
expect(subject.perform).to be_falsey
end
end
context 'migration process' do
let(:migration) { Elastic::DataMigrationService.migrations.first }
before do
allow(subject).to receive(:current_migration).and_return(migration)
allow(migration).to receive(:persisted?).and_return(persisted)
allow(migration).to receive(:completed?).and_return(completed)
end
using RSpec::Parameterized::TableSyntax
where(:persisted, :completed, :execute_migration) do
false | false | true
false | true | true
true | false | false
true | true | false
end
with_them do
it 'calls migration only when needed' do
if execute_migration
expect(migration).to receive(:migrate).once
else
expect(migration).not_to receive(:migrate)
end
expect(migration).to receive(:save!).with(completed: completed)
subject.perform
end
end
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment