Commit cf380d26 authored by Dylan Griffith's avatar Dylan Griffith

Merge branch '263365-add-upvotes-field-to-merge-requests-index' into 'master'

Add upvotes to merge requests documents

See merge request gitlab-org/gitlab!66671
parents 9bcb0239 f00de59c
......@@ -66,3 +66,5 @@ class AwardEmoji < ApplicationRecord
awardable.try(:update_upvotes_count) if upvote?
end
end
AwardEmoji.prepend_mod_with('AwardEmoji')
......@@ -1918,6 +1918,20 @@ class MergeRequest < ApplicationRecord
end
end
# Returns the upvote award-emoji count for this merge request, batching the
# lookup via BatchLoader so that evaluating a collection of merge requests
# issues a single grouped COUNT query. Records with no upvotes resolve to 0.
def lazy_upvotes_count
  BatchLoader.for(id).batch(default_value: 0) do |merge_request_ids, loader|
    upvote_counts = AwardEmoji
      .where(awardable_id: merge_request_ids)
      .upvotes
      .group(:awardable_id)
      .count

    upvote_counts.each { |awardable_id, upvotes| loader.call(awardable_id, upvotes) }
  end
end
private
def set_draft_status
......
# frozen_string_literal: true

module Preloaders
  # Warms the data needed to index a collection of merge requests without
  # N+1 queries: eagerly loads each target project (with its project_feature)
  # and registers every merge request's upvote count with BatchLoader so the
  # first evaluation fetches all counts in one query.
  class MergeRequestsPreloader
    attr_reader :merge_requests

    def initialize(merge_requests)
      @merge_requests = merge_requests
    end

    def execute
      ActiveRecord::Associations::Preloader.new.preload(merge_requests, { target_project: [:project_feature] })
      merge_requests.each(&:lazy_upvotes_count)
    end
  end
end
......@@ -233,6 +233,11 @@ Any data or index cleanup needed to support migration retries should be handled
will re-enqueue itself with a delay which is set using the `throttle_delay` option described below. The batching
must be handled within the `migrate` method, this setting controls the re-enqueuing only.
- `batch_size` - Sets the number of documents modified during a `batched!` migration run. This size should be set to a value that allows each batch of updates
enough time to finish within the `throttle_delay` described below. The batching
must be handled within a custom `migrate` method or by using the [`Elastic::MigrationBackfillHelper`](https://gitlab.com/gitlab-org/gitlab/-/blob/master/ee/app/workers/concerns/elastic/migration_backfill_helper.rb)
`migrate` method which uses this setting. Default value is 1000 documents.
- `throttle_delay` - Sets the wait time in between batch runs. This time should be set high enough to allow each migration batch
enough time to finish. Additionally, the time should be less than 30 minutes since that is how often the
[`Elastic::MigrationWorker`](https://gitlab.com/gitlab-org/gitlab/-/blob/master/ee/app/workers/elastic/migration_worker.rb)
......
# frozen_string_literal: true

module Elastic
  # Keeps the Elasticsearch document of an awardable record in sync when
  # award emoji are created or destroyed. Only awardable types listed in
  # UPDATE_ELASTIC_ASSOCIATIONS_FOR trigger a re-index.
  module AwardEmojisSearch
    extend ActiveSupport::Concern

    UPDATE_ELASTIC_ASSOCIATIONS_FOR = [::MergeRequest].freeze

    included do
      # Only ActiveRecord models expose after_commit callbacks.
      after_commit :update_elastic_associations, on: [:create, :destroy] if self < ActiveRecord::Base
    end

    private

    # Re-indexes the awardable after an award emoji is created or destroyed,
    # provided the awardable type is tracked and Elasticsearch maintenance is
    # enabled for that record.
    def update_elastic_associations
      tracked = UPDATE_ELASTIC_ASSOCIATIONS_FOR.any? { |model| awardable.is_a?(model) }
      return unless tracked && awardable.maintaining_elasticsearch?

      awardable.maintain_elasticsearch_update
    end
  end
end
# frozen_string_literal: true

# EE extension for AwardEmoji (prepended via `prepend_mod_with`): mixes in
# Elastic::AwardEmojisSearch so that creating or destroying an award emoji
# keeps the awardable's Elasticsearch document up to date.
module EE
  module AwardEmoji
    extend ActiveSupport::Concern

    prepended do
      include Elastic::AwardEmojisSearch
    end
  end
end
......@@ -5,7 +5,7 @@ module Elastic
attr_reader :version, :name, :filename
delegate :migrate, :skip_migration?, :completed?, :batched?, :throttle_delay, :pause_indexing?,
:space_requirements?, :space_required_bytes, :obsolete?,
:space_requirements?, :space_required_bytes, :obsolete?, :batch_size,
to: :migration
ELASTICSEARCH_SIZE = 25
......
......@@ -2,15 +2,23 @@
module Elastic
module MigrationBackfillHelper
UPDATE_BATCH_SIZE = 100
def migrate
update_mapping!(index_name, { properties: new_mappings }) if respond_to?(:new_mappings)
# only create mappings on the first run
if respond_to?(:new_mappings) && migration_state[:update_mappings].blank?
update_mapping!(index_name, { properties: new_mappings })
options = { update_mappings: true }
set_migration_state(options)
end
if completed?
log "Skipping adding #{field_name} field to #{index_name} documents migration since it is already applied"
return
end
log "Adding #{field_name} field to #{index_name} documents for batch of #{self.class::QUERY_BATCH_SIZE} documents"
log "Adding #{field_name} field to #{index_name} documents for batch of #{query_batch_size} documents"
document_references = process_batch!
......@@ -63,7 +71,7 @@ module Elastic
def process_batch!
query = {
size: self.class::QUERY_BATCH_SIZE,
size: query_batch_size,
query: {
bool: {
filter: missing_field_filter
......@@ -82,11 +90,28 @@ module Elastic
Gitlab::Elastic::DocumentReference.new(self.class::DOCUMENT_TYPE, id, es_id, es_parent)
end
document_references.each_slice(self.class::UPDATE_BATCH_SIZE) do |refs|
document_references.each_slice(update_batch_size) do |refs|
Elastic::ProcessInitialBookkeepingService.track!(*refs)
end
document_references
end
# Number of documents fetched per search query during a batched migration.
#
# Prefers a migration-specific QUERY_BATCH_SIZE constant, then falls back to
# the `batch_size` migration option. Raises NotImplementedError when neither
# is available so a misconfigured migration fails loudly.
def query_batch_size
  return self.class::QUERY_BATCH_SIZE if self.class.const_defined?(:QUERY_BATCH_SIZE)
  return batch_size if respond_to?(:batch_size)

  # Fix: was `raise NotImplemented` — an undefined constant that would
  # surface as a NameError instead of the intended exception class.
  raise NotImplementedError
end
# Number of document references queued per tracking call when backfilling.
# A migration class may override the helper's default by defining its own
# UPDATE_BATCH_SIZE constant.
def update_batch_size
  if self.class.const_defined?(:UPDATE_BATCH_SIZE)
    self.class::UPDATE_BATCH_SIZE
  else
    UPDATE_BATCH_SIZE
  end
end
# Applies a mapping update to the given index via the Elasticsearch helper.
# Used by migrations to add new fields (e.g. `upvotes`) before backfilling.
def update_mapping!(index_name, mappings)
  helper.update_mapping(index_name: index_name, mappings: mappings)
end
end
end
......@@ -102,9 +102,5 @@ module Elastic
}
}
end
def update_mapping!(index_name, mappings)
helper.update_mapping(index_name: index_name, mappings: mappings)
end
end
end
......@@ -6,11 +6,16 @@ module Elastic
include Gitlab::ClassAttributes
DEFAULT_THROTTLE_DELAY = 5.minutes
DEFAULT_BATCH_SIZE = 1000
# Whether this migration runs in batches (set with the `batched!` class macro).
def batched?
  self.class.get_batched
end
# Instance-level reader for the class-level `batch_size` option
# (falls back to DEFAULT_BATCH_SIZE when the migration did not set one).
def batch_size
  self.class.get_batch_size
end
# Instance-level reader for the class-level `throttle_delay` option
# (falls back to DEFAULT_THROTTLE_DELAY when the migration did not set one).
def throttle_delay
  self.class.get_throttle_delay
end
......@@ -55,6 +60,14 @@ module Elastic
# Wait time between batched migration runs: the value set via the
# `throttle_delay` macro, or DEFAULT_THROTTLE_DELAY when unset.
def get_throttle_delay
  configured_delay = class_attributes[:throttle_delay]
  configured_delay || DEFAULT_THROTTLE_DELAY
end
# Class macro: records the migration's batch size option;
# read back through get_batch_size.
def batch_size(value)
  class_attributes.store(:batch_size, value)
end
# Batch size for a batched migration: the value set via the `batch_size`
# macro, or DEFAULT_BATCH_SIZE (1000) when unset.
def get_batch_size
  override = class_attributes[:batch_size]
  override || DEFAULT_BATCH_SIZE
end
end
end
end
# frozen_string_literal: true
# Elasticsearch migration: adds an `upvotes` integer field to the merge
# requests index mapping and backfills existing documents in batches
# (see Elastic::MigrationBackfillHelper for the batching logic).
class AddUpvotesToMergeRequests < Elastic::Migration
  include Elastic::MigrationBackfillHelper

  # Process 5000 documents per run, waiting 3 minutes between runs.
  batched!
  batch_size 5000
  throttle_delay 3.minutes

  DOCUMENT_TYPE = MergeRequest

  # Mapping change applied before the backfill starts.
  def new_mappings
    { upvotes: { type: 'integer' } }
  end

  private

  def index_name
    DOCUMENT_TYPE.__elasticsearch__.index_name
  end

  # Field the backfill helper uses to find documents still missing data.
  def field_name
    :upvotes
  end
end
# frozen_string_literal: true
class AddNamespaceAncestryToIssuesMapping < Elastic::Migration
include Elastic::MigrationHelper
include Elastic::MigrationBackfillHelper
DOCUMENT_KLASS = Issue
......
......@@ -30,7 +30,7 @@ module Elastic
# rubocop: disable CodeReuse/ActiveRecord
# Preloads everything needed to serialize a batch of merge requests for
# indexing (projects, project features, and batched upvote counts).
# NOTE(review): this span is rendered from a diff — the `relation.includes`
# line is the removed pre-MR version and its return value is discarded here;
# the preloader below performs the same preload plus upvote-count batching.
def preload_indexing_data(relation)
  relation.includes(target_project: [:project_feature])
  Preloaders::MergeRequestsPreloader.new(relation).execute
end
# rubocop: enable CodeReuse/ActiveRecord
......
......@@ -41,6 +41,7 @@ module Elastic
indexes :visibility_level, type: :integer
indexes :merge_requests_access_level, type: :integer
indexes :upvotes, type: :integer
end
end
end
......
......@@ -29,6 +29,7 @@ module Elastic
data['visibility_level'] = target.project.visibility_level
data['merge_requests_access_level'] = safely_read_project_feature_for_elasticsearch(:merge_requests)
data['upvotes'] = target.lazy_upvotes_count
data.merge(generic_attributes)
end
......
# frozen_string_literal: true
require 'spec_helper'
require File.expand_path('ee/elastic/migrate/20210722112600_add_upvotes_to_merge_requests.rb')
# Exercises the AddUpvotesToMergeRequests Elastic migration: its options,
# batched backfill behavior, one-time mapping update, and completion check.
RSpec.describe AddUpvotesToMergeRequests, :elastic, :sidekiq_inline do
  let(:version) { 20210722112600 }
  let(:migration) { described_class.new(version) }
  let(:merge_requests) { create_list(:merge_request, 3, :unique_branches) }

  before do
    stub_ee_application_setting(elasticsearch_search: true, elasticsearch_indexing: true)
    # ensure merge requests are indexed
    merge_requests
    ensure_elasticsearch_index!
  end

  describe 'migration_options' do
    it 'has migration options set', :aggregate_failures do
      expect(migration).to be_batched
      expect(migration.throttle_delay).to eq(3.minutes)
      expect(migration.batch_size).to eq(5000)
    end
  end

  describe '.migrate' do
    subject { migration.migrate }

    context 'when migration is already completed' do
      it 'does not modify data' do
        expect(::Elastic::ProcessInitialBookkeepingService).not_to receive(:track!)
        subject
      end
    end

    context 'migration process' do
      before do
        # strip the upvotes field so all documents look unmigrated
        remove_upvotes_from_merge_requests(merge_requests)
      end

      it 'updates all merge request documents' do
        # track calls are batched in groups of 100
        expect(::Elastic::ProcessInitialBookkeepingService).to receive(:track!).once do |*tracked_refs|
          expect(tracked_refs.count).to eq(3)
        end
        subject
      end

      it 'only updates merge request documents missing upvotes', :aggregate_failures do
        merge_request = merge_requests.first
        add_upvotes_for_merge_requests(merge_requests[1..-1])
        expected = [Gitlab::Elastic::DocumentReference.new(MergeRequest, merge_request.id, merge_request.es_id, merge_request.es_parent)]
        expect(::Elastic::ProcessInitialBookkeepingService).to receive(:track!).with(*expected).once
        subject
      end

      it 'processes in batches', :aggregate_failures do
        allow(migration).to receive(:batch_size).and_return(2)
        stub_const('::Elastic::MigrationBackfillHelper::UPDATE_BATCH_SIZE', 1)
        expect(::Elastic::ProcessInitialBookkeepingService).to receive(:track!).exactly(3).times.and_call_original
        # cannot use subject in spec because it is memoized
        migration.migrate
        ensure_elasticsearch_index!
        migration.migrate
      end

      it 'only updates mappings on the first run' do
        helper = Gitlab::Elastic::Helper.new
        allow(Gitlab::Elastic::Helper).to receive(:default).and_return(helper)
        allow(migration).to receive(:batch_size).and_return(2)
        expect(helper).to receive(:update_mapping).and_call_original
        # cannot use subject in spec because it is memoized
        migration.migrate
        ensure_elasticsearch_index!
        expect(migration.migration_state).to include(update_mappings: true)
        expect(helper).not_to receive(:update_mapping)
        migration.migrate
      end
    end
  end

  describe '.completed?' do
    context 'when documents are missing upvotes' do
      before do
        remove_upvotes_from_merge_requests(merge_requests)
      end

      specify { expect(migration).not_to be_completed }
    end

    context 'when no documents are missing upvotes' do
      specify { expect(migration).to be_completed }
    end
  end

  private

  # Sets upvotes to 0 on the given documents so they no longer match the
  # migration's missing-field filter.
  def add_upvotes_for_merge_requests(merge_requests)
    script = {
      source: "ctx._source['upvotes'] = params.upvotes;",
      lang: "painless",
      params: {
        upvotes: 0
      }
    }
    update_by_query(merge_requests, script)
  end

  # Removes the upvotes field so documents appear unmigrated.
  def remove_upvotes_from_merge_requests(merge_requests)
    script = {
      source: "ctx._source.remove('upvotes')"
    }
    update_by_query(merge_requests, script)
  end

  # Runs a painless script against the given merge requests' documents via
  # update_by_query, synchronously and with an index refresh.
  def update_by_query(merge_requests, script)
    merge_request_ids = merge_requests.map(&:id)
    client = MergeRequest.__elasticsearch__.client
    client.update_by_query({
      index: MergeRequest.__elasticsearch__.index_name,
      wait_for_completion: true, # run synchronously
      refresh: true, # make operation visible to search
      body: {
        script: script,
        query: {
          bool: {
            must: [
              {
                terms: {
                  id: merge_request_ids
                }
              }
            ]
          }
        }
      }
    })
  end
end
......@@ -72,6 +72,7 @@ RSpec.describe MergeRequest, :elastic do
it "returns json with all needed elements" do
merge_request = create :merge_request
merge_request.project.update!(visibility_level: Gitlab::VisibilityLevel::INTERNAL)
create(:award_emoji, :upvote, awardable: merge_request)
expected_hash = merge_request.attributes.extract!(
'id',
......@@ -92,7 +93,8 @@ RSpec.describe MergeRequest, :elastic do
'type' => merge_request.es_type,
'merge_requests_access_level' => ProjectFeature::ENABLED,
'visibility_level' => Gitlab::VisibilityLevel::INTERNAL,
'project_id' => merge_request.target_project.id
'project_id' => merge_request.target_project.id,
'upvotes' => 1
})
expect(merge_request.__elasticsearch__.as_indexed_json).to eq(expected_hash)
......
# frozen_string_literal: true
require 'spec_helper'
# Verifies that award emoji creation/destruction triggers an Elasticsearch
# re-index of tracked awardables (see Elastic::AwardEmojisSearch).
RSpec.describe AwardEmoji do
  describe '#update_elastic_associations' do
    # cannot test with issue awardable because maintain_elasticsearch_update
    # is called when the upvotes_count column on the issues table is updated
    let_it_be(:note) { create(:note) }
    let_it_be(:merge_request) { create(:merge_request) }

    context 'maintaining_elasticsearch is true' do
      before do
        allow(note).to receive(:maintaining_elasticsearch?).and_return(true)
        allow(merge_request).to receive(:maintaining_elasticsearch?).and_return(true)
      end

      it 'calls maintain_elasticsearch_update on create' do
        expect(merge_request).to receive(:maintain_elasticsearch_update)
        create(:award_emoji, :upvote, awardable: merge_request)
      end

      it 'calls maintain_elasticsearch_update on destroy' do
        award_emoji = create(:award_emoji, :upvote, awardable: merge_request)
        expect(merge_request).to receive(:maintain_elasticsearch_update)
        award_emoji.destroy!
      end

      # notes are not a tracked awardable type, so no re-index happens
      it 'does nothing for other awardable_type' do
        expect(note).not_to receive(:maintain_elasticsearch_update)
        create(:award_emoji, :upvote, awardable: note)
      end
    end

    context 'maintaining_elasticsearch is false' do
      it 'does not call maintain_elasticsearch_update' do
        expect(merge_request).not_to receive(:maintain_elasticsearch_update)
        award_emoji = create(:award_emoji, :upvote, awardable: merge_request)
        expect(merge_request).not_to receive(:maintain_elasticsearch_update)
        award_emoji.destroy!
      end
    end
  end
end
......@@ -15,9 +15,9 @@ module Elastic
def setup
clear_tracking!
delete_indices!
helper.create_empty_index(options: { settings: { number_of_replicas: 0 } })
helper.create_migrations_index
::Elastic::DataMigrationService.mark_all_as_completed!
helper.create_empty_index(options: { settings: { number_of_replicas: 0 } })
helper.create_standalone_indices
refresh_elasticsearch_index!
end
......
......@@ -48,4 +48,18 @@ RSpec.describe Elastic::MigrationOptions do
expect(subject).to eq(30.seconds)
end
end
# batch_size reads the class-level option set by the `batch_size` macro,
# falling back to DEFAULT_BATCH_SIZE.
describe '#batch_size' do
  subject { migration_class.new.batch_size }

  it 'has a default' do
    expect(subject).to eq(described_class::DEFAULT_BATCH_SIZE)
  end

  it 'respects when batch_size is set for the class' do
    migration_class.batch_size 10000
    expect(subject).to eq(10000)
  end
end
end
# frozen_string_literal: true
require 'spec_helper'
# Verifies the preloader removes N+1 queries for project features and
# upvote counts when indexing merge requests.
RSpec.describe Preloaders::MergeRequestsPreloader do
  describe '#execute' do
    let_it_be_with_refind(:merge_requests) { create_list(:merge_request, 3) }
    let_it_be(:upvotes) { merge_requests.each { |m| create(:award_emoji, :upvote, awardable: m) } }

    it 'does not make n+1 queries' do
      described_class.new(merge_requests).execute
      control = ActiveRecord::QueryRecorder.new(skip_cached: false) do
        # expectations make sure the queries execute
        merge_requests.each do |m|
          expect(m.target_project.project_feature).not_to be_nil
          expect(m.lazy_upvotes_count).to eq(1)
        end
      end
      # 1 query for BatchLoader to load all upvotes at once
      expect(control.count).to eq(1)
    end

    it 'runs extra queries without preloading' do
      control = ActiveRecord::QueryRecorder.new(skip_cached: false) do
        # expectations make sure the queries execute
        merge_requests.each do |m|
          expect(m.target_project.project_feature).not_to be_nil
          expect(m.lazy_upvotes_count).to eq(1)
        end
      end
      # 4 queries per merge request =
      # 1 to load merge request
      # 1 to load project
      # 1 to load project_feature
      # 1 to load upvotes count
      expect(control.count).to eq(4 * merge_requests.size)
    end
  end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment