Commit 1fae3ce0 authored by Dmitry Gruzd's avatar Dmitry Gruzd

Add namespace_ancestry_ids migration

This change:
- Adds the AddNamespaceAncestryIdsToIssuesMapping migration
- Adds the RedoBackfillNamespaceAncestryIdsForIssues migration
- Changes the namespace_ancestry method to have a trailing
separator, which is needed to get all documents
belonging to one namespace in 1 query. Without this last separator
`namespace_ancestry_ids = '1-2'` will also match `1-233` and return
incorrect results.
- Extracts common examples for adding mappings and backfilling a new
field into migration shared examples (migration_shared_examples.rb)
and uses them for migration specs.

EE: true
Changelog: changed
parent 09dcf121
......@@ -233,6 +233,11 @@ Any data or index cleanup needed to support migration retries should be handled
will re-enqueue itself with a delay which is set using the `throttle_delay` option described below. The batching
must be handled within the `migrate` method, this setting controls the re-enqueuing only.
- `batch_size` - Sets the number of documents modified during a `batched!` migration run. This size should be set to a value which allows the updates
enough time to finish. This can be tuned in combination with the `throttle_delay` option described below. The batching
must be handled within a custom `migrate` method or by using the [`Elastic::MigrationBackfillHelper`](https://gitlab.com/gitlab-org/gitlab/-/blob/master/ee/app/workers/concerns/elastic/migration_backfill_helper.rb)
`migrate` method which uses this setting. Default value is 1000 documents.
- `throttle_delay` - Sets the wait time in between batch runs. This time should be set high enough to allow each migration batch
enough time to finish. Additionally, the time should be less than 30 minutes since that is how often the
[`Elastic::MigrationWorker`](https://gitlab.com/gitlab-org/gitlab/-/blob/master/ee/app/workers/elastic/migration_worker.rb)
......
......@@ -5,7 +5,7 @@ module Elastic
attr_reader :version, :name, :filename
delegate :migrate, :skip_migration?, :completed?, :batched?, :throttle_delay, :pause_indexing?,
:space_requirements?, :space_required_bytes, :obsolete?,
:space_requirements?, :space_required_bytes, :obsolete?, :batch_size,
to: :migration
ELASTICSEARCH_SIZE = 25
......
......@@ -2,13 +2,15 @@
module Elastic
module MigrationBackfillHelper
UPDATE_BATCH_SIZE = 100
def migrate
if completed?
log "Skipping adding #{field_name} field to #{index_name} documents migration since it is already applied"
return
end
log "Adding #{field_name} field to #{index_name} documents for batch of #{self.class::QUERY_BATCH_SIZE} documents"
log "Adding #{field_name} field to #{index_name} documents for batch of #{query_batch_size} documents"
document_references = process_batch!
......@@ -61,7 +63,7 @@ module Elastic
def process_batch!
query = {
size: self.class::QUERY_BATCH_SIZE,
size: query_batch_size,
query: {
bool: {
filter: missing_field_filter
......@@ -80,11 +82,24 @@ module Elastic
Gitlab::Elastic::DocumentReference.new(self.class::DOCUMENT_TYPE, id, es_id, es_parent)
end
document_references.each_slice(self.class::UPDATE_BATCH_SIZE) do |refs|
document_references.each_slice(update_batch_size) do |refs|
Elastic::ProcessInitialBookkeepingService.track!(*refs)
end
document_references
end
# Batch size used when querying Elasticsearch for documents that still
# lack the backfilled field.
#
# Resolution order:
# 1. A QUERY_BATCH_SIZE constant defined on the migration class (legacy style).
# 2. The `batch_size` migration option provided by Elastic::MigrationOptions.
def query_batch_size
  return self.class::QUERY_BATCH_SIZE if self.class.const_defined?(:QUERY_BATCH_SIZE)
  return batch_size if respond_to?(:batch_size)

  # Fix: `NotImplemented` is not a Ruby constant — raising it produced a
  # misleading NameError. NotImplementedError is the standard exception.
  raise NotImplementedError
end
# Number of document references queued per
# Elastic::ProcessInitialBookkeepingService.track! call. A migration class
# may override via its own UPDATE_BATCH_SIZE constant; otherwise the module
# default applies.
def update_batch_size
  if self.class.const_defined?(:UPDATE_BATCH_SIZE)
    self.class::UPDATE_BATCH_SIZE
  else
    UPDATE_BATCH_SIZE
  end
end
end
end
......@@ -6,11 +6,16 @@ module Elastic
include Gitlab::ClassAttributes
DEFAULT_THROTTLE_DELAY = 5.minutes
DEFAULT_BATCH_SIZE = 1000
def batched?
self.class.get_batched
end
# Number of documents modified per batched migration run.
# Falls back to DEFAULT_BATCH_SIZE (1000) when no `batch_size` option
# was declared on the migration class.
def batch_size
self.class.get_batch_size
end
def throttle_delay
self.class.get_throttle_delay
end
......@@ -55,6 +60,14 @@ module Elastic
def get_throttle_delay
class_attributes[:throttle_delay] || DEFAULT_THROTTLE_DELAY
end
# Class-level DSL setter: writing `batch_size 5000` inside a migration
# class stores the value in class_attributes for later retrieval.
def batch_size(value)
class_attributes[:batch_size] = value
end
# Returns the configured batch size, defaulting to DEFAULT_BATCH_SIZE.
def get_batch_size
class_attributes[:batch_size] || DEFAULT_BATCH_SIZE
end
end
end
end
......@@ -5,26 +5,20 @@ class AddNamespaceAncestryToIssuesMapping < Elastic::Migration
DOCUMENT_KLASS = Issue
# Adds the `namespace_ancestry` text mapping to the issues index.
# Idempotent: skips the update when `completed?` already holds.
def migrate
if completed?
log 'Skipping adding namespace_ancestry to issues mapping migration since it is already applied'
return
end
log 'Adding namespace_ancestry to issues mapping'
# index_prefixes enables efficient prefix queries on the ancestry string.
update_mapping!(index_name, { properties: { namespace_ancestry: { type: 'text', index_prefixes: { min_chars: 1, max_chars: 19 } } } })
end
# Complete once the index mapping already contains `namespace_ancestry`.
def completed?
helper.refresh_index(index_name: index_name)
mappings = helper.get_mapping(index_name: index_name)
# NOTE(review): digs the field straight out of the returned hash —
# presumably get_mapping returns field-level properties; verify against
# Gitlab::Elastic::Helper#get_mapping.
mappings.dig('namespace_ancestry').present?
end
private
def index_name
DOCUMENT_KLASS.__elasticsearch__.index_name
end
def new_mappings
{
namespace_ancestry: {
type: 'text',
index_prefixes: {
min_chars: 1, max_chars: 19
}
}
}
end
end
# frozen_string_literal: true

# Adds the `namespace_ancestry_ids` keyword field to the issues index.
# The mapping update itself is carried out by
# Elastic::MigrationUpdateMappingsHelper using `new_mappings`/`index_name`.
class AddNamespaceAncestryIdsToIssuesMapping < Elastic::Migration
  include Elastic::MigrationUpdateMappingsHelper

  DOCUMENT_TYPE = Issue

  private

  # Single keyword field holding the dash-joined ancestor id string.
  def new_mappings
    { namespace_ancestry_ids: { type: 'keyword' } }
  end

  def index_name
    DOCUMENT_TYPE.__elasticsearch__.index_name
  end
end
# frozen_string_literal: true

# Re-runs the backfill of `namespace_ancestry_ids` for all issue documents.
# The batched `migrate` implementation comes from
# Elastic::MigrationBackfillHelper.
class RedoBackfillNamespaceAncestryIdsForIssues < Elastic::Migration
  include Elastic::MigrationBackfillHelper

  batched!
  batch_size 5000
  throttle_delay 3.minutes

  DOCUMENT_TYPE = Issue

  # NOTE: no UPDATE_BATCH_SIZE override — the helper's own
  # Elastic::MigrationBackfillHelper::UPDATE_BATCH_SIZE (100) is found via
  # constant lookup through the included module, so redefining the same
  # value here was redundant.

  private

  def index_name
    DOCUMENT_TYPE.__elasticsearch__.index_name
  end

  # Field written by the backfill; trailing-separator ancestry string.
  def field_name
    :namespace_ancestry_ids
  end
end
......@@ -5,6 +5,8 @@ module Elastic
class ApplicationInstanceProxy < Elasticsearch::Model::Proxy::InstanceMethodsProxy
include InstanceProxyUtil
NAMESPACE_ANCESTRY_SEPARATOR = '-'
def es_parent
"project_#{target.project_id}" unless target.is_a?(Project) || target&.project_id.nil?
end
......@@ -20,7 +22,7 @@ module Elastic
def namespace_ancestry
project = target.is_a?(Project) ? target : target.project
namespace = project.namespace
namespace.self_and_ancestor_ids(hierarchy_order: :desc).join('-')
namespace.self_and_ancestor_ids(hierarchy_order: :desc).join(NAMESPACE_ANCESTRY_SEPARATOR) + NAMESPACE_ANCESTRY_SEPARATOR
end
private
......
......@@ -36,7 +36,8 @@ module Elastic
indexes :visibility_level, type: :integer
indexes :issues_access_level, type: :integer
indexes :upvotes, type: :integer
indexes :namespace_ancestry, type: :text, index_prefixes: { min_chars: 1, max_chars: 19 }
indexes :namespace_ancestry, type: :text, index_prefixes: { min_chars: 1, max_chars: 19 } # deprecated
indexes :namespace_ancestry_ids, type: :keyword
end
end
end
......
......@@ -21,7 +21,7 @@ module Elastic
data['issues_access_level'] = safely_read_project_feature_for_elasticsearch(:issues)
data['upvotes'] = target.upvotes_count
data['namespace_ancestry'] = target.namespace_ancestry if Elastic::DataMigrationService.migration_has_finished?(:add_namespace_ancestry_to_issues_mapping)
data['namespace_ancestry_ids'] = target.namespace_ancestry if Elastic::DataMigrationService.migration_has_finished?(:add_namespace_ancestry_ids_to_issues_mapping)
data.merge(generic_attributes)
end
......
# frozen_string_literal: true

require 'spec_helper'
require_relative 'migration_shared_examples'
require File.expand_path('ee/elastic/migrate/20210722112500_add_upvotes_mappings_to_merge_requests.rb')

RSpec.describe AddUpvotesMappingsToMergeRequests, :elastic, :sidekiq_inline do
  let(:version) { 20210722112500 }

  # The inline '.migrate' and '.completed?' examples previously defined here
  # exactly duplicated the shared examples, which also set up `migration`
  # and a stubbed `helper`; rely on the shared examples alone.
  include_examples 'migration adds mapping'
end
# frozen_string_literal: true

require 'spec_helper'
require_relative 'migration_shared_examples'
require File.expand_path('ee/elastic/migrate/20210813134600_add_namespace_ancestry_to_issues_mapping.rb')

RSpec.describe AddNamespaceAncestryToIssuesMapping, :elastic, :sidekiq_inline do
  let(:version) { 20210813134600 }

  # The inline '.migrate' and '.completed?' examples previously defined here
  # exactly duplicated the shared examples, which also set up `migration`
  # and a stubbed `helper`; rely on the shared examples alone.
  include_examples 'migration adds mapping'
end
# frozen_string_literal: true

require 'spec_helper'
require_relative 'migration_shared_examples'
require File.expand_path('ee/elastic/migrate/20210910094600_add_namespace_ancestry_ids_to_issues_mapping.rb')

# Mapping-only migration: all behavior is exercised by the shared examples.
RSpec.describe AddNamespaceAncestryIdsToIssuesMapping, :elastic, :sidekiq_inline do
let(:version) { 20210910094600 }
include_examples 'migration adds mapping'
end
# frozen_string_literal: true

require 'spec_helper'
require_relative 'migration_shared_examples'
require File.expand_path('ee/elastic/migrate/20210910100000_redo_backfill_namespace_ancestry_ids_for_issues.rb')

# Backfill migration spec: behavior is exercised by the shared examples,
# parameterized with this migration's field and options.
RSpec.describe RedoBackfillNamespaceAncestryIdsForIssues, :elastic, :sidekiq_inline do
let(:version) { 20210910100000 }
include_examples 'migration backfills a field' do
let(:objects) { create_list(:issue, 3) }
let(:field_name) { :namespace_ancestry_ids }
# Trailing '-' matches the new namespace_ancestry format; the trailing
# separator prevents '1-2' from also matching '1-233'.
let(:field_value) { "1-2-3-" }
let(:expected_throttle_delay) { 3.minutes }
let(:expected_batch_size) { 5000 }
end
end
# frozen_string_literal: true
require 'spec_helper'
require File.expand_path('ee/elastic/migrate/20210825110300_backfill_namespace_ancestry_for_issues.rb')
RSpec.describe BackfillNamespaceAncestryForIssues, :elastic, :sidekiq_inline do
let(:version) { 20210825110300 }
RSpec.shared_examples 'migration backfills a field' do
let(:migration) { described_class.new(version) }
let(:issues) { create_list(:issue, 3) }
let(:klass) { objects.first.class }
before do
stub_ee_application_setting(elasticsearch_search: true, elasticsearch_indexing: true)
set_elasticsearch_migration_to(version, including: false)
# ensure issues are indexed
issues
# ensure objects are indexed
objects
ensure_elasticsearch_index!
end
......@@ -19,7 +17,8 @@ RSpec.describe BackfillNamespaceAncestryForIssues, :elastic, :sidekiq_inline do
describe 'migration_options' do
it 'has migration options set', :aggregate_failures do
expect(migration).to be_batched
expect(migration.throttle_delay).to eq(3.minutes)
expect(migration.throttle_delay).to eq(expected_throttle_delay)
expect(migration.batch_size).to eq(expected_batch_size)
end
end
......@@ -36,47 +35,39 @@ RSpec.describe BackfillNamespaceAncestryForIssues, :elastic, :sidekiq_inline do
context 'migration process' do
before do
remove_namespace_ancestry_from_issues(issues)
remove_field_from_objects(objects)
end
context 'when migration fails' do
let(:logger) { instance_double('::Gitlab::Elasticsearch::Logger') }
before do
allow(migration).to receive(:logger).and_return(logger)
allow(logger).to receive(:info)
allow(migration).to receive(:process_batch!).and_raise('failed to process')
end
it 'logs and reraises an error' do
expect(logger).to receive(:error).with(/migrate failed with error/)
expect { subject }.to raise_error(RuntimeError)
end
end
it 'updates all issue documents' do
it 'updates all documents' do
# track calls are batched in groups of 100
expect(::Elastic::ProcessInitialBookkeepingService).to receive(:track!).once do |*tracked_refs|
expect(::Elastic::ProcessInitialBookkeepingService).to receive(:track!).once.and_call_original do |*tracked_refs|
expect(tracked_refs.count).to eq(3)
end
subject
ensure_elasticsearch_index!
expect(migration.completed?).to be_truthy
end
it 'only updates issue documents missing namespace_ancestry', :aggregate_failures do
issue = issues.first
add_namespace_ancestry_for_issues(issues[1..-1])
it 'only updates documents missing a field', :aggregate_failures do
object = objects.first
add_field_for_objects(objects[1..-1])
expected = [Gitlab::Elastic::DocumentReference.new(Issue, issue.id, issue.es_id, issue.es_parent)]
expect(::Elastic::ProcessInitialBookkeepingService).to receive(:track!).with(*expected).once
expected = [Gitlab::Elastic::DocumentReference.new(klass, object.id, object.es_id, object.es_parent)]
expect(::Elastic::ProcessInitialBookkeepingService).to receive(:track!).with(*expected).once.and_call_original
subject
ensure_elasticsearch_index!
expect(migration.completed?).to be_truthy
end
it 'processes in batches', :aggregate_failures do
stub_const("#{described_class}::QUERY_BATCH_SIZE", 2)
stub_const("#{described_class}::UPDATE_BATCH_SIZE", 1)
allow(migration).to receive(:batch_size).and_return(2)
allow(migration).to receive(:update_batch_size).and_return(1)
expect(::Elastic::ProcessInitialBookkeepingService).to receive(:track!).exactly(3).times.and_call_original
......@@ -86,89 +77,119 @@ RSpec.describe BackfillNamespaceAncestryForIssues, :elastic, :sidekiq_inline do
ensure_elasticsearch_index!
migration.migrate
ensure_elasticsearch_index!
expect(migration.completed?).to be_truthy
end
end
end
describe '.completed?' do
context 'when documents are missing namespace_ancestry' do
context 'when documents are missing field' do
before do
remove_namespace_ancestry_from_issues(issues)
remove_field_from_objects(objects)
end
specify { expect(migration).not_to be_completed }
context 'when there are no documents in index' do
before do
delete_issues_from_index!
end
specify { expect(migration).to be_completed }
end
end
context 'when no documents are missing namespace_ancestry' do
context 'when no documents are missing field' do
specify { expect(migration).to be_completed }
end
end
private
def add_namespace_ancestry_for_issues(issues)
def add_field_for_objects(objects)
script = {
source: "ctx._source['namespace_ancestry'] = params.namespace_ancestry;",
source: "ctx._source['#{field_name}'] = params.#{field_name};",
lang: "painless",
params: {
namespace_ancestry: "1-2-3"
field_name => field_value
}
}
update_by_query(issues, script)
update_by_query(objects, script)
end
def remove_namespace_ancestry_from_issues(issues)
def remove_field_from_objects(objects)
script = {
source: "ctx._source.remove('namespace_ancestry')"
source: "ctx._source.remove('#{field_name}')"
}
update_by_query(issues, script)
update_by_query(objects, script)
end
def update_by_query(issues, script)
issue_ids = issues.map { |i| i.id }
def update_by_query(objects, script)
object_ids = objects.map(&:id)
client = Issue.__elasticsearch__.client
client = klass.__elasticsearch__.client
client.update_by_query({
index: Issue.__elasticsearch__.index_name,
wait_for_completion: true, # run synchronously
refresh: true, # make operation visible to search
body: {
script: script,
query: {
bool: {
must: [
{
terms: {
id: issue_ids
}
}
]
}
}
}
})
index: klass.__elasticsearch__.index_name,
wait_for_completion: true, # run synchronously
refresh: true, # make operation visible to search
body: {
script: script,
query: {
bool: {
must: [
{
terms: {
id: object_ids
}
}
]
}
}
}
})
end
end
RSpec.shared_examples 'migration adds mapping' do
let(:migration) { described_class.new(version) }
let(:helper) { Gitlab::Elastic::Helper.new }
before do
allow(migration).to receive(:helper).and_return(helper)
end
def delete_issues_from_index!
client = Issue.__elasticsearch__.client
client.delete_by_query({
index: Issue.__elasticsearch__.index_name,
wait_for_completion: true, # run synchronously
body: {
query: {
match_all: {}
}
}
})
describe '.migrate' do
subject { migration.migrate }
context 'when migration is already completed' do
it 'does not modify data' do
expect(helper).not_to receive(:update_mapping)
subject
end
end
context 'migration process' do
before do
allow(helper).to receive(:get_mapping).and_return({})
end
it 'updates the issues index mappings' do
expect(helper).to receive(:update_mapping)
subject
end
end
end
describe '.completed?' do
context 'mapping has been updated' do
specify { expect(migration).to be_completed }
end
context 'mapping has not been updated' do
before do
allow(helper).to receive(:get_mapping).and_return({})
end
specify { expect(migration).not_to be_completed }
end
end
end
......@@ -113,13 +113,13 @@ RSpec.describe Issue, :elastic do
let_it_be(:issue) { create(:issue, project: project, assignees: [assignee]) }
let_it_be(:award_emoji) { create(:award_emoji, :upvote, awardable: issue) }
context 'when add_namespace_ancestry_to_issues_mapping migration is not done' do
context 'when add_namespace_ancestry_ids_to_issues_mapping migration is not done' do
before do
set_elasticsearch_migration_to :add_namespace_ancestry_to_issues_mapping, including: false
set_elasticsearch_migration_to :add_namespace_ancestry_ids_to_issues_mapping, including: false
end
it "returns json without namespace_ancestry" do
expect(issue.__elasticsearch__.as_indexed_json.keys).not_to include('namespace_ancestry')
expect(issue.__elasticsearch__.as_indexed_json.keys).not_to include('namespace_ancestry_ids')
end
end
......@@ -138,7 +138,7 @@ RSpec.describe Issue, :elastic do
'type' => issue.es_type,
'state' => issue.state,
'upvotes' => 1,
'namespace_ancestry' => "#{group.id}-#{subgroup.id}"
'namespace_ancestry_ids' => "#{group.id}-#{subgroup.id}-"
})
expected_hash['assignee_id'] = [assignee.id]
......
......@@ -48,4 +48,18 @@ RSpec.describe Elastic::MigrationOptions do
expect(subject).to eq(30.seconds)
end
end
describe '#batch_size' do
subject { migration_class.new.batch_size }
it 'has a default' do
expect(subject).to eq(described_class::DEFAULT_BATCH_SIZE)
end
it 'respects when batch_size is set for the class' do
migration_class.batch_size 10000
expect(subject).to eq(10000)
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment