Commit c1c99bcc authored by Dmitry Gruzd's avatar Dmitry Gruzd Committed by Dylan Griffith

Add elasticsearch_pause_indexing feature

This MR adds an ability to pause elasticsearch indexing
this is really useful for cluster reindexing and maintenance.
FF `elasticsearch_pause_indexing` is used to enable this functionality.
parent 402d9e56
......@@ -86,6 +86,8 @@
- 1
- - elastic_indexer
- 1
- - elastic_indexing_control
- 1
- - elastic_namespace_indexer
- 1
- - elastic_namespace_rollout
......
# frozen_string_literal: true
class AddElasticsearchPauseIndexingToApplicationSettings < ActiveRecord::Migration[6.0]
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
disable_ddl_transaction!
def up
with_lock_retries do
add_column :application_settings, :elasticsearch_pause_indexing, :boolean, default: false, null: false
end
end
def down
remove_column :application_settings, :elasticsearch_pause_indexing
end
end
......@@ -443,6 +443,7 @@ CREATE TABLE public.application_settings (
container_registry_features text[] DEFAULT '{}'::text[] NOT NULL,
spam_check_endpoint_url text,
spam_check_endpoint_enabled boolean DEFAULT false NOT NULL,
elasticsearch_pause_indexing boolean DEFAULT false NOT NULL,
CONSTRAINT check_d03919528d CHECK ((char_length(container_registry_vendor) <= 255)),
CONSTRAINT check_d820146492 CHECK ((char_length(spam_check_endpoint_url) <= 255)),
CONSTRAINT check_e5aba18f02 CHECK ((char_length(container_registry_version) <= 255))
......@@ -13885,6 +13886,7 @@ COPY "schema_migrations" (version) FROM STDIN;
20200507221434
20200508050301
20200508091106
20200508140959
20200511080113
20200511083541
20200511092246
......
......@@ -142,6 +142,7 @@ The following Elasticsearch settings are available:
| Parameter | Description |
| ----------------------------------------------------- | ----------- |
| `Elasticsearch indexing` | Enables/disables Elasticsearch indexing. You may want to enable indexing but disable search in order to give the index time to be fully completed, for example. Also, keep in mind that this option doesn't have any impact on existing data, this only enables/disables background indexer which tracks data changes. So by enabling this you will not get your existing data indexed, use special Rake task for that as explained in [Adding GitLab's data to the Elasticsearch index](#adding-gitlabs-data-to-the-elasticsearch-index). |
| `Elasticsearch pause indexing` | Enables/disables temporary indexing pause. This is useful for cluster migration/reindexing. All changes are still tracked, but they are not committed to the Elasticsearch index until unpaused. |
| `Search with Elasticsearch enabled` | Enables/disables using Elasticsearch in search. |
| `URL` | The URL to use for connecting to Elasticsearch. Use a comma-separated list to support clustering (e.g., `http://host1, https://host2:9200`). If your Elasticsearch instance is password protected, pass the `username:password` in the URL (e.g., `http://<username>:<password>@<elastic_host>:9200/`). |
| `Number of Elasticsearch shards` | Elasticsearch indexes are split into multiple shards for performance reasons. In general, larger indexes need to have more shards. Changes to this value do not take effect until the index is recreated. You can read more about tradeoffs in the [Elasticsearch documentation](https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html#create-index-settings) |
......
......@@ -26,6 +26,7 @@ module EE
:elasticsearch_aws_region,
:elasticsearch_aws_secret_access_key,
:elasticsearch_indexing,
:elasticsearch_pause_indexing,
:elasticsearch_max_bulk_concurrency,
:elasticsearch_max_bulk_size_mb,
:elasticsearch_replicas,
......
......@@ -85,6 +85,7 @@ module EE
numericality: { only_integer: true, greater_than: 0, less_than_or_equal_to: 365 }
after_commit :update_personal_access_tokens_lifetime, if: :saved_change_to_max_personal_access_token_lifetime?
after_commit :resume_elasticsearch_indexing
end
class_methods do
......@@ -203,6 +204,13 @@ module EE
end
alias_method :elasticsearch_indexing?, :elasticsearch_indexing
def elasticsearch_pause_indexing
return false unless elasticsearch_pause_indexing_column_exists?
super
end
alias_method :elasticsearch_pause_indexing?, :elasticsearch_pause_indexing
def elasticsearch_search
return false unless elasticsearch_search_column_exists?
......@@ -310,6 +318,12 @@ module EE
end
end
def resume_elasticsearch_indexing
return false unless saved_changes['elasticsearch_pause_indexing'] == [true, false]
ElasticIndexingControlWorker.perform_async
end
def update_personal_access_tokens_lifetime
return unless max_personal_access_token_lifetime.present? && License.feature_available?(:personal_access_token_expiration_policy)
......@@ -332,6 +346,10 @@ module EE
::Gitlab::Database.cached_column_exists?(:application_settings, :elasticsearch_indexing)
end
def elasticsearch_pause_indexing_column_exists?
::Gitlab::Database.cached_column_exists?(:application_settings, :elasticsearch_pause_indexing)
end
def elasticsearch_search_column_exists?
::Gitlab::Database.cached_column_exists?(:application_settings, :elasticsearch_search)
end
......
# frozen_string_literal: true
module Elastic
# Class for managing queues for indexing workers
# When indexing is paused all jobs are saved in a separate sorted sets in redis
# This class should only be used with sidekiq workers which extend Elastic::IndexingControl module
class IndexingControlService
LIMIT = 1000
def initialize(klass)
raise ArgumentError, "passed class must extend Elastic::IndexingControl" unless klass.include?(Elastic::IndexingControl)
@klass = klass
@queue_name = klass.name.underscore
@redis_set_key = "elastic:paused_jobs:zset:#{queue_name}"
@redis_score_key = "elastic:paused_jobs:score:#{queue_name}"
end
class << self
def add_to_waiting_queue!(klass, args, context)
new(klass).add_to_waiting_queue!(args, context)
end
def has_jobs_in_waiting_queue?(klass)
new(klass).has_jobs_in_waiting_queue?
end
def resume_processing!(klass)
new(klass).resume_processing!
end
end
def add_to_waiting_queue!(args, context)
with_redis do |redis|
redis.zadd(redis_set_key, generate_unique_score(redis), serialize(args, context))
end
end
def queue_size
with_redis { |redis| redis.zcard(redis_set_key) }
end
def has_jobs_in_waiting_queue?
with_redis { |redis| redis.exists(redis_set_key) }
end
def resume_processing!
with_redis do |redis|
loop do
break if Elastic::IndexingControl.non_cached_pause_indexing?
jobs_with_scores = next_batch_from_waiting_queue(redis)
break if jobs_with_scores.empty?
parsed_jobs = jobs_with_scores.map { |j, _| deserialize(j) }
parsed_jobs.each { |j| send_to_processing_queue(j) }
remove_jobs_from_waiting_queue(redis, jobs_with_scores)
end
redis.del(redis_set_key, redis_score_key) if queue_size.zero?
end
end
private
attr_reader :klass, :queue_name, :redis_set_key, :redis_score_key
def with_redis(&blk)
Gitlab::Redis::SharedState.with(&blk) # rubocop:disable CodeReuse/ActiveRecord
end
def serialize(args, context)
{
args: args,
context: context
}.to_json
end
def deserialize(json)
Gitlab::Json.parse(json)
end
def send_to_processing_queue(job)
Labkit::Context.with_context(job['context']) do
klass.perform_async(*job['args'])
end
end
def generate_unique_score(redis)
redis.incr(redis_score_key)
end
def next_batch_from_waiting_queue(redis)
redis.zrangebyscore(redis_set_key, '-inf', '+inf', limit: [0, LIMIT], with_scores: true)
end
def remove_jobs_from_waiting_queue(redis, jobs_with_scores)
first_score = jobs_with_scores.first.last
last_score = jobs_with_scores.last.last
redis.zremrangebyscore(redis_set_key, first_score, last_score)
end
end
end
......@@ -33,6 +33,15 @@
= link_to _('Index all projects'), admin_elasticsearch_enqueue_index_path,
class: ['btn', 'btn-success'], method: :post
.card-body
.form-group
.form-check
= f.check_box :elasticsearch_pause_indexing, class: 'form-check-input', data: { qa_selector: 'pause_checkbox' }, disabled: !Gitlab::CurrentSettings.elasticsearch_indexing?
= f.label :elasticsearch_pause_indexing, class: 'form-check-label' do
Pause Elasticsearch indexing
.form-text.text-muted
= _('Changes are still tracked. Useful for cluster/index migrations.')
.form-group
.form-check
= f.check_box :elasticsearch_search, class: 'form-check-input', data: { qa_selector: 'search_checkbox' }
......
......@@ -500,6 +500,13 @@
:resource_boundary: :unknown
:weight: 1
:idempotent:
- :name: elastic_indexing_control
:feature_category: :global_search
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
- :name: elastic_namespace_indexer
:feature_category: :global_search
:has_external_dependencies:
......
# frozen_string_literal: true
# Concern for pausing/unpausing elasticsearch indexing workers
module Elastic
module IndexingControl
WORKERS = [ElasticCommitIndexerWorker, ElasticIndexerWorker].freeze
def perform(*args)
if Elastic::IndexingControl.non_cached_pause_indexing? && WORKERS.include?(self.class)
logger.info(
message: 'elasticsearch_pause_indexing setting is enabled. Job was added to the waiting queue',
worker_class: self.class.name,
args: args
)
Elastic::IndexingControlService.add_to_waiting_queue!(self.class, args, current_context)
else
super
end
end
class << self
def non_cached_pause_indexing?
ApplicationSetting.where(elasticsearch_pause_indexing: true).exists? # rubocop: disable CodeReuse/ActiveRecord
end
def resume_processing!
return false if non_cached_pause_indexing?
WORKERS.each do |worker_class|
resume_processing_for(worker_class)
end
true
end
def resume_processing_for(klass)
return unless Elastic::IndexingControlService.has_jobs_in_waiting_queue?(klass)
Elastic::IndexingControlService.resume_processing!(klass)
end
def logger
::Gitlab::Elasticsearch::Logger.build
end
end
private
def logger
Elastic::IndexingControl.logger
end
def current_context
Labkit::Context.current.to_h
end
end
end
......@@ -2,6 +2,7 @@
class ElasticCommitIndexerWorker
include ApplicationWorker
prepend Elastic::IndexingControl
feature_category :global_search
sidekiq_options retry: 2
......
......@@ -13,6 +13,11 @@ class ElasticIndexBulkCronWorker
urgency :throttled
def perform
if Elastic::IndexingControl.non_cached_pause_indexing?
logger.info(message: 'elasticsearch_pause_indexing setting is enabled. ElasticBulkCronWorker execution is skipped.')
return false
end
in_lock(self.class.name.underscore, ttl: 10.minutes, retries: 10, sleep_sec: 1) do
records_count = Elastic::ProcessBookkeepingService.new.execute
log_extra_metadata_on_done(:records_count, records_count)
......@@ -20,4 +25,10 @@ class ElasticIndexBulkCronWorker
rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError
# We're scheduled on a cronjob, so nothing to do here
end
private
def logger
Elastic::IndexingControl.logger
end
end
......@@ -3,6 +3,7 @@
class ElasticIndexerWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
include Elasticsearch::Model::Client::ClassMethods
prepend Elastic::IndexingControl
sidekiq_options retry: 2
feature_category :global_search
......
# frozen_string_literal: true
class ElasticIndexingControlWorker
include ApplicationWorker
feature_category :global_search
idempotent!
def perform
if Elastic::IndexingControl.non_cached_pause_indexing?
raise 'elasticsearch_pause_indexing is enabled, worker can not proceed'
end
Elastic::IndexingControl.resume_processing!
end
end
---
title: Add ability to pause and resume Elasticsearch indexing
merge_request: 30621
author:
type: added
......@@ -20,6 +20,7 @@ module EE
given elasticsearch_indexing: ->(val) { val } do
optional :elasticsearch_search, type: Grape::API::Boolean, desc: 'Enable Elasticsearch search'
optional :elasticsearch_pause_indexing, type: Grape::API::Boolean, desc: 'Pause Elasticsearch indexing'
requires :elasticsearch_url, type: String, desc: 'The url to use for connecting to Elasticsearch. Use a comma-separated list to support clustering (e.g., "http://localhost:9200, http://localhost:9201")'
optional :elasticsearch_limit_indexing, type: Grape::API::Boolean, desc: 'Limit Elasticsearch to index certain namespaces and projects'
end
......
......@@ -214,6 +214,20 @@ describe ApplicationSetting do
end
end
describe '#elasticsearch_pause_indexing' do
before do
setting.elasticsearch_pause_indexing = true
end
it 'resumes indexing' do
expect(ElasticIndexingControlWorker).to receive(:perform_async)
setting.save!
setting.elasticsearch_pause_indexing = false
setting.save!
end
end
describe '#elasticsearch_url' do
it 'presents a single URL as a one-element array' do
setting.elasticsearch_url = 'http://example.com'
......
# frozen_string_literal: true
require 'spec_helper'
describe Elastic::IndexingControlService, :clean_gitlab_redis_shared_state do
let(:worker_class) do
Class.new do
def self.name
'DummyIndexingWorker'
end
include ApplicationWorker
prepend Elastic::IndexingControl
end
end
let(:worker_args) { [1, 2] }
let(:worker_context) { { 'correlation_id' => 'context_correlation_id' } }
subject { described_class.new(worker_class) }
describe '.initialize' do
it 'raises an exception when passed wrong worker' do
expect { described_class.new(Class.new) }.to raise_error(ArgumentError)
end
end
describe '.add_to_waiting_queue!' do
it 'calls an instance method' do
expect_next_instance_of(described_class) do |instance|
expect(instance).to receive(:add_to_waiting_queue!).with(worker_args, worker_context)
end
described_class.add_to_waiting_queue!(worker_class, worker_args, worker_context)
end
end
describe '.has_jobs_in_waiting_queue?' do
it 'calls an instance method' do
expect_next_instance_of(described_class) do |instance|
expect(instance).to receive(:has_jobs_in_waiting_queue?)
end
described_class.has_jobs_in_waiting_queue?(worker_class)
end
end
describe '.resume_processing!' do
it 'calls an instance method' do
expect_next_instance_of(described_class) do |instance|
expect(instance).to receive(:resume_processing!)
end
described_class.resume_processing!(worker_class)
end
end
describe '#add_to_waiting_queue!' do
it 'adds a job to the set' do
expect { subject.add_to_waiting_queue!(worker_args, worker_context) }.to change { subject.queue_size }.from(0).to(1)
end
it 'adds only one unique job to the set' do
expect do
2.times { subject.add_to_waiting_queue!(worker_args, worker_context) }
end.to change { subject.queue_size }.from(0).to(1)
end
end
describe '#has_jobs_in_waiting_queue?' do
it 'checks set existence' do
expect { subject.add_to_waiting_queue!(worker_args, worker_context) }.to change { subject.has_jobs_in_waiting_queue? }.from(false).to(true)
end
end
describe '#resume_processing!' do
before do
allow(Elastic::IndexingControl).to receive(:non_cached_pause_indexing?).and_return(false)
end
let(:jobs) { [[1], [2], [3]] }
it 'puts jobs back into the queue and respects order' do
# We stub this const to test at least a couple of loop iterations
stub_const("Elastic::IndexingControlService::LIMIT", 2)
jobs.each do |j|
subject.add_to_waiting_queue!(j, worker_context)
end
expect(worker_class).to receive(:perform_async).with(1).ordered
expect(worker_class).to receive(:perform_async).with(2).ordered
expect(worker_class).to receive(:perform_async).with(3).ordered
subject.resume_processing!
end
it 'drops a set after execution' do
jobs.each do |j|
subject.add_to_waiting_queue!(j, worker_context)
end
expect(Labkit::Context).to receive(:with_context).with(worker_context).exactly(jobs.count).times.and_call_original
expect(worker_class).to receive(:perform_async).exactly(jobs.count).times
expect { subject.resume_processing! }.to change { subject.has_jobs_in_waiting_queue? }.from(true).to(false)
end
end
context 'concurrent changes to different queues' do
let(:second_worker_class) do
Class.new do
def self.name
'SecondDummyIndexingWorker'
end
include ApplicationWorker
prepend Elastic::IndexingControl
end
end
let(:other_subject) { described_class.new(second_worker_class) }
it 'allows to use queues independently of each other' do
expect { subject.add_to_waiting_queue!(worker_args, worker_context) }.to change { subject.queue_size }.from(0).to(1)
expect { other_subject.add_to_waiting_queue!(worker_args, worker_context) }.to change { other_subject.queue_size }.from(0).to(1)
expect { subject.resume_processing! }.to change { subject.has_jobs_in_waiting_queue? }.from(true).to(false)
expect { other_subject.resume_processing! }.to change { other_subject.has_jobs_in_waiting_queue? }.from(true).to(false)
end
end
end
......@@ -18,6 +18,7 @@ describe 'admin/application_settings/_elasticsearch_form' do
before do
allow(Gitlab::CurrentSettings).to(receive(:elasticsearch_indexing?)).and_return(es_indexing)
allow(Gitlab::CurrentSettings).to(receive(:elasticsearch_pause_indexing?)).and_return(true)
end
context 'indexing is enabled' do
......@@ -28,6 +29,13 @@ describe 'admin/application_settings/_elasticsearch_form' do
expect(rendered).to have_css('a.btn-success', text: button_text)
end
it 'renders an enabled pause checkbox' do
render
expect(rendered).to have_css('input[id=application_setting_elasticsearch_pause_indexing]')
expect(rendered).not_to have_css('input[id=application_setting_elasticsearch_pause_indexing][disabled="disabled"]')
end
end
context 'indexing is disabled' do
......@@ -38,6 +46,12 @@ describe 'admin/application_settings/_elasticsearch_form' do
expect(rendered).not_to have_css('a.btn-success', text: button_text)
end
it 'renders a disabled pause checkbox' do
render
expect(rendered).to have_css('input[id=application_setting_elasticsearch_pause_indexing][disabled="disabled"]')
end
end
end
......
# frozen_string_literal: true
require 'spec_helper'
describe Elastic::IndexingControl do
let!(:project) { create(:project, :repository) }
let(:worker) do
Class.new do
def perform(project_id)
project = Project.find(project_id)
Gitlab::Elastic::Indexer.new(project).run
end
def self.name
'DummyIndexingWorker'
end
include ApplicationWorker
prepend Elastic::IndexingControl
end.new
end
let(:worker_args) { [project.id] }
let(:worker_context) { { 'correlation_id' => 'context_correlation_id' } }
before do
stub_const("Elastic::IndexingControl::WORKERS", [worker.class])
end
describe '.non_cached_pause_indexing?' do
it 'calls current_without_cache' do
expect(ApplicationSetting).to receive(:where).with(elasticsearch_pause_indexing: true).and_return(ApplicationSetting.none)
expect(described_class.non_cached_pause_indexing?).to be_falsey
end
end
describe '.resume_processing!' do
before do
expect(Elastic::IndexingControl).to receive(:non_cached_pause_indexing?).and_return(false)
end
it 'triggers job processing if there are jobs' do
expect(Elastic::IndexingControlService).to receive(:has_jobs_in_waiting_queue?).with(worker.class).and_return(true)
expect(Elastic::IndexingControlService).to receive(:resume_processing!).with(worker.class)
described_class.resume_processing!
end
it 'does nothing if no jobs available' do
expect(Elastic::IndexingControlService).to receive(:has_jobs_in_waiting_queue?).with(worker.class).and_return(false)
expect(Elastic::IndexingControlService).not_to receive(:resume_processing!)
described_class.resume_processing!
end
end
context 'with elasticsearch indexing paused' do
before do
expect(Elastic::IndexingControl).to receive(:non_cached_pause_indexing?).and_return(true)
end
it 'adds jobs to the waiting queue' do
expect_any_instance_of(Gitlab::Elastic::Indexer).not_to receive(:run)
expect(Elastic::IndexingControlService).to receive(:add_to_waiting_queue!).with(worker.class, worker_args, worker_context)
Labkit::Context.with_context(worker_context) do
worker.perform(*worker_args)
end
end
it 'ignores changes from a different worker' do
stub_const("Elastic::IndexingControl::WORKERS", [])
expect_any_instance_of(Gitlab::Elastic::Indexer).to receive(:run)
expect(Elastic::IndexingControlService).not_to receive(:add_to_waiting_queue!)
worker.perform(*worker_args)
end
end
context 'with elasticsearch indexing unpaused' do
before do
expect(Elastic::IndexingControl).to receive(:non_cached_pause_indexing?).and_return(false)
end
it 'performs the job' do
expect_next_instance_of(Gitlab::Elastic::Indexer) do |indexer|
expect(indexer).to receive(:run)
end
expect(Elastic::IndexingControlService).not_to receive(:track!)
worker.perform(*worker_args)
end
end
end
......@@ -5,14 +5,32 @@ require 'spec_helper'
describe ElasticIndexBulkCronWorker do
include ExclusiveLeaseHelpers
describe '.perform' do
it 'executes the service under an exclusive lease' do
expect_to_obtain_exclusive_lease('elastic_index_bulk_cron_worker')
context 'indexing is not paused' do
before do
expect(Elastic::IndexingControl).to receive(:non_cached_pause_indexing?).and_return(false)
end
expect_next_instance_of(::Elastic::ProcessBookkeepingService) do |service|
expect(service).to receive(:execute)
it 'executes the service under an exclusive lease' do
expect_to_obtain_exclusive_lease('elastic_index_bulk_cron_worker')
expect_next_instance_of(::Elastic::ProcessBookkeepingService) do |service|
expect(service).to receive(:execute)
end
described_class.new.perform
end
end
described_class.new.perform
context 'indexing is paused' do
before do
expect(Elastic::IndexingControl).to receive(:non_cached_pause_indexing?).and_return(true)
end
it 'does nothing if indexing is paused' do
expect(::Elastic::ProcessBookkeepingService).not_to receive(:new)
expect(described_class.new.perform).to eq(false)
end
end
it 'adds the elastic_bulk_count to the done log' do
......
# frozen_string_literal: true
require 'spec_helper'
describe ElasticIndexingControlWorker do
subject { described_class.new }
describe '#perform' do
context 'indexing is unpaused' do
before do
expect(Elastic::IndexingControl).to receive(:non_cached_pause_indexing?).and_return(false)
end
it 'calls resume_processing!' do
expect(Elastic::IndexingControl).to receive(:resume_processing!)
subject.perform
end
end
context 'indexing is paused' do
before do
expect(Elastic::IndexingControl).to receive(:non_cached_pause_indexing?).and_return(true)
end
it 'raises an exception' do
expect { subject.perform }.to raise_error(RuntimeError, /elasticsearch_pause_indexing/)
end
end
end
end
......@@ -3849,6 +3849,9 @@ msgstr ""
msgid "Changes are shown as if the <b>source</b> revision was being merged into the <b>target</b> revision."
msgstr ""
msgid "Changes are still tracked. Useful for cluster/index migrations."
msgstr ""
msgid "Changes are unknown"
msgstr ""
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment