Commit 2ab43ff7 authored by Sean McGivern's avatar Sean McGivern

Merge branch '12372-retry' into 'master'

Elasticsearch: Intelligently retry bulk-insert failures when indexing database content

See merge request gitlab-org/gitlab-ee!14657
parents 3b590dbe 338d321a
...@@ -319,7 +319,7 @@ module Elastic ...@@ -319,7 +319,7 @@ module Elastic
{ fields: es_fields } { fields: es_fields }
end end
def es_import(options = {}) def es_import(**options)
transform = lambda do |r| transform = lambda do |r|
{ index: { _id: r.es_id, data: r.__elasticsearch__.as_indexed_json } }.tap do |data| { index: { _id: r.es_id, data: r.__elasticsearch__.as_indexed_json } }.tap do |data|
data[:index][:routing] = r.es_parent if r.es_parent data[:index][:routing] = r.es_parent if r.es_parent
......
...@@ -4,7 +4,10 @@ module Elastic ...@@ -4,7 +4,10 @@ module Elastic
class IndexRecordService class IndexRecordService
include Elasticsearch::Model::Client::ClassMethods include Elasticsearch::Model::Client::ClassMethods
ImportError = Class.new(StandardError)
ISSUE_TRACKED_FIELDS = %w(assignee_ids author_id confidential).freeze ISSUE_TRACKED_FIELDS = %w(assignee_ids author_id confidential).freeze
IMPORT_RETRY_COUNT = 3
# @param indexing [Boolean] determines whether operation is "indexing" or "updating" # @param indexing [Boolean] determines whether operation is "indexing" or "updating"
def execute(record, indexing, options = {}) def execute(record, indexing, options = {})
...@@ -17,6 +20,8 @@ module Elastic ...@@ -17,6 +20,8 @@ module Elastic
initial_index_project(record) if record.class == Project && indexing initial_index_project(record) if record.class == Project && indexing
update_issue_notes(record, options["changed_fields"]) if record.class == Issue update_issue_notes(record, options["changed_fields"]) if record.class == Issue
true
rescue Elasticsearch::Transport::Transport::Errors::NotFound, ActiveRecord::RecordNotFound rescue Elasticsearch::Transport::Transport::Errors::NotFound, ActiveRecord::RecordNotFound
# These errors can happen in several cases, including: # These errors can happen in several cases, including:
# - A record is updated, then removed before the update is handled # - A record is updated, then removed before the update is handled
...@@ -31,7 +36,7 @@ module Elastic ...@@ -31,7 +36,7 @@ module Elastic
def update_issue_notes(record, changed_fields) def update_issue_notes(record, changed_fields)
if changed_fields && (changed_fields & ISSUE_TRACKED_FIELDS).any? if changed_fields && (changed_fields & ISSUE_TRACKED_FIELDS).any?
Note.es_import query: -> { where(noteable: record) } import_association(Note, query: -> { where(noteable: record) })
end end
end end
...@@ -41,19 +46,45 @@ module Elastic ...@@ -41,19 +46,45 @@ module Elastic
ElasticCommitIndexerWorker.perform_async(project.id) ElasticCommitIndexerWorker.perform_async(project.id)
ElasticCommitIndexerWorker.perform_async(project.id, nil, nil, true) ElasticCommitIndexerWorker.perform_async(project.id, nil, nil, true)
project.each_indexed_association do |klass, objects| project.each_indexed_association do |klass, association|
objects.es_import import_association(association)
end
end
def import_association(association, options = {})
options[:return] = 'errors'
errors = association.es_import(options)
return if errors.empty?
IMPORT_RETRY_COUNT.times do
errors = retry_import(errors, association, options)
return if errors.empty?
end end
raise ImportError.new(errors.inspect)
end end
def import(record, nested, indexing) def import(record, nested, indexing)
operation = indexing ? 'index_document' : 'update_document' operation = indexing ? 'index_document' : 'update_document'
response = nil
if nested IMPORT_RETRY_COUNT.times do
response = if nested
record.__elasticsearch__.__send__ operation, routing: record.es_parent # rubocop:disable GitlabSecurity/PublicSend record.__elasticsearch__.__send__ operation, routing: record.es_parent # rubocop:disable GitlabSecurity/PublicSend
else else
record.__elasticsearch__.__send__ operation # rubocop:disable GitlabSecurity/PublicSend record.__elasticsearch__.__send__ operation # rubocop:disable GitlabSecurity/PublicSend
end end
return if response['_shards']['successful'] > 0
end
raise ImportError.new(response)
end
def retry_import(errors, association, options)
ids = errors.map { |error| error['index']['_id'][/_(\d+)$/, 1] }
association.id_in(ids).es_import(options)
end end
end end
end end
...@@ -12,8 +12,16 @@ class ElasticFullIndexWorker ...@@ -12,8 +12,16 @@ class ElasticFullIndexWorker
def perform(start_id, end_id) def perform(start_id, end_id)
return true unless Gitlab::CurrentSettings.elasticsearch_indexing? return true unless Gitlab::CurrentSettings.elasticsearch_indexing?
failed_ids = []
Project.id_in(start_id..end_id).find_each do |project| Project.id_in(start_id..end_id).find_each do |project|
Elastic::IndexRecordService.new.execute(project, true) Elastic::IndexRecordService.new.execute(project, true)
rescue Elastic::IndexRecordService::ImportError
failed_ids << project.id
end
if failed_ids.present?
Elastic::IndexProjectsByIdService.new.execute(project_ids: failed_ids)
end end
end end
end end
---
title: Improve Elasticsearch database import by retrying only failed cases first
merge_request: 14657
author:
type: performance
...@@ -29,7 +29,7 @@ describe Elastic::IndexRecordService, :elastic do ...@@ -29,7 +29,7 @@ describe Elastic::IndexRecordService, :elastic do
end end
expect do expect do
subject.execute(object, true) expect(subject.execute(object, true)).to eq(true)
Gitlab::Elastic::Helper.refresh_index Gitlab::Elastic::Helper.refresh_index
end.to change { Elasticsearch::Model.search('*').records.size }.by(1) end.to change { Elasticsearch::Model.search('*').records.size }.by(1)
end end
...@@ -39,12 +39,12 @@ describe Elastic::IndexRecordService, :elastic do ...@@ -39,12 +39,12 @@ describe Elastic::IndexRecordService, :elastic do
Sidekiq::Testing.disable! do Sidekiq::Testing.disable! do
object = create(type) object = create(type)
subject.execute(object, true) expect(subject.execute(object, true)).to eq(true)
object.update(attribute => "new") object.update(attribute => "new")
end end
expect do expect do
subject.execute(object, false) expect(subject.execute(object, false)).to eq(true)
Gitlab::Elastic::Helper.refresh_index Gitlab::Elastic::Helper.refresh_index
end.to change { Elasticsearch::Model.search('new').records.size }.by(1) end.to change { Elasticsearch::Model.search('new').records.size }.by(1)
end end
...@@ -72,7 +72,7 @@ describe Elastic::IndexRecordService, :elastic do ...@@ -72,7 +72,7 @@ describe Elastic::IndexRecordService, :elastic do
expect(ElasticCommitIndexerWorker).to receive(:perform_async).with(project.id, nil, nil, true).and_call_original expect(ElasticCommitIndexerWorker).to receive(:perform_async).with(project.id, nil, nil, true).and_call_original
Sidekiq::Testing.inline! do Sidekiq::Testing.inline! do
subject.execute(project, true) expect(subject.execute(project, true)).to eq(true)
end end
Gitlab::Elastic::Helper.refresh_index Gitlab::Elastic::Helper.refresh_index
...@@ -87,7 +87,7 @@ describe Elastic::IndexRecordService, :elastic do ...@@ -87,7 +87,7 @@ describe Elastic::IndexRecordService, :elastic do
expect(ElasticCommitIndexerWorker).to receive(:perform_async).with(other_project.id, nil, nil, true).and_call_original expect(ElasticCommitIndexerWorker).to receive(:perform_async).with(other_project.id, nil, nil, true).and_call_original
Sidekiq::Testing.inline! do Sidekiq::Testing.inline! do
subject.execute(other_project, true) expect(subject.execute(other_project, true)).to eq(true)
end end
Gitlab::Elastic::Helper.refresh_index Gitlab::Elastic::Helper.refresh_index
...@@ -95,6 +95,156 @@ describe Elastic::IndexRecordService, :elastic do ...@@ -95,6 +95,156 @@ describe Elastic::IndexRecordService, :elastic do
expect(Elasticsearch::Model.search('*').total_count).to be 1 expect(Elasticsearch::Model.search('*').total_count).to be 1
expect(Project.elastic_search('*').records).to contain_exactly(other_project) expect(Project.elastic_search('*').records).to contain_exactly(other_project)
end end
context 'retry indexing record' do
let(:failure_response) do
{
"_shards" => {
"total" => 2,
"failed" => 2,
"successful" => 0
},
"_index" => "foo",
"_type" => "_doc",
"_id" => "project_1",
"_version" => 1,
"created" => false,
"result" => ""
}
end
before do
allow(ElasticCommitIndexerWorker).to receive(:perform_async)
end
it 'does not retry if successful' do
expect(project.__elasticsearch__).to receive(:index_document).once.and_call_original
expect(subject.execute(project, true)).to eq(true)
end
it 'retries, and raises error if all retries fail' do
expect(project.__elasticsearch__).to receive(:index_document)
.exactly(described_class::IMPORT_RETRY_COUNT).times
.and_return(failure_response)
expect { subject.execute(project, true) }.to raise_error(described_class::ImportError)
end
it 'retries, and returns true if a retry is successful' do
expect(project.__elasticsearch__).to receive(:index_document).and_wrap_original do |m, *args|
allow(project.__elasticsearch__).to receive(:index_document).and_call_original
m.call(*args)
end
expect(subject.execute(project, true)).to eq(true)
end
end
context 'retry importing associations' do
let(:issues) { Issue.all.to_a }
let(:failure_response) do
{
"took" => 30,
"errors" => true,
"items" => [
{
"index" => {
"error" => 'FAILED',
"_index" => "test",
"_type" => "_doc",
"_id" => issues.first.es_id,
"_version" => 1,
"result" => "created",
"_shards" => {
"total" => 2,
"successful" => 1,
"failed" => 0
},
"status" => 400
}
},
{
"index" => {
"_index" => "test",
"_type" => "_doc",
"_id" => issues.last.es_id,
"_version" => 1,
"result" => "created",
"_shards" => {
"total" => 2,
"successful" => 1,
"failed" => 0
},
"status" => 201
}
}
]
}
end
let(:success_response) do
{
"took" => 30,
"errors" => false,
"items" => [
{
"index" => {
"_index" => "test",
"_type" => "_doc",
"_id" => issues.first.es_id,
"_version" => 1,
"result" => "created",
"_shards" => {
"total" => 2,
"successful" => 1,
"failed" => 0
},
"status" => 201
}
}
]
}
end
before do
allow(ElasticCommitIndexerWorker).to receive(:perform_async)
end
def expect_indexing(issue_ids, response, unstub: false)
expect(Issue.__elasticsearch__.client).to receive(:bulk) do |args|
actual_ids = args[:body].map { |job| job[:index][:_id] }
expected_ids = issue_ids.map { |id| "issue_#{id}" }
expect(actual_ids).to eq(expected_ids)
allow(Issue.__elasticsearch__.client).to receive(:bulk).and_call_original if unstub
response
end
end
it 'does not retry if successful' do
expect_indexing(issues.map(&:id), success_response, unstub: true)
expect(subject.execute(project, true)).to eq(true)
end
it 'retries, and raises error if all retries fail' do
expect_indexing(issues.map(&:id), failure_response)
expect_indexing([issues.first.id], failure_response).exactly(described_class::IMPORT_RETRY_COUNT).times # Retry
expect { subject.execute(project, true) }.to raise_error(described_class::ImportError)
end
it 'retries, and returns true if a retry is successful' do
expect_indexing(issues.map(&:id), failure_response)
expect_indexing([issues.first.id], success_response, unstub: true) # Retry
expect(subject.execute(project, true)).to eq(true)
end
end
end end
it 'indexes changes during indexing gap' do it 'indexes changes during indexing gap' do
...@@ -119,7 +269,7 @@ describe Elastic::IndexRecordService, :elastic do ...@@ -119,7 +269,7 @@ describe Elastic::IndexRecordService, :elastic do
expect(Note.elastic_search('note_3', options: options).present?).to eq(false) expect(Note.elastic_search('note_3', options: options).present?).to eq(false)
Sidekiq::Testing.inline! do Sidekiq::Testing.inline! do
subject.execute(project, true) expect(subject.execute(project, true)).to eq(true)
Gitlab::Elastic::Helper.refresh_index Gitlab::Elastic::Helper.refresh_index
end end
...@@ -138,7 +288,7 @@ describe Elastic::IndexRecordService, :elastic do ...@@ -138,7 +288,7 @@ describe Elastic::IndexRecordService, :elastic do
expect(project).to receive(:use_elasticsearch?).and_return(false) expect(project).to receive(:use_elasticsearch?).and_return(false)
Sidekiq::Testing.inline! do Sidekiq::Testing.inline! do
subject.execute(project, true) expect(subject.execute(project, true)).to eq(true)
Gitlab::Elastic::Helper.refresh_index Gitlab::Elastic::Helper.refresh_index
end end
......
# frozen_string_literal: true
require 'spec_helper'
describe ElasticFullIndexWorker do
subject { described_class.new }
before do
stub_ee_application_setting(elasticsearch_indexing: true)
end
it 'does nothing if ES disabled' do
stub_ee_application_setting(elasticsearch_indexing: false)
expect(Elastic::IndexRecordService).not_to receive(:new)
subject.perform(1, 2)
end
describe 'indexing' do
let(:projects) { create_list(:project, 3) }
it 'indexes projects in range' do
projects.each do |project|
expect_next_instance_of(Elastic::IndexRecordService) do |service|
expect(service).to receive(:execute).with(project, true).and_return(true)
end
end
subject.perform(projects.first.id, projects.last.id)
end
it 'retries failed indexing' do
projects.each do |project|
expect_next_instance_of(Elastic::IndexRecordService) do |service|
expect(service).to receive(:execute).with(project, true).and_raise(Elastic::IndexRecordService::ImportError)
end
end
expect_next_instance_of(Elastic::IndexProjectsByIdService) do |service|
expect(service).to receive(:execute).with(project_ids: projects.map(&:id))
end
subject.perform(projects.first.id, projects.last.id)
end
end
end
...@@ -34,7 +34,7 @@ describe ElasticIndexerWorker, :elastic do ...@@ -34,7 +34,7 @@ describe ElasticIndexerWorker, :elastic do
object = create(type) object = create(type)
expect_next_instance_of(Elastic::IndexRecordService) do |service| expect_next_instance_of(Elastic::IndexRecordService) do |service|
expect(service).to receive(:execute).with(object, true, {}) expect(service).to receive(:execute).with(object, true, {}).and_return(true)
end end
subject.perform("index", name, object.id, object.es_id) subject.perform("index", name, object.id, object.es_id)
...@@ -89,4 +89,16 @@ describe ElasticIndexerWorker, :elastic do ...@@ -89,4 +89,16 @@ describe ElasticIndexerWorker, :elastic do
expect(Elasticsearch::Model.search('*').total_count).to be(0) expect(Elasticsearch::Model.search('*').total_count).to be(0)
end end
it 'retries if index raises error' do
object = create(:project)
expect_next_instance_of(Elastic::IndexRecordService) do |service|
allow(service).to receive(:execute).and_raise(Elastic::IndexRecordService::ImportError)
end
expect do
subject.perform("index", 'Project', object.id, object.es_id)
end.to raise_error
end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment