Commit db91f1cd authored by Terri Chu's avatar Terri Chu Committed by Dylan Griffith

Migration to add new data to issues documents

Backfill issues documents with missing permissions
for project and issue permissions. Adds batch
capabilities to the Elastic::MigrationWorker
to support modifying large amounts of documents.
parent 36513a0e
...@@ -216,6 +216,29 @@ cron worker sequentially. ...@@ -216,6 +216,29 @@ cron worker sequentially.
Any update to the Elastic index mappings should be replicated in [`Elastic::Latest::Config`](https://gitlab.com/gitlab-org/gitlab/-/blob/master/ee/lib/elastic/latest/config.rb). Any update to the Elastic index mappings should be replicated in [`Elastic::Latest::Config`](https://gitlab.com/gitlab-org/gitlab/-/blob/master/ee/lib/elastic/latest/config.rb).
### Migration options supported by the [`Elastic::MigrationWorker`](https://gitlab.com/gitlab-org/gitlab/blob/master/ee/app/workers/elastic/migration_worker.rb)
- `batched!` - Allow the migration to run in batches. If set, the [`Elastic::MigrationWorker`](https://gitlab.com/gitlab-org/gitlab/blob/master/ee/app/workers/elastic/migration_worker.rb)
will re-enqueue itself with a delay which is set using the `throttle_delay` option described below. The batching
must be handled within the `migrate` method, this setting controls the re-enqueuing only.
- `throttle_delay` - Sets the wait time in between batch runs. This time should be set high enough to allow each migration batch
enough time to finish. Additionally, the time should be less than 30 minutes since that is how often the
[`Elastic::MigrationWorker`](https://gitlab.com/gitlab-org/gitlab/blob/master/ee/app/workers/elastic/migration_worker.rb)
cron worker runs. Default value is 5 minutes.
```ruby
# frozen_string_literal: true
class BatchedMigrationName < Elastic::Migration
# Declares a migration should be run in batches
batched!
throttle_delay 10.minutes
# ...
end
```
## Performance Monitoring ## Performance Monitoring
### Prometheus ### Prometheus
......
...@@ -4,7 +4,7 @@ module Elastic ...@@ -4,7 +4,7 @@ module Elastic
class MigrationRecord class MigrationRecord
attr_reader :version, :name, :filename attr_reader :version, :name, :filename
delegate :migrate, :skip_migration?, :completed?, to: :migration delegate :migrate, :skip_migration?, :completed?, :batched?, :throttle_delay, to: :migration
def initialize(version:, name:, filename:) def initialize(version:, name:, filename:)
@version = version @version = version
......
# frozen_string_literal: true
module Elastic
module MigrationOptions
extend ActiveSupport::Concern
include Gitlab::ClassAttributes
DEFAULT_THROTTLE_DELAY = 5.minutes
def batched?
self.class.get_batched
end
def throttle_delay
self.class.get_throttle_delay
end
class_methods do
def batched!
class_attributes[:batched] = true
end
def get_batched
class_attributes[:batched]
end
def throttle_delay(value)
class_attributes[:throttle_delay] = value
end
def get_throttle_delay
class_attributes[:throttle_delay] || DEFAULT_THROTTLE_DELAY
end
end
end
end
...@@ -42,11 +42,16 @@ module Elastic ...@@ -42,11 +42,16 @@ module Elastic
private private
def execute_migration(migration) def execute_migration(migration)
if migration.persisted? if migration.persisted? && !migration.batched?
logger.info "MigrationWorker: migration[#{migration.name}] did not execute migrate method since it was already executed. Waiting for migration to complete" logger.info "MigrationWorker: migration[#{migration.name}] did not execute migrate method since it was already executed. Waiting for migration to complete"
else else
logger.info "MigrationWorker: migration[#{migration.name}] executing migrate method" logger.info "MigrationWorker: migration[#{migration.name}] executing migrate method"
migration.migrate migration.migrate
if migration.batched? && !migration.completed?
logger.info "MigrationWorker: migration[#{migration.name}] kicking off next migration batch"
Elastic::MigrationWorker.perform_in(migration.throttle_delay)
end
end end
end end
......
# frozen_string_literal: true
class AddNewDataToIssuesDocuments < Elastic::Migration
batched!
throttle_delay 5.minutes
QUERY_BATCH_SIZE = 5000
UPDATE_BATCH_SIZE = 100
def migrate
if completed?
log "Skipping adding visibility_level field to issues documents migration since it is already applied"
return
end
log "Adding visibility_level field to issues documents for batch of #{QUERY_BATCH_SIZE} documents"
query = {
size: QUERY_BATCH_SIZE,
query: {
bool: {
filter: issues_missing_visibility_level_filter
}
}
}
results = client.search(index: helper.target_index_name, body: query)
hits = results.dig('hits', 'hits') || []
document_references = hits.map do |hit|
id = hit.dig('_source', 'id')
es_id = hit.dig('_id')
es_parent = hit.dig('_source', 'join_field', 'parent')
# ensure that any issues missing from the database will be removed from Elasticsearch
# as the data is back-filled
Gitlab::Elastic::DocumentReference.new(Issue, id, es_id, es_parent)
end
document_references.each_slice(UPDATE_BATCH_SIZE) do |refs|
Elastic::ProcessBookkeepingService.track!(*refs)
end
log "Adding visibility_level field to issues documents is completed for batch of #{document_references.size} documents"
end
def completed?
query = {
size: 0,
aggs: {
issues: {
filter: issues_missing_visibility_level_filter
}
}
}
results = client.search(index: helper.target_index_name, body: query)
doc_count = results.dig('aggregations', 'issues', 'doc_count')
doc_count && doc_count == 0
end
private
def issues_missing_visibility_level_filter
{
bool: {
must_not: field_exists('visibility_level'),
filter: issue_type_filter
}
}
end
def issue_type_filter
{
term: {
type: {
value: 'issue'
}
}
}
end
def field_exists(field)
{
exists: {
field: field
}
}
end
end
...@@ -2,6 +2,8 @@ ...@@ -2,6 +2,8 @@
module Elastic module Elastic
class Migration class Migration
include Elastic::MigrationOptions
attr_reader :version attr_reader :version
def initialize(version) def initialize(version)
......
# frozen_string_literal: true
require 'spec_helper'
require File.expand_path('ee/elastic/migrate/20201116142400_add_new_data_to_issues_documents.rb')
RSpec.describe AddNewDataToIssuesDocuments, :elastic, :sidekiq_inline do
let(:version) { 20201116142400 }
let(:migration) { described_class.new(version) }
let(:issues) { create_list(:issue, 3) }
before do
stub_ee_application_setting(elasticsearch_search: true, elasticsearch_indexing: true)
# ensure issues are indexed
issues
ensure_elasticsearch_index!
end
describe 'migration_options' do
it 'has migration options set', :aggregate_failures do
expect(migration.batched?).to be_truthy
expect(migration.throttle_delay).to eq(5.minutes)
end
end
describe '.migrate' do
subject { migration.migrate }
context 'when migration is already completed' do
before do
add_visibility_level_for_issues(issues)
end
it 'does not modify data', :aggregate_failures do
expect(::Elastic::ProcessBookkeepingService).not_to receive(:track!)
subject
end
end
context 'migration process' do
before do
remove_visibility_level_for_issues(issues)
end
it 'updates all issue documents' do
# track calls are batched in groups of 100
expect(::Elastic::ProcessBookkeepingService).to receive(:track!).once do |*tracked_refs|
expect(tracked_refs.count).to eq(3)
end
subject
end
it 'only updates issue documents missing visibility_level', :aggregate_failures do
issue = issues.first
add_visibility_level_for_issues(issues[1..-1])
expected = [Gitlab::Elastic::DocumentReference.new(Issue, issue.id, issue.es_id, issue.es_parent)]
expect(::Elastic::ProcessBookkeepingService).to receive(:track!).with(*expected).once
subject
end
it 'processes in batches', :aggregate_failures do
stub_const("#{described_class}::QUERY_BATCH_SIZE", 2)
stub_const("#{described_class}::UPDATE_BATCH_SIZE", 1)
expect(::Elastic::ProcessBookkeepingService).to receive(:track!).exactly(3).times.and_call_original
# cannot use subject in spec because it is memoized
migration.migrate
ensure_elasticsearch_index!
migration.migrate
end
end
end
describe '.completed?' do
subject { migration.completed? }
context 'when documents are missing visibility_level' do
before do
remove_visibility_level_for_issues(issues)
end
it { is_expected.to be_falsey }
end
context 'when no documents are missing visibility_level' do
before do
add_visibility_level_for_issues(issues)
end
it { is_expected.to be_truthy }
end
end
private
def add_visibility_level_for_issues(issues)
script = {
source: "ctx._source['visibility_level'] = params.visibility_level;",
lang: "painless",
params: {
visibility_level: Gitlab::VisibilityLevel::PRIVATE
}
}
update_by_query(issues, script)
end
def remove_visibility_level_for_issues(issues)
script = {
source: "ctx._source.remove('visibility_level')"
}
update_by_query(issues, script)
end
def update_by_query(issues, script)
issue_ids = issues.map { |i| i.id }
client = Issue.__elasticsearch__.client
client.update_by_query({
index: Issue.__elasticsearch__.index_name,
wait_for_completion: true, # run synchronously
refresh: true, # make operation visible to search
body: {
script: script,
query: {
bool: {
must: [
{
terms: {
id: issue_ids
}
},
{
term: {
type: {
value: 'issue'
}
}
}
]
}
}
}
})
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Elastic::MigrationOptions do
let(:migration_class) do
Class.new do
include Elastic::MigrationOptions
end
end
describe '#batched?' do
subject { migration_class.new.batched? }
it 'defaults to false' do
expect(subject).to be_falsey
end
it 'respects when batched! is set for the class' do
migration_class.batched!
expect(subject).to be_truthy
end
end
describe '#throttle_delay' do
subject { migration_class.new.throttle_delay }
it 'has a default' do
expect(subject).to eq(described_class::DEFAULT_THROTTLE_DELAY)
end
it 'respects when throttle_delay is set for the class' do
migration_class.throttle_delay 30.seconds
expect(subject).to eq(30.seconds)
end
end
end
...@@ -18,8 +18,12 @@ RSpec.describe Elastic::MigrationWorker, :elastic do ...@@ -18,8 +18,12 @@ RSpec.describe Elastic::MigrationWorker, :elastic do
end end
context 'indexing is enabled' do context 'indexing is enabled' do
let(:migration) { Elastic::DataMigrationService.migrations.first }
before do before do
stub_ee_application_setting(elasticsearch_indexing: true) stub_ee_application_setting(elasticsearch_indexing: true)
allow(subject).to receive(:current_migration).and_return(migration)
end end
it 'creates an index if it does not exist' do it 'creates an index if it does not exist' do
...@@ -41,25 +45,28 @@ RSpec.describe Elastic::MigrationWorker, :elastic do ...@@ -41,25 +45,28 @@ RSpec.describe Elastic::MigrationWorker, :elastic do
end end
context 'migration process' do context 'migration process' do
let(:migration) { Elastic::DataMigrationService.migrations.first }
before do before do
allow(subject).to receive(:current_migration).and_return(migration)
allow(migration).to receive(:persisted?).and_return(persisted) allow(migration).to receive(:persisted?).and_return(persisted)
allow(migration).to receive(:completed?).and_return(completed) allow(migration).to receive(:completed?).and_return(completed)
allow(migration).to receive(:batched?).and_return(batched)
end end
using RSpec::Parameterized::TableSyntax using RSpec::Parameterized::TableSyntax
where(:persisted, :completed, :execute_migration) do # completed is evaluated after migrate method is executed
false | false | true where(:persisted, :completed, :execute_migration, :batched) do
false | true | true false | false | true | false
true | false | false false | true | true | false
true | true | false false | false | true | true
false | true | true | true
true | false | false | false
true | true | false | false
true | false | true | true
true | true | true | true
end end
with_them do with_them do
it 'calls migration only when needed' do it 'calls migration only when needed', :aggregate_failures do
if execute_migration if execute_migration
expect(migration).to receive(:migrate).once expect(migration).to receive(:migrate).once
else else
...@@ -71,6 +78,18 @@ RSpec.describe Elastic::MigrationWorker, :elastic do ...@@ -71,6 +78,18 @@ RSpec.describe Elastic::MigrationWorker, :elastic do
subject.perform subject.perform
end end
it 'handles batched migrations' do
if batched && !completed
# default throttle_delay is 5.minutes
expect( Elastic::MigrationWorker).to receive(:perform_in)
.with(5.minutes)
else
expect( Elastic::MigrationWorker).not_to receive(:perform_in)
end
subject.perform
end
end end
end end
end end
......
...@@ -21,6 +21,8 @@ module Quality ...@@ -21,6 +21,8 @@ module Quality
config config
db db
dependencies dependencies
elastic
elastic_integration
factories factories
finders finders
frontend frontend
...@@ -46,7 +48,6 @@ module Quality ...@@ -46,7 +48,6 @@ module Quality
validators validators
views views
workers workers
elastic_integration
tooling tooling
], ],
integration: %w[ integration: %w[
......
...@@ -28,7 +28,7 @@ RSpec.describe Quality::TestLevel do ...@@ -28,7 +28,7 @@ RSpec.describe Quality::TestLevel do
context 'when level is unit' do context 'when level is unit' do
it 'returns a pattern' do it 'returns a pattern' do
expect(subject.pattern(:unit)) expect(subject.pattern(:unit))
.to eq("spec/{bin,channels,config,db,dependencies,factories,finders,frontend,graphql,haml_lint,helpers,initializers,javascripts,lib,models,policies,presenters,rack_servers,replicators,routing,rubocop,serializers,services,sidekiq,support_specs,tasks,uploaders,validators,views,workers,elastic_integration,tooling}{,/**/}*_spec.rb") .to eq("spec/{bin,channels,config,db,dependencies,elastic,elastic_integration,factories,finders,frontend,graphql,haml_lint,helpers,initializers,javascripts,lib,models,policies,presenters,rack_servers,replicators,routing,rubocop,serializers,services,sidekiq,support_specs,tasks,uploaders,validators,views,workers,tooling}{,/**/}*_spec.rb")
end end
end end
...@@ -103,7 +103,7 @@ RSpec.describe Quality::TestLevel do ...@@ -103,7 +103,7 @@ RSpec.describe Quality::TestLevel do
context 'when level is unit' do context 'when level is unit' do
it 'returns a regexp' do it 'returns a regexp' do
expect(subject.regexp(:unit)) expect(subject.regexp(:unit))
.to eq(%r{spec/(bin|channels|config|db|dependencies|factories|finders|frontend|graphql|haml_lint|helpers|initializers|javascripts|lib|models|policies|presenters|rack_servers|replicators|routing|rubocop|serializers|services|sidekiq|support_specs|tasks|uploaders|validators|views|workers|elastic_integration|tooling)}) .to eq(%r{spec/(bin|channels|config|db|dependencies|elastic|elastic_integration|factories|finders|frontend|graphql|haml_lint|helpers|initializers|javascripts|lib|models|policies|presenters|rack_servers|replicators|routing|rubocop|serializers|services|sidekiq|support_specs|tasks|uploaders|validators|views|workers|tooling)})
end end
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment