Commit 2171930d authored by Dmitry Gruzd

Remove old Advanced Search migrations [part 2]

parent 42eb2e7f
...@@ -83,7 +83,7 @@ module EE ...@@ -83,7 +83,7 @@ module EE
# override # override
def use_separate_indices? def use_separate_indices?
Elastic::DataMigrationService.migration_has_finished?(:migrate_issues_to_separate_index) true
end end
end end
......
...@@ -20,11 +20,7 @@ class ElasticDeleteProjectWorker ...@@ -20,11 +20,7 @@ class ElasticDeleteProjectWorker
def indices def indices
helper = Gitlab::Elastic::Helper.default helper = Gitlab::Elastic::Helper.default
index_names = [helper.target_name] index_names = [helper.target_name] + helper.standalone_indices_proxies(target_classes: [Issue]).map(&:index_name)
if Elastic::DataMigrationService.migration_has_finished?(:migrate_issues_to_separate_index)
index_names << helper.standalone_indices_proxies(target_classes: [Issue]).map(&:index_name)
end
if Elastic::DataMigrationService.migration_has_finished?(:migrate_notes_to_separate_index) if Elastic::DataMigrationService.migration_has_finished?(:migrate_notes_to_separate_index)
index_names << helper.standalone_indices_proxies(target_classes: [Note]).map(&:index_name) index_names << helper.standalone_indices_proxies(target_classes: [Note]).map(&:index_name)
......
# frozen_string_literal: true # frozen_string_literal: true
class AddNewDataToIssuesDocuments < Elastic::Migration class AddNewDataToIssuesDocuments < Elastic::Migration
batched! include Elastic::MigrationObsolete
# NOTE(review): presumably the pause between two batched runs of this migration — confirm against Elastic::Migration
throttle_delay 5.minutes
# Maximum number of issue documents requested per `migrate` run (used as the search `size`)
QUERY_BATCH_SIZE = 5000
# Number of document references handed to each `Elastic::ProcessBookkeepingService.track!` call
UPDATE_BATCH_SIZE = 100
# Queues one batch of issue documents that have no `visibility_level` field for re-indexing.
#
# Only logs and returns when `completed?` is already true. Otherwise it searches the target
# index for up to QUERY_BATCH_SIZE matching documents, wraps every hit in a
# Gitlab::Elastic::DocumentReference and tracks the references in slices of UPDATE_BATCH_SIZE.
def migrate
  if completed?
    log "Skipping adding visibility_level field to issues documents migration since it is already applied"
    return
  end

  log "Adding visibility_level field to issues documents for batch of #{QUERY_BATCH_SIZE} documents"

  search_body = { size: QUERY_BATCH_SIZE, query: { bool: { filter: issues_missing_visibility_level_filter } } }
  response = client.search(index: helper.target_index_name, body: search_body)

  # ensure that any issues missing from the database will be removed from Elasticsearch
  # as the data is back-filled
  document_references = (response.dig('hits', 'hits') || []).map do |hit|
    Gitlab::Elastic::DocumentReference.new(
      Issue,
      hit.dig('_source', 'id'),
      hit['_id'],
      hit.dig('_source', 'join_field', 'parent')
    )
  end

  document_references.each_slice(UPDATE_BATCH_SIZE) do |batch|
    Elastic::ProcessBookkeepingService.track!(*batch)
  end

  log "Adding visibility_level field to issues documents is completed for batch of #{document_references.size} documents"
end
# Reports whether no issue document in the target index is missing `visibility_level`.
#
# Runs a zero-size search with a filter aggregation and checks its `doc_count`.
# Returns nil when the response carries no such count, otherwise true/false.
def completed?
  aggregations = { issues: { filter: issues_missing_visibility_level_filter } }
  response = client.search(index: helper.target_index_name, body: { size: 0, aggs: aggregations })

  missing = response.dig('aggregations', 'issues', 'doc_count')
  missing && missing == 0
end
private
# Builds the bool filter matching issue documents that have no `visibility_level` field.
def issues_missing_visibility_level_filter
  clauses = {
    must_not: field_exists('visibility_level'),
    filter: issue_type_filter
  }

  { bool: clauses }
end
# Term filter restricting a query to documents whose `type` is 'issue'.
def issue_type_filter
  { term: { type: { value: 'issue' } } }
end
# Builds an `exists` query clause for the given document field.
#
# @param field [String] name of the field that must be present
# @return [Hash] the `exists` clause
def field_exists(field)
  { exists: { field: field } }
end
end end
# frozen_string_literal: true # frozen_string_literal: true
class MigrateIssuesToSeparateIndex < Elastic::Migration class MigrateIssuesToSeparateIndex < Elastic::Migration
pause_indexing! include Elastic::MigrationObsolete
# NOTE(review): `batched!` / `throttle_delay` are class-level options of Elastic::Migration; presumably
# they make the migration run in repeated batches with a pause in between — confirm against the base class
batched!
throttle_delay 1.minute
# Upper bound for the stored `retry_attempt`; `migrate` halts the migration once it is reached
MAX_ATTEMPTS = 30
# Document fields copied into the standalone issues index (sent as `_source` of the reindex request)
FIELDS = %w(
type
id
iid
title
description
created_at
updated_at
state
project_id
author_id
confidential
assignee_id
visibility_level
issues_access_level
).freeze
# Runs one batch of the reindexing of issues into the standalone issues index.
#
# The first run (no `slice` stored in the migration state) only recreates the standalone
# index and stores the initial state. Every later run reindexes a single slice and advances
# the stored `slice`. Once `retry_attempt` reaches MAX_ATTEMPTS the migration is handed to
# `fail_migration_halt_error!` and no further work is done.
#
# Any StandardError is logged, recorded as one more `retry_attempt` in the migration state
# and then re-raised.
def migrate
  # Read the state before doing any work so the rescue handler always has usable values,
  # including when the very first (index creation) batch fails.
  state = migration_state
  retry_attempt = state[:retry_attempt].to_i
  slice = state[:slice]
  max_slices = state[:max_slices]

  # Checked first so that a repeatedly failing initial batch is bounded by MAX_ATTEMPTS too
  if retry_attempt >= MAX_ATTEMPTS
    fail_migration_halt_error!(retry_attempt: retry_attempt)
    return
  end

  # On initial batch we only create index
  if slice.blank?
    cleanup # support retries

    log "Create standalone issues index under #{issues_index_name}"
    helper.create_standalone_indices(target_classes: [Issue])

    set_migration_state(slice: 0, retry_attempt: 0, max_slices: get_number_of_shards)
    return
  end

  if slice < max_slices
    log "Launching reindexing for slice:#{slice} | max_slices:#{max_slices}"

    response = reindex(slice: slice, max_slices: max_slices)
    process_response(response)

    log "Reindexing for slice:#{slice} | max_slices:#{max_slices} is completed with #{response.to_json}"

    set_migration_state(
      slice: slice + 1,
      retry_attempt: retry_attempt,
      max_slices: max_slices
    )
  end
rescue StandardError => e
  # `retry_attempt` is still nil when reading the migration state itself failed, hence `to_i`
  attempts = retry_attempt.to_i

  log "migrate failed, increasing migration_state retry_attempt: #{attempts} error:#{e.message}"

  set_migration_state(
    slice: slice,
    retry_attempt: attempts + 1,
    max_slices: max_slices
  )

  raise e
end
# Reports whether the standalone issues index holds as many documents as there are issue
# documents in the original index. The standalone index is refreshed before counting.
def completed?
  log "completed check: Refreshing #{issues_index_name}"
  helper.refresh_index(index_name: issues_index_name)

  original_count, new_count = original_issues_documents_count, new_issues_documents_count
  log "Checking to see if migration is completed based on index counts: original_count:#{original_count}, new_count:#{new_count}"

  original_count == new_count
end
private
# Deletes the standalone issues index when it already exists; does nothing otherwise.
def cleanup
  return unless helper.index_exists?(index_name: issues_index_name)

  helper.delete_index(index_name: issues_index_name)
end
# Issues a reindex request for one slice and waits for it to finish.
#
# @param slice [Integer] id of the slice to reindex
# @param max_slices [Integer] total number of slices
# @return the client's response to the reindex call
def reindex(slice:, max_slices:)
  reindex_body = query(slice: slice, max_slices: max_slices)

  client.reindex(wait_for_completion: true, body: reindex_body)
end
# Validates a reindex response via `log_raise`: first when it lists failures, then when
# `total` differs from `updated + created + deleted`.
#
# @param response [Hash] response of the reindex call, keyed by strings
def process_response(response)
  failures = response['failures']
  log_raise "Reindexing failed with #{failures}" if failures.present?

  processed = response['updated'] + response['created'] + response['deleted']
  return if response['total'] == processed

  log_raise "Slice reindexing seems to have failed, total is not equal to updated + created + deleted"
end
# Builds the reindex request body that copies one slice of the issue documents (restricted
# to FIELDS) from the default index into the standalone issues index.
#
# @param slice [Integer] id of the slice
# @param max_slices [Integer] total number of slices
# @return [Hash] request body with `source` and `dest` sections
def query(slice:, max_slices:)
  source = {
    index: default_index_name,
    _source: FIELDS,
    query: { match: { type: 'issue' } },
    slice: { id: slice, max: max_slices }
  }

  { source: source, dest: { index: issues_index_name } }
end
# Counts the documents of type 'issue' in the default index using a zero-size search with a
# filter aggregation. Returns the aggregation's `doc_count` (nil when the response lacks it).
def original_issues_documents_count
  issue_filter = { term: { type: { value: 'issue' } } }
  search_body = { size: 0, aggs: { issues: { filter: issue_filter } } }

  response = client.search(index: default_index_name, body: search_body)
  response.dig('aggregations', 'issues', 'doc_count')
end
# Number of documents in the standalone issues index, as reported by the helper.
def new_issues_documents_count
  standalone_index = issues_index_name

  helper.documents_count(index_name: standalone_index)
end
# Reads `number_of_shards` from the helper's index settings and converts it to an Integer.
def get_number_of_shards
  settings = helper.get_settings

  settings['number_of_shards'].to_i
end
# Name of the original index, taken from `helper.target_name`; it is the reindex source
# and the base of the standalone issues index name.
def default_index_name
helper.target_name
end
# Name of the standalone issues index: the default index name suffixed with "-issues".
def issues_index_name
  [default_index_name, 'issues'].join('-')
end
end end
# frozen_string_literal: true # frozen_string_literal: true
class DeleteIssuesFromOriginalIndex < Elastic::Migration class DeleteIssuesFromOriginalIndex < Elastic::Migration
batched! include Elastic::MigrationObsolete
# NOTE(review): presumably the pause between two batched runs of this migration — confirm against Elastic::Migration
throttle_delay 1.minute
# Upper bound for the stored `retry_attempt`; `migrate` halts the migration once it is reached
MAX_ATTEMPTS = 30
# Query matching every document whose `type` is 'issue'; used by both the delete-by-query
# call in `migrate` and the remaining-documents check in `completed?`
QUERY_BODY = {
query: {
term: {
type: 'issue'
}
}
}.freeze
# Deletes all issue documents from the original index with a single delete-by-query call.
#
# Halts through `fail_migration_halt_error!` once the stored `retry_attempt` reaches
# MAX_ATTEMPTS, and only logs when `completed?` is already true. Any StandardError is
# logged, recorded as one more `retry_attempt` in the migration state and re-raised.
def migrate
  retry_attempt = migration_state[:retry_attempt].to_i

  if retry_attempt >= MAX_ATTEMPTS
    fail_migration_halt_error!(retry_attempt: retry_attempt)
    return
  end

  if completed?
    log "Skipping removing issues from the original index since it is already applied"
    return
  end

  deletion = client.delete_by_query(index: helper.target_name, body: QUERY_BODY)
  failures = deletion['failures']

  log_raise "Failed to delete issues: #{failures}" if failures.present?
rescue StandardError => e
  log "migrate failed, increasing migration_state retry_attempt: #{retry_attempt} error:#{e.class}:#{e.message}"

  set_migration_state(retry_attempt: retry_attempt + 1)

  raise e
end
# Reports whether the original index no longer contains any document of type 'issue'.
# The index is refreshed first, then a zero-size search with QUERY_BODY is evaluated.
def completed?
  helper.refresh_index

  count_query = QUERY_BODY.merge(size: 0)
  response = client.search(index: helper.target_name, body: count_query)

  response.dig('hits', 'total', 'value') == 0
end
end end
...@@ -17,7 +17,7 @@ module Elastic ...@@ -17,7 +17,7 @@ module Elastic
end end
options[:features] = 'issues' options[:features] = 'issues'
options[:no_join_project] = Elastic::DataMigrationService.migration_has_finished?(:add_new_data_to_issues_documents) options[:no_join_project] = true
context.name(:issue) do context.name(:issue) do
query_hash = context.name(:authorized) { project_ids_filter(query_hash, options) } query_hash = context.name(:authorized) { project_ids_filter(query_hash, options) }
query_hash = context.name(:confidentiality) { confidentiality_filter(query_hash, options) } query_hash = context.name(:confidentiality) { confidentiality_filter(query_hash, options) }
......
...@@ -26,11 +26,7 @@ module Elastic ...@@ -26,11 +26,7 @@ module Elastic
private private
def generic_attributes def generic_attributes
if Elastic::DataMigrationService.migration_has_finished?(:migrate_issues_to_separate_index) super.except('join_field')
super.except('join_field')
else
super
end
end end
end end
end end
......
...@@ -8,7 +8,7 @@ namespace :gitlab do ...@@ -8,7 +8,7 @@ namespace :gitlab do
helper = Gitlab::Elastic::Helper.default helper = Gitlab::Elastic::Helper.default
indices = [helper.target_name] indices = [helper.target_name]
indices += helper.standalone_indices_proxies.map(&:index_name) if Elastic::DataMigrationService.migration_has_finished?(:migrate_issues_to_separate_index) indices += helper.standalone_indices_proxies.map(&:index_name)
indices.each do |index_name| indices.each do |index_name|
puts "===== Size stats for index: #{index_name} =====" puts "===== Size stats for index: #{index_name} ====="
pp helper.index_size(index_name: index_name).slice(*%w(docs store)) pp helper.index_size(index_name: index_name).slice(*%w(docs store))
......
# frozen_string_literal: true
require 'spec_helper'
require File.expand_path('ee/elastic/migrate/20201116142400_add_new_data_to_issues_documents.rb')
# Specs for the AddNewDataToIssuesDocuments migration: its options, the batched back-fill
# performed by `migrate` and the `completed?` check. The helpers at the bottom add or remove
# `visibility_level` on already indexed issue documents through update-by-query.
RSpec.describe AddNewDataToIssuesDocuments, :elastic, :sidekiq_inline do
  let(:version) { 20201116142400 }
  let(:migration) { described_class.new(version) }
  let(:issues) { create_list(:issue, 3) }

  before do
    stub_ee_application_setting(elasticsearch_search: true, elasticsearch_indexing: true)

    allow(Elastic::DataMigrationService).to receive(:migration_has_finished?)
      .with(:migrate_issues_to_separate_index)
      .and_return(false)

    # ensure issues are indexed
    issues

    ensure_elasticsearch_index!
  end

  describe 'migration_options' do
    it 'has migration options set', :aggregate_failures do
      expect(migration.batched?).to be_truthy
      expect(migration.throttle_delay).to eq(5.minutes)
    end
  end

  describe '.migrate' do
    subject { migration.migrate }

    context 'when migration is already completed' do
      before do
        add_visibility_level_for_issues(issues)
      end

      it 'does not modify data', :aggregate_failures do
        expect(::Elastic::ProcessBookkeepingService).not_to receive(:track!)

        subject
      end
    end

    context 'migration process' do
      before do
        remove_visibility_level_for_issues(issues)
      end

      it 'updates all issue documents' do
        # track calls are batched in groups of 100
        expect(::Elastic::ProcessBookkeepingService).to receive(:track!).once do |*tracked_refs|
          expect(tracked_refs.count).to eq(3)
        end

        subject
      end

      it 'only updates issue documents missing visibility_level', :aggregate_failures do
        issue = issues.first
        add_visibility_level_for_issues(issues.drop(1))

        expected = [Gitlab::Elastic::DocumentReference.new(Issue, issue.id, issue.es_id, issue.es_parent)]
        expect(::Elastic::ProcessBookkeepingService).to receive(:track!).with(*expected).once

        subject
      end

      it 'processes in batches', :aggregate_failures do
        stub_const("#{described_class}::QUERY_BATCH_SIZE", 2)
        stub_const("#{described_class}::UPDATE_BATCH_SIZE", 1)

        expect(::Elastic::ProcessBookkeepingService).to receive(:track!).exactly(3).times.and_call_original

        # cannot use subject in spec because it is memoized
        migration.migrate

        ensure_elasticsearch_index!

        migration.migrate
      end
    end
  end

  describe '.completed?' do
    subject { migration.completed? }

    context 'when documents are missing visibility_level' do
      before do
        remove_visibility_level_for_issues(issues)
      end

      it { is_expected.to be_falsey }
    end

    context 'when no documents are missing visibility_level' do
      before do
        add_visibility_level_for_issues(issues)
      end

      it { is_expected.to be_truthy }
    end
  end

  private

  # Sets `visibility_level` to PRIVATE on the indexed documents of the given issues
  def add_visibility_level_for_issues(issues)
    painless_script = {
      source: "ctx._source['visibility_level'] = params.visibility_level;",
      lang: "painless",
      params: { visibility_level: Gitlab::VisibilityLevel::PRIVATE }
    }

    update_by_query(issues, painless_script)
  end

  # Removes `visibility_level` from the indexed documents of the given issues
  def remove_visibility_level_for_issues(issues)
    update_by_query(issues, { source: "ctx._source.remove('visibility_level')" })
  end

  # Applies `script` to the issue documents whose `id` belongs to one of the given issues
  def update_by_query(issues, script)
    issue_ids = issues.map(&:id)
    matching_issues = {
      bool: {
        must: [
          { terms: { id: issue_ids } },
          { term: { type: { value: 'issue' } } }
        ]
      }
    }

    client = Issue.__elasticsearch__.client
    client.update_by_query({
      index: Issue.__elasticsearch__.index_name,
      wait_for_completion: true, # run synchronously
      refresh: true, # make operation visible to search
      body: {
        script: script,
        query: matching_issues
      }
    })
  end
end
# frozen_string_literal: true
require 'spec_helper'
require File.expand_path('ee/elastic/migrate/20201123123400_migrate_issues_to_separate_index.rb')
# Specs for the MigrateIssuesToSeparateIndex migration: its options, the initial index
# creation, the sliced reindexing, the retry/halt handling and the `completed?` check.
RSpec.describe MigrateIssuesToSeparateIndex, :elastic, :sidekiq_inline do
  let(:version) { 20201123123400 }
  let(:migration) { described_class.new(version) }
  let(:issues) { create_list(:issue, 3) }
  let(:index_name) { "#{es_helper.target_name}-issues" }

  before do
    allow(Elastic::DataMigrationService).to receive(:migration_has_finished?)
      .with(:migrate_issues_to_separate_index)
      .and_return(false)

    stub_ee_application_setting(elasticsearch_search: true, elasticsearch_indexing: true)

    # ensure issues are created and indexed
    issues

    ensure_elasticsearch_index!
  end

  describe 'migration_options' do
    it 'has migration options set', :aggregate_failures do
      expect(migration.batched?).to be_truthy
      expect(migration.throttle_delay).to eq(1.minute)
      expect(migration.pause_indexing?).to be_truthy
    end
  end

  describe '.migrate', :clean_gitlab_redis_shared_state do
    context 'initial launch' do
      before do
        es_helper.delete_index(index_name: es_helper.target_index_name(target: index_name))
      end

      it 'creates an index and sets migration_state' do
        expect { migration.migrate }.to change { es_helper.alias_exists?(name: index_name) }.from(false).to(true)

        expect(migration.migration_state).to include(slice: 0, max_slices: 5)
      end
    end

    context 'batch run' do
      it 'migrates all issues' do
        total_shards = es_helper.get_settings.dig('number_of_shards').to_i
        migration.set_migration_state(slice: 0, max_slices: total_shards)

        # one `migrate` call per slice
        total_shards.times do
          migration.migrate
        end

        expect(migration.completed?).to be_truthy
        expect(es_helper.documents_count(index_name: "#{es_helper.target_name}-issues")).to eq(issues.count)
      end
    end

    context 'failed run' do
      let(:client) { double('Elasticsearch::Transport::Client') }

      before do
        allow(migration).to receive(:client).and_return(client)
      end

      context 'exception is raised' do
        before do
          allow(client).to receive(:reindex).and_raise(StandardError)
        end

        it 'increases retry_attempt' do
          migration.set_migration_state(slice: 0, max_slices: 2, retry_attempt: 1)

          expect { migration.migrate }.to raise_error(StandardError)
          expect(migration.migration_state).to match(slice: 0, max_slices: 2, retry_attempt: 2)
        end

        it 'fails the migration after too many attempts' do
          migration.set_migration_state(slice: 0, max_slices: 2, retry_attempt: 30)

          # A message expectation only observes calls made after it is set up, so it has
          # to be declared before `migrate` runs.
          expect(migration).not_to receive(:process_response)

          migration.migrate

          expect(migration.migration_state).to match(slice: 0, max_slices: 2, retry_attempt: 30, halted: true, halted_indexing_unpaused: false)
        end
      end

      context 'elasticsearch failures' do
        context 'total is not equal' do
          before do
            allow(client).to receive(:reindex).and_return({ "total" => 60, "updated" => 0, "created" => 45, "deleted" => 0, "failures" => [] })
          end

          it 'raises an error' do
            migration.set_migration_state(slice: 0, max_slices: 2)

            expect { migration.migrate }.to raise_error(/total is not equal/)
          end
        end

        context 'reindexing failues' do
          before do
            allow(client).to receive(:reindex).and_return({ "total" => 60, "updated" => 0, "created" => 0, "deleted" => 0, "failures" => [{ "type": "es_rejected_execution_exception" }] })
          end

          it 'raises an error' do
            migration.set_migration_state(slice: 0, max_slices: 2)

            expect { migration.migrate }.to raise_error(/failed with/)
          end
        end
      end
    end
  end

  describe '.completed?' do
    subject { migration.completed? }

    before do
      # run both slices so that every issue document has been reindexed
      2.times do |slice|
        migration.set_migration_state(slice: slice, max_slices: 2)
        migration.migrate
      end
    end

    context 'counts are equal' do
      let(:issues_count) { issues.count }

      it 'returns true' do
        is_expected.to be_truthy
      end
    end
  end
end
# frozen_string_literal: true
require 'spec_helper'
require File.expand_path('ee/elastic/migrate/20210112165500_delete_issues_from_original_index.rb')
# Specs for the DeleteIssuesFromOriginalIndex migration: its options, the no-op when no
# issue documents are left, the actual deletion and the retry/halt handling.
RSpec.describe DeleteIssuesFromOriginalIndex, :elastic, :sidekiq_inline do
  let(:version) { 20210112165500 }
  let(:migration) { described_class.new(version) }
  let(:helper) { Gitlab::Elastic::Helper.new }

  before do
    stub_ee_application_setting(elasticsearch_search: true, elasticsearch_indexing: true)
    allow(migration).to receive(:helper).and_return(helper)
  end

  describe 'migration_options' do
    it 'has migration options set', :aggregate_failures do
      expect(migration.batched?).to be_truthy
      expect(migration.throttle_delay).to eq(1.minute)
    end
  end

  context 'issues are already deleted' do
    it 'does not execute delete_by_query' do
      expect(migration.completed?).to be_truthy
      expect(helper.client).not_to receive(:delete_by_query)

      migration.migrate
    end
  end

  context 'issues are still present in the index' do
    let(:issues) { create_list(:issue, 3) }

    before do
      allow(Elastic::DataMigrationService).to receive(:migration_has_finished?)
        .with(:migrate_issues_to_separate_index)
        .and_return(false)

      # ensure issues are indexed
      issues

      ensure_elasticsearch_index!
    end

    it 'removes issues from the index' do
      expect { migration.migrate }.to change { migration.completed? }.from(false).to(true)
    end
  end

  context 'migration fails' do
    let(:client) { double('Elasticsearch::Transport::Client') }

    before do
      allow(migration).to receive(:client).and_return(client)
      allow(migration).to receive(:completed?).and_return(false)
    end

    context 'exception is raised' do
      before do
        allow(client).to receive(:delete_by_query).and_raise(StandardError)
      end

      it 'increases retry_attempt' do
        migration.set_migration_state(retry_attempt: 1)

        expect { migration.migrate }.to raise_error(StandardError)
        expect(migration.migration_state).to match(retry_attempt: 2)
      end

      it 'fails the migration after too many attempts' do
        migration.set_migration_state(retry_attempt: 30)

        # A message expectation only observes calls made after it is set up, so it has
        # to be declared before `migrate` runs.
        expect(client).not_to receive(:delete_by_query)

        migration.migrate

        expect(migration.migration_state).to match(retry_attempt: 30, halted: true, halted_indexing_unpaused: false)
      end
    end

    context 'es responds with errors' do
      before do
        allow(client).to receive(:delete_by_query).and_return('failures' => ['failed'])
      end

      it 'raises an error and increases retry attempt' do
        expect { migration.migrate }.to raise_error(/Failed to delete issues/)

        expect(migration.migration_state).to match(retry_attempt: 1)
      end
    end
  end
end
...@@ -18,12 +18,6 @@ RSpec.describe Search::GlobalService do ...@@ -18,12 +18,6 @@ RSpec.describe Search::GlobalService do
let(:service) { described_class.new(user, params) } let(:service) { described_class.new(user, params) }
end end
context 'issue search' do
let(:results) { described_class.new(nil, search: '*').execute.objects('issues') }
it_behaves_like 'search query applies joins based on migrations shared examples', :add_new_data_to_issues_documents
end
context 'notes search' do context 'notes search' do
let(:results) { described_class.new(nil, search: '*').execute.objects('notes') } let(:results) { described_class.new(nil, search: '*').execute.objects('notes') }
...@@ -218,38 +212,14 @@ RSpec.describe Search::GlobalService do ...@@ -218,38 +212,14 @@ RSpec.describe Search::GlobalService do
let(:scope) { 'issues' } let(:scope) { 'issues' }
let(:search) { issue.title } let(:search) { issue.title }
context 'when add_new_data_to_issues_documents migration is finished' do let!(:issue) { create :issue, project: project }
let!(:issue) { create :issue, project: project }
where(:project_level, :feature_access_level, :membership, :admin_mode, :expected_count) do
permission_table_for_guest_feature_access
end
with_them do where(:project_level, :feature_access_level, :membership, :admin_mode, :expected_count) do
it_behaves_like 'search respects visibility' permission_table_for_guest_feature_access
end
end end
# Since newly created indices automatically have all migrations as with_them do
# finished we need a test to verify the old style searches work for it_behaves_like 'search respects visibility'
# instances which haven't finished the migration yet
context 'when add_new_data_to_issues_documents migration is not finished' do
before do
set_elasticsearch_migration_to :add_new_data_to_issues_documents, including: false
end
# issue cannot be defined prior to the migration mocks because it
# will cause the incorrect value to be passed to `use_separate_indices` when creating
# the proxy
let!(:issue) { create(:issue, project: project) }
where(:project_level, :feature_access_level, :membership, :admin_mode, :expected_count) do
permission_table_for_guest_feature_access
end
with_them do
it_behaves_like 'search respects visibility'
end
end end
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment