Commit a65928cf authored by Nick Thomas

Add a bulk processor for ES incremental updates

Currently, we store bookkeeping information for the elasticsearch index
in sidekiq jobs. There are four types of information:

* Backfill indexing for repositories
* Backfill indexing for database records
* Incremental indexing for repositories
* Incremental indexing for database records

The first three use elasticsearch bulk requests when indexing. The last
does not.

This commit introduces a system that uses bulk requests when indexing
incremental changes to database records. This is done by adding the
bookkeeping information to a Redis ZSET, rather than enqueuing sidekiq
jobs for each change. A Sidekiq cron worker takes batches from the ZSET
and submits them to elasticsearch via the bulk API.

This reduces the responsiveness of indexing slightly, but it also reduces
the cost of indexing, both in terms of the load on Elasticsearch and the
size of the bookkeeping information.

Since we're using a ZSET, we also get deduplication of work for free.
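
For illustration, a minimal redis-rb sketch (using the key name introduced
here; the member string is just an example of a serialized document
reference): re-adding the same member only updates its score, so the queue
never grows with duplicates.

    require 'redis'

    redis = Redis.new
    redis.zadd('elastic:incremental:updates:0:zset', 1, 'Issue 1 issue_1 project_1')
    redis.zadd('elastic:incremental:updates:0:zset', 2, 'Issue 1 issue_1 project_1')
    redis.zcard('elastic:incremental:updates:0:zset') # => 1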
parent a26c6fae
---
title: 'Add a bulk processor for elasticsearch incremental updates'
merge_request: 24298
author:
type: added
......@@ -454,6 +454,11 @@ production: &base
pseudonymizer_worker:
cron: "0 * * * *"
# Elasticsearch bulk updater for incremental updates.
# NOTE: This will only take effect if elasticsearch is enabled.
elastic_index_bulk_cron_worker:
cron: "*/1 * * * *"
registry:
# enabled: true
# host: registry.example.com
......
......@@ -531,6 +531,9 @@ Gitlab.ee do
Settings.cron_jobs['update_max_seats_used_for_gitlab_com_subscriptions_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['update_max_seats_used_for_gitlab_com_subscriptions_worker']['cron'] ||= '0 12 * * *'
Settings.cron_jobs['update_max_seats_used_for_gitlab_com_subscriptions_worker']['job_class'] = 'UpdateMaxSeatsUsedForGitlabComSubscriptionsWorker'
Settings.cron_jobs['elastic_index_bulk_cron_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['elastic_index_bulk_cron_worker']['cron'] ||= '*/1 * * * *'
Settings.cron_jobs['elastic_index_bulk_cron_worker']['job_class'] ||= 'ElasticIndexBulkCronWorker'
end
#
......
......@@ -36,7 +36,11 @@ Additionally, if you need large repos or multiple forks for testing, please cons
The Elasticsearch integration depends on an external indexer. We ship an [indexer written in Go](https://gitlab.com/gitlab-org/gitlab-elasticsearch-indexer). The user must trigger the initial indexing via a rake task but, after this is done, GitLab itself will trigger reindexing when required via `after_` callbacks on create, update, and destroy that are inherited from [/ee/app/models/concerns/elastic/application_versioned_search.rb](https://gitlab.com/gitlab-org/gitlab/blob/master/ee/app/models/concerns/elastic/application_versioned_search.rb).
All indexing after the initial one is done via `ElasticIndexerWorker` (Sidekiq jobs).
After initial indexing is complete, updates proceed in one of two ways, depending on the `:elastic_bulk_incremental_updates` feature flag.
If disabled, every create, update, or delete operation on an Elasticsearch-tracked model enqueues a new `ElasticIndexerWorker` Sidekiq job which takes care of updating just that document. This is quite inefficient.
If the feature flag is enabled, create, update, and delete operations for all models except projects (see [#207494](https://gitlab.com/gitlab-org/gitlab/issues/207494)) are tracked in a Redis [`ZSET`](https://redis.io/topics/data-types#sorted-sets) instead. A regular `sidekiq-cron` `ElasticIndexBulkCronWorker` processes this queue, updating many Elasticsearch documents at a time with the [Bulk Request API](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html).
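As a rough sketch of the bulk path (using the classes introduced for this feature; `issue` stands in for any tracked record):

```ruby
# An after_commit callback on the model records a reference in the Redis ZSET
# instead of enqueuing a per-document Sidekiq job:
::Elastic::ProcessBookkeepingService.track!(issue)

# Every minute, ElasticIndexBulkCronWorker drains up to
# Elastic::ProcessBookkeepingService::LIMIT references from the ZSET and
# submits them to Elasticsearch via the Bulk API:
Elastic::ProcessBookkeepingService.new.execute
```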
Search queries are generated by the concerns found in [ee/app/models/concerns/elastic](https://gitlab.com/gitlab-org/gitlab/tree/master/ee/app/models/concerns/elastic). These concerns are also in charge of access control, and have been a historic source of security bugs so please pay close attention to them!
......
......@@ -45,10 +45,14 @@ module Elastic
end
def maintain_elasticsearch_create
return if maintain_elasticsearch_incremental_bulk
ElasticIndexerWorker.perform_async(:index, self.class.to_s, self.id, self.es_id)
end
def maintain_elasticsearch_update
return if maintain_elasticsearch_incremental_bulk
ElasticIndexerWorker.perform_async(
:update,
self.class.to_s,
......@@ -58,11 +62,21 @@ module Elastic
end
def maintain_elasticsearch_destroy
return if maintain_elasticsearch_incremental_bulk
ElasticIndexerWorker.perform_async(
:delete, self.class.to_s, self.id, self.es_id, es_parent: self.es_parent
)
end
def maintain_elasticsearch_incremental_bulk
return false unless Feature.enabled?(:elastic_bulk_incremental_updates, self.project)
::Elastic::ProcessBookkeepingService.track!(self)
true
end
class_methods do
def __elasticsearch__
@__elasticsearch__ ||= ::Elastic::MultiVersionClassProxy.new(self)
......
......@@ -19,6 +19,14 @@ module Elastic
::Gitlab::CurrentSettings.elasticsearch_indexes_project?(self)
end
def maintain_elasticsearch_incremental_bulk
# TODO: ElasticIndexerWorker does extra work for project hooks, so we
# can't use the incremental-bulk indexer for projects yet.
#
# https://gitlab.com/gitlab-org/gitlab/issues/207494
false
end
def each_indexed_association
INDEXED_ASSOCIATIONS.each do |association_name|
association = self.association(association_name)
......
# frozen_string_literal: true
module Elastic
class ProcessBookkeepingService
REDIS_SET_KEY = 'elastic:incremental:updates:0:zset'
REDIS_SCORE_KEY = 'elastic:incremental:updates:0:score'
LIMIT = 1000
class << self
# Add some records to the processing queue. Items must be serializable to
# a Gitlab::Elastic::DocumentReference
def track!(*items)
return true if items.empty?
items.map! { |item| ::Gitlab::Elastic::DocumentReference.serialize(item) }
with_redis do |redis|
# Efficiently generate a guaranteed-unique score for each item
max = redis.incrby(REDIS_SCORE_KEY, items.size)
min = (max - items.size) + 1
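# For example, if the counter was at 10 and three items are tracked, INCRBY
# returns max = 13, so the items are assigned the scores 11, 12 and 13.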
(min..max).zip(items).each_slice(1000) do |group|
logger.debug(message: 'track_items', count: group.count, items: group)
redis.zadd(REDIS_SET_KEY, group)
end
end
true
end
def queue_size
with_redis { |redis| redis.zcard(REDIS_SET_KEY) }
end
def clear_tracking!
with_redis { |redis| redis.del(REDIS_SET_KEY, REDIS_SCORE_KEY) }
end
def logger
# Logger.build already caches the logger via the request store
::Gitlab::Elasticsearch::Logger.build
end
def with_redis(&blk)
Gitlab::Redis::SharedState.with(&blk) # rubocop:disable CodeReuse/ActiveRecord
end
end
def execute
self.class.with_redis { |redis| execute_with_redis(redis) }
end
private
def execute_with_redis(redis)
specs = redis.zrangebyscore(REDIS_SET_KEY, '-inf', '+inf', limit: [0, LIMIT], with_scores: true)
return if specs.empty?
first_score = specs.first.last
last_score = specs.last.last
logger.info(
message: 'bulk_indexing_start',
records_count: specs.count,
first_score: first_score,
last_score: last_score
)
specs.each { |spec, _| submit_document(spec) }
failures = bulk_indexer.flush
# Re-enqueue any failures so they are retried
self.class.track!(*failures) if failures.present?
# Remove all the successes
redis.zremrangebyscore(REDIS_SET_KEY, first_score, last_score)
logger.info(
message: 'bulk_indexing_end',
records_count: specs.count,
failures_count: failures.count,
first_score: first_score,
last_score: last_score
)
end
def submit_document(spec)
ref = ::Gitlab::Elastic::DocumentReference.deserialize(spec)
bulk_indexer.process(ref)
rescue ::Gitlab::Elastic::DocumentReference::InvalidError => err
logger.warn(
message: 'submit_document_failed',
reference: spec,
error_class: err.class.to_s,
error_message: err.message
)
end
def bulk_indexer
@bulk_indexer ||= ::Gitlab::Elastic::BulkIndexer.new(logger: logger)
end
def logger
self.class.logger
end
end
end
......@@ -24,6 +24,13 @@
:resource_boundary: :unknown
:weight: 1
:idempotent:
- :name: cronjob:elastic_index_bulk_cron
:feature_category: :search
:has_external_dependencies:
:latency_sensitive:
:resource_boundary: :unknown
:weight: 1
:idempotent: true
- :name: cronjob:geo_container_repository_sync_dispatch
:feature_category: :geo_replication
:has_external_dependencies:
......
# frozen_string_literal: true
class ElasticIndexBulkCronWorker
include ApplicationWorker
include Gitlab::ExclusiveLeaseHelpers
# There is no onward scheduling and this cron handles work from across the
# application, so there's no useful context to add.
include CronjobQueue # rubocop:disable Scalability/CronWorkerContext
feature_category :search
idempotent!
def perform
in_lock(self.class.name.underscore, ttl: 10.minutes, retries: 10, sleep_sec: 1) do
Elastic::ProcessBookkeepingService.new.execute
end
rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError
# We're scheduled on a cronjob, so nothing to do here
end
end
---
title: Add a bulk processor for ES incremental updates
merge_request:
author:
type: added
# frozen_string_literal: true
module Gitlab
module Elastic
# Accumulate records and submit to elasticsearch in bulk, respecting limits
# on request size.
#
# Call +process+ to accumulate records in memory, submitting bulk requests
# when the bulk limits are reached.
#
# Once finished, call +flush+. Any errors accumulated earlier will be
# reported by this call.
#
# BulkIndexer is not safe for concurrent use.
class BulkIndexer
include ::Elasticsearch::Model::Client::ClassMethods
attr_reader :logger, :failures
def initialize(logger:)
@body = []
@body_size_bytes = 0
@failures = []
@logger = logger
@ref_cache = []
end
# Adds or removes a document in elasticsearch, depending on whether the
# database record it refers to can be found
def process(ref)
ref_cache << ref
if ref.database_record
index(ref)
else
delete(ref)
end
end
def flush
maybe_send_bulk(force: true).failures
end
private
def reset!
@body = []
@body_size_bytes = 0
@ref_cache = []
end
attr_reader :body, :body_size_bytes, :ref_cache
def index(ref)
proxy = ref.database_record.__elasticsearch__
op = build_op(ref, proxy)
submit({ index: op }, proxy.as_indexed_json)
maybe_send_bulk
end
def delete(ref)
proxy = ref.klass.__elasticsearch__
op = build_op(ref, proxy)
submit(delete: op)
maybe_send_bulk
end
def build_op(ref, proxy)
op = {
_index: proxy.index_name,
_type: proxy.document_type,
_id: ref.es_id
}
op[:_routing] = ref.es_parent if ref.es_parent # blank for projects
op
end
def bulk_limit_bytes
Gitlab::CurrentSettings.elasticsearch_max_bulk_size_mb.megabytes
end
def submit(*hashes)
hashes.each do |hash|
text = hash.to_json
body.push(text)
@body_size_bytes += text.bytesize + 2 # Account for newlines
end
end
def maybe_send_bulk(force: false)
return self if body.empty?
return self if body_size_bytes < bulk_limit_bytes && !force
failed_refs = try_send_bulk
logger.info(
message: 'bulk_submitted',
body_size_bytes: body_size_bytes,
bulk_count: ref_cache.count,
errors_count: failed_refs.count
)
failures.push(*failed_refs)
reset!
self
end
def try_send_bulk
process_errors(client.bulk(body: body))
rescue => err
# If an exception is raised, treat the entire bulk as failed
logger.error(message: 'bulk_exception', error_class: err.class.to_s, error_message: err.message)
ref_cache
end
def process_errors(result)
return [] unless result['errors']
out = []
# Items in the response have the same order as items in the request.
#
# Example success: {"index": {"result": "created", "status": 201}}
# Example failure: {"index": {"error": {...}, "status": 400}}
result['items'].each_with_index do |item, i|
op = item['index'] || item['delete']
if op.nil? || op['error']
logger.warn(message: 'bulk_error', item: item)
out << ref_cache[i]
end
end
out
end
end
end
end
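In short, callers feed document references to `process` and finish with `flush`; a usage sketch based on the methods above (`issue` stands in for any indexed record):
indexer = ::Gitlab::Elastic::BulkIndexer.new(logger: ::Gitlab::Elasticsearch::Logger.build)
indexer.process(::Gitlab::Elastic::DocumentReference.build(issue)) # buffered; a bulk request is sent once the size limit is reached
failed_refs = indexer.flush # force the final bulk request and collect any failed references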
# frozen_string_literal: true
module Gitlab
module Elastic
# Tracks some essential information needed to tie database and elasticsearch
# records together, and to delete ES documents when the database object no
# longer exists.
#
# A custom serialisation format suitable for Redis is included.
class DocumentReference
include Gitlab::Utils::StrongMemoize
InvalidError = Class.new(StandardError)
class << self
def build(instance)
new(instance.class, instance.id, instance.es_id, instance.es_parent)
end
def serialize(anything)
case anything
when String
anything
when Gitlab::Elastic::DocumentReference
anything.serialize
when ApplicationRecord
serialize_record(anything)
when Array
serialize_array(anything)
else
raise InvalidError.new("Don't know how to serialize #{anything.class}")
end
end
def serialize_record(record)
serialize_array([record.class.to_s, record.id, record.es_id, record.es_parent].compact)
end
def serialize_array(array)
test_array!(array)
array.join(' ')
end
def deserialize(string)
deserialize_array(string.split(' '))
end
def deserialize_array(array)
test_array!(array)
new(*array)
end
private
def test_array!(array)
raise InvalidError.new("Bad array representation: #{array.inspect}") unless
(3..4).cover?(array.size)
end
end
attr_reader :klass, :db_id, :es_id
# This attribute is nil for some records, e.g., projects
attr_reader :es_parent
def initialize(klass_or_name, db_id, es_id, es_parent = nil)
@klass = klass_or_name
@klass = klass_or_name.constantize if @klass.is_a?(String)
@db_id = db_id
@es_id = es_id
@es_parent = es_parent
end
def ==(other)
other.instance_of?(self.class) &&
self.serialize == other.serialize
end
def klass_name
klass.to_s
end
# TODO: return a promise for batch loading: https://gitlab.com/gitlab-org/gitlab/issues/207280
def database_record
strong_memoize(:database_record) { klass.find_by_id(db_id) }
end
def serialize
self.class.serialize_array([klass_name, db_id, es_id, es_parent].compact)
end
end
end
end
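The serialised form mentioned above is the space-joined array of class name, database id, es_id and (optional) es_parent; for example (the identifiers are illustrative):
ref = Gitlab::Elastic::DocumentReference.build(issue)
ref.serialize # => "Issue 7 issue_7 project_3"
Gitlab::Elastic::DocumentReference.deserialize('Issue 7 issue_7 project_3') == ref # => true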
......@@ -16,8 +16,9 @@ describe 'Project elastic search', :js, :elastic do
end
describe 'searching' do
it 'finds issues', :sidekiq_might_not_need_inline do
it 'finds issues', :sidekiq_inline do
create(:issue, project: project, title: 'Test searching for an issue')
ensure_elasticsearch_index!
submit_search('Test')
select_search_scope('Issues')
......@@ -25,8 +26,9 @@ describe 'Project elastic search', :js, :elastic do
expect(page).to have_selector('.results', text: 'Test searching for an issue')
end
it 'finds merge requests', :sidekiq_might_not_need_inline do
it 'finds merge requests', :sidekiq_inline do
create(:merge_request, source_project: project, target_project: project, title: 'Test searching for an MR')
ensure_elasticsearch_index!
submit_search('Test')
select_search_scope('Merge requests')
......@@ -34,8 +36,9 @@ describe 'Project elastic search', :js, :elastic do
expect(page).to have_selector('.results', text: 'Test searching for an MR')
end
it 'finds milestones', :sidekiq_might_not_need_inline do
it 'finds milestones', :sidekiq_inline do
create(:milestone, project: project, title: 'Test searching for a milestone')
ensure_elasticsearch_index!
submit_search('Test')
select_search_scope('Milestones')
......@@ -43,7 +46,7 @@ describe 'Project elastic search', :js, :elastic do
expect(page).to have_selector('.results', text: 'Test searching for a milestone')
end
it 'finds wiki pages', :sidekiq_might_not_need_inline do
it 'finds wiki pages', :sidekiq_inline do
project.wiki.create_page('test.md', 'Test searching for a wiki page')
project.wiki.index_wiki_blobs
......@@ -53,8 +56,9 @@ describe 'Project elastic search', :js, :elastic do
expect(page).to have_selector('.results', text: 'Test searching for a wiki page')
end
it 'finds notes', :sidekiq_might_not_need_inline do
it 'finds notes', :sidekiq_inline do
create(:note, project: project, note: 'Test searching for a comment')
ensure_elasticsearch_index!
submit_search('Test')
select_search_scope('Comments')
......@@ -62,7 +66,7 @@ describe 'Project elastic search', :js, :elastic do
expect(page).to have_selector('.results', text: 'Test searching for a comment')
end
it 'finds commits', :sidekiq_might_not_need_inline do
it 'finds commits', :sidekiq_inline do
project.repository.index_commits_and_blobs
submit_search('initial')
......@@ -71,7 +75,7 @@ describe 'Project elastic search', :js, :elastic do
expect(page).to have_selector('.results', text: 'Initial commit')
end
it 'finds blobs', :sidekiq_might_not_need_inline do
it 'finds blobs', :sidekiq_inline do
project.repository.index_commits_and_blobs
submit_search('def')
......
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::Elastic::BulkIndexer, :elastic do
let_it_be(:issue) { create(:issue) }
let_it_be(:other_issue) { create(:issue, project: issue.project) }
let(:project) { issue.project }
let(:logger) { ::Gitlab::Elasticsearch::Logger.build }
subject(:indexer) { described_class.new(logger: logger) }
let(:es_client) { indexer.client }
let(:issue_as_ref) { ref(issue) }
let(:issue_as_json_with_times) { issue.__elasticsearch__.as_indexed_json }
let(:issue_as_json) { issue_as_json_with_times.except('created_at', 'updated_at') }
let(:other_issue_as_ref) { ref(other_issue) }
describe '#process' do
it 'returns self' do
expect(indexer.process(issue_as_ref)).to be(indexer)
end
it 'does not send a bulk request per call' do
expect(es_client).not_to receive(:bulk)
indexer.process(issue_as_ref)
end
it 'sends a bulk request if the max bulk request size is reached' do
set_bulk_limit(indexer, 1)
expect(es_client)
.to receive(:bulk)
.with(body: [kind_of(String), kind_of(String)])
.and_return({})
indexer.process(issue_as_ref)
expect(indexer.failures).to be_empty
end
end
describe '#flush' do
it 'completes a bulk' do
indexer.process(issue_as_ref)
expect(es_client)
.to receive(:bulk)
.with(body: [kind_of(String), kind_of(String)])
.and_return({})
expect(indexer.flush).to be_empty
end
it 'fails documents that elasticsearch refuses to accept' do
# Indexes with uppercase characters are invalid
expect(other_issue_as_ref.database_record.__elasticsearch__)
.to receive(:index_name)
.and_return('Invalid')
indexer.process(issue_as_ref)
indexer.process(other_issue_as_ref)
expect(indexer.flush).to contain_exactly(other_issue_as_ref)
expect(indexer.failures).to contain_exactly(other_issue_as_ref)
refresh_index!
expect(search_one(Issue)).to have_attributes(issue_as_json)
end
it 'fails all documents on exception' do
expect(es_client).to receive(:bulk) { raise 'An exception' }
indexer.process(issue_as_ref)
indexer.process(other_issue_as_ref)
expect(indexer.flush).to contain_exactly(issue_as_ref, other_issue_as_ref)
expect(indexer.failures).to contain_exactly(issue_as_ref, other_issue_as_ref)
end
context 'indexing an issue' do
it 'adds the issue to the index' do
expect(indexer.process(issue_as_ref).flush).to be_empty
refresh_index!
expect(search_one(Issue)).to have_attributes(issue_as_json)
end
it 'reindexes an unchanged issue' do
ensure_elasticsearch_index!
expect(es_client).to receive(:bulk).and_call_original
expect(indexer.process(issue_as_ref).flush).to be_empty
end
it 'reindexes a changed issue' do
ensure_elasticsearch_index!
issue.update!(title: 'new title')
expect(issue_as_json['title']).to eq('new title')
expect(indexer.process(issue_as_ref).flush).to be_empty
refresh_index!
expect(search_one(Issue)).to have_attributes(issue_as_json)
end
end
context 'deleting an issue' do
it 'removes the issue from the index' do
ensure_elasticsearch_index!
expect(issue_as_ref).to receive(:database_record).and_return(nil)
expect(indexer.process(issue_as_ref).flush).to be_empty
refresh_index!
expect(search(Issue, '*').size).to eq(0)
end
it 'succeeds even if the issue is not present' do
expect(issue_as_ref).to receive(:database_record).and_return(nil)
expect(indexer.process(issue_as_ref).flush).to be_empty
refresh_index!
expect(search(Issue, '*').size).to eq(0)
end
end
end
def ref(record)
Gitlab::Elastic::DocumentReference.build(record)
end
def stub_es_client(indexer, client)
allow(indexer).to receive(:client) { client }
end
def set_bulk_limit(indexer, bytes)
allow(indexer).to receive(:bulk_limit_bytes) { bytes }
end
def search(klass, text)
klass.__elasticsearch__.search(text)
end
def search_one(klass)
results = search(klass, '*')
expect(results.size).to eq(1)
results.first._source
end
end
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::Elastic::DocumentReference do
let_it_be(:issue) { create(:issue) }
let(:project) { issue.project }
let(:issue_as_array) { [Issue, issue.id, issue.es_id, issue.es_parent] }
let(:issue_as_ref) { described_class.new(*issue_as_array) }
let(:issue_as_str) { issue_as_array.join(' ') }
let(:project_as_array) { [Project, project.id, project.es_id] }
let(:project_as_ref) { described_class.new(*project_as_array) }
let(:project_as_str) { project_as_array.join(' ') }
describe '.build' do
it 'builds a document for an issue' do
expect(described_class.build(issue)).to eq(issue_as_ref)
end
it 'builds a document for a project' do
expect(described_class.build(project)).to eq(project_as_ref)
end
end
describe '.serialize' do
it 'does nothing to a string' do
expect(described_class.serialize('foo')).to eq('foo')
end
it 'serializes a DocumentReference' do
expect(described_class.serialize(issue_as_ref)).to eq(issue_as_str)
end
it 'defers to serialize_record for ApplicationRecord instances' do
expect(described_class).to receive(:serialize_record).with(issue)
described_class.serialize(issue)
end
it 'defers to serialize_array for Array instances' do
expect(described_class).to receive(:serialize_array).with(issue_as_array)
described_class.serialize(issue_as_array)
end
it 'fails to serialize an unrecognised value' do
expect { described_class.serialize(1) }.to raise_error(described_class::InvalidError)
end
end
describe '.serialize_record' do
it 'serializes an issue' do
expect(described_class.serialize(issue)).to eq(issue_as_str)
end
it 'serializes a project' do
expect(described_class.serialize(project)).to eq(project_as_str)
end
end
describe '.serialize_array' do
it 'serializes a project array' do
expect(described_class.serialize(project_as_array)).to eq(project_as_str)
end
it 'serializes an issue array' do
expect(described_class.serialize(issue_as_array)).to eq(issue_as_str)
end
it 'fails to serialize a too-small array' do
expect { described_class.serialize(project_as_array[0..1]) }.to raise_error(described_class::InvalidError)
end
it 'fails to serialize a too-large array' do
expect { described_class.serialize(project_as_array * 2) }.to raise_error(described_class::InvalidError)
end
end
describe '.deserialize' do
it 'deserializes an issue string' do
expect(described_class.deserialize(issue_as_str)).to eq(issue_as_ref)
end
it 'deserializes a project string' do
expect(described_class.deserialize(project_as_str)).to eq(project_as_ref)
end
end
describe '#initialize' do
it 'creates an issue reference' do
expect(described_class.new(*issue_as_array)).to eq(issue_as_ref)
end
it 'creates a project reference' do
expect(described_class.new(*project_as_array)).to eq(project_as_ref)
end
end
describe '#==' do
let(:subclass) { Class.new(described_class) }
it 'is equal to itself' do
expect(issue_as_ref).to eq(issue_as_ref)
end
it 'is equal to another ref when all elements match' do
expect(issue_as_ref).to eq(described_class.new(*issue_as_array))
end
it 'is not equal unless the other instance class matches' do
expect(issue_as_ref).not_to eq(subclass.new(*issue_as_array))
end
it 'is not equal unless db_id matches' do
other = described_class.new(Issue, issue.id + 1, issue.es_id, issue.es_parent)
expect(issue_as_ref).not_to eq(other)
end
it 'is not equal unless es_id matches' do
other = described_class.new(Issue, issue.id, 'Other es_id', issue.es_parent)
expect(issue_as_ref).not_to eq(other)
end
it 'is not equal unless es_parent matches' do
other = described_class.new(Issue, issue.id, issue.es_id, 'Other es_parent')
expect(issue_as_ref).not_to eq(other)
end
end
describe '#klass_name' do
it { expect(issue_as_ref.klass_name).to eq('Issue') }
end
describe '#database_record' do
it 'returns an issue' do
expect(issue_as_ref.database_record).to eq(issue)
end
it 'returns a project' do
expect(project_as_ref.database_record).to eq(project)
end
it 'returns nil if the record cannot be found' do
ref = described_class.new(Issue, issue.id + 1, 'issue_1')
expect(ref.database_record).to be_nil
end
it 'raises if the class is bad' do
ref = described_class.new(Integer, 1, 'integer_1')
expect { ref.database_record }.to raise_error(NoMethodError)
end
end
describe '#serialize' do
it 'serializes an issue' do
expect(issue_as_ref.serialize).to eq(issue_as_str)
end
it 'serializes a project' do
expect(project_as_ref.serialize).to eq(project_as_str)
end
end
end
......@@ -107,6 +107,8 @@ describe Note, :elastic do
end
it "does not create ElasticIndexerWorker job for system messages" do
stub_feature_flags(elastic_bulk_incremental_updates: false)
project = create :project, :repository
# We have to set a one-minute delay because of https://gitlab.com/gitlab-org/gitlab-foss/merge_requests/15682
issue = create :issue, project: project, updated_at: 1.minute.ago
......@@ -116,6 +118,16 @@ describe Note, :elastic do
create :note, :system, project: project, noteable: issue
end
it 'does not track system note updates via the bulk updater' do
stub_feature_flags(elastic_bulk_incremental_updates: true)
note = create(:note, :system)
expect(Elastic::ProcessBookkeepingService).not_to receive(:track!)
note.update!(note: 'some other text here')
end
it 'uses same index for Note subclasses' do
Note.subclasses.each do |note_class|
expect(note_class.index_name).to eq(Note.index_name)
......
......@@ -25,10 +25,10 @@ describe Elastic::IndexRecordService, :elastic do
with_them do
it 'indexes new records' do
object = nil
Sidekiq::Testing.disable! do
object = create(type)
end
object = create(type)
# Prevent records from being added via bulk indexing updates
::Elastic::ProcessBookkeepingService.clear_tracking!
expect do
expect(subject.execute(object, true)).to eq(true)
......@@ -122,10 +122,14 @@ describe Elastic::IndexRecordService, :elastic do
Sidekiq::Testing.inline! do
expect(subject.execute(other_project, true)).to eq(true)
end
# Prevent records from being added via bulk indexing updates
::Elastic::ProcessBookkeepingService.clear_tracking!
ensure_elasticsearch_index!
# Only the project itself should be in the index
expect(Elasticsearch::Model.search('*').total_count).to be 1
expect(Elasticsearch::Model.search('*').total_count).to eq(1)
expect(Project.elastic_search('*').records).to contain_exactly(other_project)
end
......@@ -312,13 +316,9 @@ describe Elastic::IndexRecordService, :elastic do
end
it 'skips records for which indexing is disabled' do
project = nil
Sidekiq::Testing.disable! do
project = create :project, name: 'project_1'
end
stub_ee_application_setting(elasticsearch_limit_indexing: true)
expect(project).to receive(:use_elasticsearch?).and_return(false)
project = create(:project, name: 'project_1')
Sidekiq::Testing.inline! do
expect(subject.execute(project, true)).to eq(true)
......
# frozen_string_literal: true
require 'spec_helper'
describe Elastic::ProcessBookkeepingService, :clean_gitlab_redis_shared_state do
around do |example|
described_class.with_redis do |redis|
@redis = redis
example.run
end
end
let(:zset) { 'elastic:incremental:updates:0:zset' }
let(:redis) { @redis }
let(:ref_class) { ::Gitlab::Elastic::DocumentReference }
let(:fake_refs) { Array.new(10) { |i| ref_class.new(Issue, i, "issue_#{i}", 'project_1') } }
let(:issue) { fake_refs.first }
let(:issue_spec) { issue.serialize }
describe '.track' do
it 'enqueues a record' do
described_class.track!(issue)
spec, score = redis.zpopmin(zset)
expect(spec).to eq(issue_spec)
expect(score).to eq(1.0)
end
it 'enqueues a set of unique records' do
described_class.track!(*fake_refs)
expect(described_class.queue_size).to eq(fake_refs.size)
spec1, score1 = redis.zpopmin(zset)
_, score2 = redis.zpopmin(zset)
expect(score1).to be < score2
expect(spec1).to eq(issue_spec)
end
it 'enqueues 10 identical records as 1 entry' do
described_class.track!(*([issue] * 10))
expect(described_class.queue_size).to eq(1)
end
it 'deduplicates across multiple inserts' do
10.times { described_class.track!(issue) }
expect(described_class.queue_size).to eq(1)
end
end
describe '.queue_size' do
it 'reports the queue size' do
expect(described_class.queue_size).to eq(0)
described_class.track!(*fake_refs)
expect(described_class.queue_size).to eq(fake_refs.size)
expect { redis.zpopmin(zset) }.to change(described_class, :queue_size).by(-1)
end
end
describe '.clear_tracking!' do
it 'removes all entries from the queue' do
described_class.track!(*fake_refs)
expect(described_class.queue_size).to eq(fake_refs.size)
described_class.clear_tracking!
expect(described_class.queue_size).to eq(0)
end
end
describe '#execute' do
let(:limit) { 5 }
before do
stub_const('Elastic::ProcessBookkeepingService::LIMIT', limit)
end
it 'submits a batch of documents' do
described_class.track!(*fake_refs)
expect(described_class.queue_size).to eq(fake_refs.size)
expect_processing(*fake_refs[0...limit])
expect { described_class.new.execute }.to change(described_class, :queue_size).by(-limit)
end
it 'retries failed documents' do
described_class.track!(*fake_refs)
failed = fake_refs[0]
expect(described_class.queue_size).to eq(10)
expect_processing(*fake_refs[0...limit], failures: [failed])
expect { described_class.new.execute }.to change(described_class, :queue_size).by(-limit + 1)
serialized, _ = redis.zpopmax(zset)
expect(ref_class.deserialize(serialized)).to eq(failed)
end
it 'discards malformed documents' do
described_class.track!('Bad')
expect(described_class.queue_size).to eq(1)
expect_next_instance_of(::Gitlab::Elastic::BulkIndexer) do |indexer|
expect(indexer).not_to receive(:process)
end
expect { described_class.new.execute }.to change(described_class, :queue_size).by(-1)
end
it 'fails, preserving documents, when processing fails with an exception' do
described_class.track!(issue)
expect(described_class.queue_size).to eq(1)
expect_next_instance_of(::Gitlab::Elastic::BulkIndexer) do |indexer|
expect(indexer).to receive(:process).with(issue) { raise 'Bad' }
end
expect { described_class.new.execute }.to raise_error('Bad')
expect(described_class.queue_size).to eq(1)
end
def expect_processing(*refs, failures: [])
expect_next_instance_of(::Gitlab::Elastic::BulkIndexer) do |indexer|
refs.each { |ref| expect(indexer).to receive(:process).with(ref) }
expect(indexer).to receive(:flush) { failures }
end
end
end
end
......@@ -2,11 +2,13 @@
RSpec.configure do |config|
config.before(:each, :elastic) do
Elastic::ProcessBookkeepingService.clear_tracking!
Gitlab::Elastic::Helper.create_empty_index
end
config.after(:each, :elastic) do
Gitlab::Elastic::Helper.delete_index
Elastic::ProcessBookkeepingService.clear_tracking!
end
config.include ElasticsearchHelpers, :elastic
......
......@@ -2,6 +2,14 @@
module ElasticsearchHelpers
def ensure_elasticsearch_index!
# Ensure that any enqueued updates are processed
Elastic::ProcessBookkeepingService.new.execute
# Make any documents added to the index visible
refresh_index!
end
def refresh_index!
::Gitlab::Elastic::Helper.refresh_index
end
end
# frozen_string_literal: true
require 'spec_helper'
describe ElasticIndexBulkCronWorker do
include ExclusiveLeaseHelpers
describe '.perform' do
it 'executes the service under an exclusive lease' do
expect_to_obtain_exclusive_lease('elastic_index_bulk_cron_worker')
expect_next_instance_of(::Elastic::ProcessBookkeepingService) do |service|
expect(service).to receive(:execute)
end
described_class.new.perform
end
end
end