Commit 4523cbbb authored by Jan Provaznik's avatar Jan Provaznik

Merge branch '205178-change-repository-indexing-to-sorted-sets-algorithm' into 'master'

Enable de-duplication of the ElasticCommitIndexerWorker jobs

See merge request gitlab-org/gitlab!31500
parents 80963589 5602d2f6
...@@ -14,8 +14,8 @@ module Elastic ...@@ -14,8 +14,8 @@ module Elastic
end end
end end
def index_commits_and_blobs(from_rev: nil, to_rev: nil) def index_commits_and_blobs
::ElasticCommitIndexerWorker.perform_async(project.id, from_rev, to_rev) ::ElasticCommitIndexerWorker.perform_async(project.id)
end end
end end
end end
...@@ -8,8 +8,8 @@ module Elastic ...@@ -8,8 +8,8 @@ module Elastic
delegate(:delete_index_for_commits_and_blobs, :elastic_search, to: :__elasticsearch__) delegate(:delete_index_for_commits_and_blobs, :elastic_search, to: :__elasticsearch__)
def index_wiki_blobs(to_sha = nil) def index_wiki_blobs
ElasticCommitIndexerWorker.perform_async(project.id, nil, to_sha, true) ElasticCommitIndexerWorker.perform_async(project.id, nil, nil, true)
end end
end end
end end
...@@ -61,8 +61,7 @@ module EE ...@@ -61,8 +61,7 @@ module EE
after_transition started: :finished do |state, _| after_transition started: :finished do |state, _|
if state.project.use_elasticsearch? if state.project.use_elasticsearch?
state.run_after_commit do state.run_after_commit do
last_indexed_commit = state.project.index_status&.last_commit ElasticCommitIndexerWorker.perform_async(state.project_id)
ElasticCommitIndexerWorker.perform_async(state.project_id, last_indexed_commit)
end end
end end
end end
......
...@@ -18,7 +18,7 @@ module EE ...@@ -18,7 +18,7 @@ module EE
def enqueue_elasticsearch_indexing def enqueue_elasticsearch_indexing
return unless should_index_commits? return unless should_index_commits?
project.repository.index_commits_and_blobs(from_rev: oldrev, to_rev: newrev) project.repository.index_commits_and_blobs
end end
def enqueue_update_external_pull_requests def enqueue_update_external_pull_requests
......
...@@ -10,11 +10,9 @@ module EE ...@@ -10,11 +10,9 @@ module EE
super super
return unless project.use_elasticsearch? return unless project.use_elasticsearch?
return unless default_branch_changes.any?
# For all changes on the default branch (usually master) trigger an ES update project.wiki.index_wiki_blobs
default_branch_changes.each do |change|
project.wiki.index_wiki_blobs(change[:newrev])
end
end end
end end
end end
......
...@@ -485,7 +485,7 @@ ...@@ -485,7 +485,7 @@
:urgency: :throttled :urgency: :throttled
:resource_boundary: :unknown :resource_boundary: :unknown
:weight: 1 :weight: 1
:idempotent: :idempotent: true
- :name: elastic_full_index - :name: elastic_full_index
:feature_category: :global_search :feature_category: :global_search
:has_external_dependencies: :has_external_dependencies:
......
# frozen_string_literal: true # frozen_string_literal: true
class ElasticCommitIndexerWorker # rubocop:disable Scalability/IdempotentWorker class ElasticCommitIndexerWorker
include ApplicationWorker include ApplicationWorker
feature_category :global_search feature_category :global_search
sidekiq_options retry: 2 sidekiq_options retry: 2
urgency :throttled urgency :throttled
idempotent!
# Performs the commits and blobs indexation
#
# project_id - The ID of the project to index
# oldrev @deprecated - The revision to start indexing at (default: INDEXED_SHA)
# newrev @deprecated - The revision to stop indexing at (default: HEAD)
# wiki - Treat this project as a Wiki
#
# The indexation will cover all commits within INDEXED_SHA..HEAD
def perform(project_id, oldrev = nil, newrev = nil, wiki = false) def perform(project_id, oldrev = nil, newrev = nil, wiki = false)
return true unless Gitlab::CurrentSettings.elasticsearch_indexing? return true unless Gitlab::CurrentSettings.elasticsearch_indexing?
project = Project.find(project_id) project = Project.find(project_id)
return true unless project.use_elasticsearch? return true unless project.use_elasticsearch?
Gitlab::Elastic::Indexer.new(project, wiki: wiki).run(newrev) Gitlab::Elastic::Indexer.new(project, wiki: wiki).run
end end
end end
---
title: Make the ElasticCommitIndexer idempotent to enable job de-duplication.
merge_request: 31500
author: mbergeron
type: performance
...@@ -40,9 +40,7 @@ module Elastic ...@@ -40,9 +40,7 @@ module Elastic
def search_commit(query, page: 1, per: 20, options: {}) def search_commit(query, page: 1, per: 20, options: {})
page ||= 1 page ||= 1
fields = %w(message^10 sha^5 author.name^2 author.email^2 committer.name committer.email).map {|i| "commit.#{i}"} fields = %w(message^10 sha^5 author.name^2 author.email^2 committer.name committer.email).map {|i| "commit.#{i}"}
query_with_prefix = query.split(/\s+/).map { |s| s.gsub(SHA_REGEX) { |sha| "#{sha}*" } }.join(' ') query_with_prefix = query.split(/\s+/).map { |s| s.gsub(SHA_REGEX) { |sha| "#{sha}*" } }.join(' ')
query_hash = { query_hash = {
......
...@@ -16,7 +16,8 @@ module Gitlab ...@@ -16,7 +16,8 @@ module Gitlab
end end
end end
attr_reader :project, :index_status attr_reader :project, :index_status, :wiki
alias_method :index_wiki?, :wiki
def initialize(project, wiki: false) def initialize(project, wiki: false)
@project = project @project = project
...@@ -26,45 +27,52 @@ module Gitlab ...@@ -26,45 +27,52 @@ module Gitlab
@index_status = project.index_status @index_status = project.index_status
end end
def run(to_sha = nil) # Runs the indexation process, which is the following:
to_sha = nil if to_sha == Gitlab::Git::BLANK_SHA # - Purge the index for any unreachable commits;
# - Run the `gitlab-elasticsearch-indexer`;
head_commit = repository.try(:commit) # - Update the `index_status` for the associated project;
#
if repository.nil? || !repository.exists? || repository.empty? || head_commit.nil? # ref - Git ref up to which the indexation will run (default: HEAD)
update_index_status(Gitlab::Git::BLANK_SHA) def run(ref = 'HEAD')
return commit = find_indexable_commit(ref)
end return update_index_status(Gitlab::Git::BLANK_SHA) unless commit
repository.__elasticsearch__.elastic_writing_targets.each do |target| repository.__elasticsearch__.elastic_writing_targets.each do |target|
run_indexer!(to_sha, target) Sidekiq.logger.debug(message: "Indexation running for #{project.id} #{from_sha}..#{commit.sha}",
project_id: project.id,
wiki: index_wiki?)
run_indexer!(commit.sha, target)
end end
update_index_status(to_sha)
# update the index status only if all writes were successful
update_index_status(commit.sha)
true true
end end
private def find_indexable_commit(ref)
!repository.empty? && repository.commit(ref)
def wiki?
@wiki
end end
private
def repository def repository
wiki? ? project.wiki.repository : project.repository index_wiki? ? project.wiki.repository : project.repository
end end
def run_indexer!(to_sha, target) def run_indexer!(to_sha, target)
vars = build_envvars(to_sha, target) # This might happen when default branch has been reset or rebased.
base_sha = if purge_unreachable_commits_from_index!(to_sha, target)
if index_status && !repository_contains_last_indexed_commit? Gitlab::Git::EMPTY_TREE_ID
target.delete_index_for_commits_and_blobs(wiki: wiki?) else
end from_sha
end
vars = build_envvars(base_sha, to_sha, target)
path_to_indexer = Gitlab.config.elasticsearch.indexer_path path_to_indexer = Gitlab.config.elasticsearch.indexer_path
command = command =
if wiki? if index_wiki?
[path_to_indexer, "--blob-type=wiki_blob", "--skip-commits", project.id.to_s, repository_path] [path_to_indexer, "--blob-type=wiki_blob", "--skip-commits", project.id.to_s, repository_path]
else else
[path_to_indexer, project.id.to_s, repository_path] [path_to_indexer, project.id.to_s, repository_path]
...@@ -75,7 +83,19 @@ module Gitlab ...@@ -75,7 +83,19 @@ module Gitlab
raise Error, output unless status&.zero? raise Error, output unless status&.zero?
end end
def build_envvars(to_sha, target) # Remove all indexed data for commits and blobs for a project.
#
# @return: whether the index has been purged
def purge_unreachable_commits_from_index!(to_sha, target)
return false if last_commit_ancestor_of?(to_sha)
target.delete_index_for_commits_and_blobs(wiki: index_wiki?)
true
rescue ::Elasticsearch::Transport::Transport::Errors::BadRequest => e
Gitlab::ErrorTracking.track_exception(e, project_id: project.id)
end
def build_envvars(from_sha, to_sha, target)
# We accept any form of settings, including string and array # We accept any form of settings, including string and array
# This is why JSON is needed # This is why JSON is needed
vars = { vars = {
...@@ -96,15 +116,13 @@ module Gitlab ...@@ -96,15 +116,13 @@ module Gitlab
end end
def last_commit def last_commit
if wiki? index_wiki? ? index_status&.last_wiki_commit : index_status&.last_commit
index_status&.last_wiki_commit
else
index_status&.last_commit
end
end end
def from_sha def from_sha
repository_contains_last_indexed_commit? ? last_commit : Gitlab::Git::EMPTY_TREE_ID strong_memoize(:from_sha) do
repository_contains_last_indexed_commit? ? last_commit : Gitlab::Git::EMPTY_TREE_ID
end
end end
def repository_contains_last_indexed_commit? def repository_contains_last_indexed_commit?
...@@ -113,6 +131,15 @@ module Gitlab ...@@ -113,6 +131,15 @@ module Gitlab
end end
end end
def last_commit_ancestor_of?(to_sha)
return true if from_sha == Gitlab::Git::BLANK_SHA
return false unless repository_contains_last_indexed_commit?
# we always treat the `EMPTY_TREE_ID` as an ancestor to make sure
# we don't try to purge an empty index
from_sha == Gitlab::Git::EMPTY_TREE_ID || repository.ancestor?(from_sha, to_sha)
end
def repository_path def repository_path
"#{repository.disk_path}.git" "#{repository.disk_path}.git"
end end
...@@ -131,7 +158,7 @@ module Gitlab ...@@ -131,7 +158,7 @@ module Gitlab
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def update_index_status(to_sha) def update_index_status(to_sha)
head_commit = repository.try(:commit) raise "Invalid sha #{to_sha}" unless to_sha.present?
# An index_status should always be created, # An index_status should always be created,
# even if the repository is empty, so we know it's been looked at. # even if the repository is empty, so we know it's been looked at.
...@@ -142,17 +169,11 @@ module Gitlab ...@@ -142,17 +169,11 @@ module Gitlab
retry retry
end end
# Don't update the index status if we never reached HEAD
return if head_commit && to_sha && head_commit.sha != to_sha
sha = head_commit.try(:sha)
sha ||= Gitlab::Git::BLANK_SHA
attributes = attributes =
if wiki? if index_wiki?
{ last_wiki_commit: sha, wiki_indexed_at: Time.now } { last_wiki_commit: to_sha, wiki_indexed_at: Time.now }
else else
{ last_commit: sha, indexed_at: Time.now } { last_commit: to_sha, indexed_at: Time.now }
end end
@index_status.update(attributes) @index_status.update(attributes)
......
This diff is collapsed.
...@@ -33,13 +33,12 @@ describe ProjectWiki, :elastic do ...@@ -33,13 +33,12 @@ describe ProjectWiki, :elastic do
Sidekiq::Testing.inline! do Sidekiq::Testing.inline! do
project.wiki.find_page('omega_page').delete project.wiki.find_page('omega_page').delete
last_commit = project.wiki.repository.commit.sha
expect_next_instance_of(Gitlab::Elastic::Indexer) do |indexer| expect_next_instance_of(Gitlab::Elastic::Indexer) do |indexer|
expect(indexer).to receive(:run).with(last_commit).and_call_original expect(indexer).to receive(:run).and_call_original
end end
project.wiki.index_wiki_blobs(last_commit) project.wiki.index_wiki_blobs
ensure_elasticsearch_index! ensure_elasticsearch_index!
end end
......
...@@ -52,7 +52,7 @@ describe ProjectImportState, type: :model do ...@@ -52,7 +52,7 @@ describe ProjectImportState, type: :model do
context 'no index status' do context 'no index status' do
it 'schedules a full index of the repository' do it 'schedules a full index of the repository' do
expect(ElasticCommitIndexerWorker).to receive(:perform_async).with(import_state.project_id, nil) expect(ElasticCommitIndexerWorker).to receive(:perform_async).with(import_state.project_id)
import_state.finish import_state.finish
end end
...@@ -61,8 +61,8 @@ describe ProjectImportState, type: :model do ...@@ -61,8 +61,8 @@ describe ProjectImportState, type: :model do
context 'with index status' do context 'with index status' do
let(:index_status) { IndexStatus.create!(project: project, indexed_at: Time.now, last_commit: 'foo') } let(:index_status) { IndexStatus.create!(project: project, indexed_at: Time.now, last_commit: 'foo') }
it 'schedules a progressive index of the repository' do it 'schedules a full index of the repository' do
expect(ElasticCommitIndexerWorker).to receive(:perform_async).with(import_state.project_id, index_status.last_commit) expect(ElasticCommitIndexerWorker).to receive(:perform_async).with(import_state.project_id)
import_state.finish import_state.finish
end end
......
...@@ -55,7 +55,7 @@ describe Git::BranchPushService do ...@@ -55,7 +55,7 @@ describe Git::BranchPushService do
end end
it 'runs ElasticCommitIndexerWorker' do it 'runs ElasticCommitIndexerWorker' do
expect(ElasticCommitIndexerWorker).to receive(:perform_async).with(project.id, oldrev, newrev) expect(ElasticCommitIndexerWorker).to receive(:perform_async).with(project.id)
subject.execute subject.execute
end end
...@@ -95,7 +95,7 @@ describe Git::BranchPushService do ...@@ -95,7 +95,7 @@ describe Git::BranchPushService do
end end
it 'runs ElasticCommitIndexerWorker' do it 'runs ElasticCommitIndexerWorker' do
expect(ElasticCommitIndexerWorker).to receive(:perform_async).with(project.id, oldrev, newrev) expect(ElasticCommitIndexerWorker).to receive(:perform_async).with(project.id)
subject.execute subject.execute
end end
...@@ -110,7 +110,7 @@ describe Git::BranchPushService do ...@@ -110,7 +110,7 @@ describe Git::BranchPushService do
end end
it 'runs ElasticCommitIndexerWorker' do it 'runs ElasticCommitIndexerWorker' do
expect(ElasticCommitIndexerWorker).to receive(:perform_async).with(project.id, oldrev, newrev) expect(ElasticCommitIndexerWorker).to receive(:perform_async).with(project.id)
subject.execute subject.execute
end end
......
...@@ -28,7 +28,7 @@ describe Git::WikiPushService do ...@@ -28,7 +28,7 @@ describe Git::WikiPushService do
end end
it 'triggers a wiki update' do it 'triggers a wiki update' do
expect(project.wiki).to receive(:index_wiki_blobs).with("797823") expect(project.wiki).to receive(:index_wiki_blobs)
described_class.new(project, project.owner, changes: post_received.changes).execute described_class.new(project, project.owner, changes: post_received.changes).execute
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment