Commit 2fa14ee0 authored by Tiger Watson

Merge branch '344804-vsa-consistency-service' into 'master'

Add aggregated VSA consistency check service

See merge request gitlab-org/gitlab!79519
parents 7c7bcd85 5446bc51
......@@ -12,7 +12,7 @@ module Namespaces
# Selects the namespace ids of the current scope.
#
# Falls back to the parent implementation unless traversal ids are in use.
# Otherwise the id is read from the last element of `namespaces.traversal_ids`
# and aliased as `id` through an Arel AS node (rather than a raw "... AS id"
# string). Only one `select` is issued: a preceding string-based `select`
# would be built and immediately discarded.
def as_ids
  return super unless use_traversal_ids?

  select(Arel.sql('namespaces.traversal_ids[array_length(namespaces.traversal_ids, 1)]').as('id'))
end
def roots
......
# frozen_string_literal: true
module Analytics
  module CycleAnalytics
    # Removes stage event rows whose referenced issuable (the record behind
    # `event_model.issuable_model`) no longer exists.
    #
    # The work is done per stage event hash id, in keyset-paginated batches of
    # BATCH_LIMIT rows, restricted to the given group and its descendants.
    class ConsistencyCheckService
      include Validations

      BATCH_LIMIT = 1000

      # group       - the group whose stage events are checked
      # event_model - the stage event model class; must respond to
      #               `issuable_model` and `issuable_id_column`
      def initialize(group:, event_model:)
        @group = group
        @event_model = event_model
      end

      # rubocop: disable CodeReuse/ActiveRecord

      # Runs the validations, then deletes every finished stage event that
      # points to a missing issuable.
      #
      # Returns the validation error response when a validation fails,
      # otherwise a success response with reason :group_processed.
      def execute
        validation_error = validate
        return validation_error if validation_error

        stage_event_hash_ids.each do |hash_id|
          iterator(finished_events(hash_id)).each_batch(of: BATCH_LIMIT) do |batch|
            issuable_ids = batch.pluck(event_model.issuable_id_column)
            next if issuable_ids.empty?

            orphaned_ids = missing_issuable_ids(issuable_ids)
            next if orphaned_ids.empty?

            event_model
              .where(stage_event_hash_id: hash_id, event_model.issuable_id_column => orphaned_ids)
              .delete_all
          end
        end

        success(:group_processed)
      end
      # rubocop: enable CodeReuse/ActiveRecord

      private

      attr_reader :group, :event_model

      # rubocop: disable CodeReuse/ActiveRecord

      # Stage events of one stage that have an end event timestamp, oldest
      # end event first.
      def finished_events(stage_event_hash_id)
        event_model
          .where(stage_event_hash_id: stage_event_hash_id)
          .where.not(end_event_timestamp: nil)
          .order_by_end_event(:asc)
      end

      # Check if the referenced issues or merge requests still exist.
      # Returns the subset of +issuable_ids+ with no matching row in the
      # issuable model's table.
      def missing_issuable_ids(issuable_ids)
        values_list = Arel::Nodes::ValuesList.new(issuable_ids.map { |id| [id] })
        existence_check = model.select('1').where('id = ids.stage_event_issuable_id')

        model.connection.select_values <<~SQL
          SELECT ids.stage_event_issuable_id
          FROM ((#{values_list.to_sql})) AS ids(stage_event_issuable_id)
          WHERE
          NOT EXISTS (#{existence_check.to_sql})
        SQL
      end

      # Keyset iterator over +scope+, limited to rows whose group_id is the
      # group itself or one of its descendants.
      def iterator(scope)
        group_id_column = event_model.arel_table[:group_id]
        in_operator_options = {
          array_scope: group.self_and_descendant_ids,
          array_mapping_scope: -> (id_expression) { event_model.where(group_id_column.eq(id_expression)) }
        }

        Gitlab::Pagination::Keyset::Iterator.new(
          scope: scope,
          in_operator_optimization_options: in_operator_options
        )
      end
      # rubocop: enable CodeReuse/ActiveRecord

      # Hash ids of the group's distinct stages whose start event operates on
      # the issuable model handled by this service (memoized).
      def stage_event_hash_ids
        @stage_event_hash_ids ||= begin
          stages = ::Gitlab::Analytics::CycleAnalytics::DistinctStageLoader.new(group: group).stages
          matching_stages = stages.select { |stage| stage.start_event.object_type == model }

          matching_stages.map(&:stage_event_hash_id)
        end
      end

      # The issuable model class behind the stage event model.
      def model
        event_model.issuable_model
      end
    end
  end
end
......@@ -3,6 +3,8 @@
module Analytics
module CycleAnalytics
class DataLoaderService
include Validations
MAX_UPSERT_COUNT = 10_000
UPSERT_LIMIT = 1000
BATCH_LIMIT = 500
......@@ -24,19 +26,10 @@ module Analytics
end
def execute
unless model == Issue || model == MergeRequest
return error(:invalid_model)
end
unless group.licensed_feature_available?(:cycle_analytics_for_groups)
return error(:missing_license)
end
unless group.root_ancestor == group
return error(:requires_top_level_group)
end
error_response = validate
return error_response if error_response
response = success(:model_processed)
response = success(:model_processed, cursor: {})
iterator.each_batch(of: BATCH_LIMIT) do |records|
loaded_records = records.to_a
......@@ -58,17 +51,6 @@ module Analytics
attr_reader :group, :model, :cursor, :updated_at_before, :upsert_count, :stages
# Builds an error ServiceResponse for the current group; the reason is
# embedded in the message and repeated in the payload under :reason.
def error(error_reason)
  message = "DataLoader error for group: #{group.id} (#{error_reason})"

  ServiceResponse.error(message: message, payload: { reason: error_reason })
end
# Builds a success ServiceResponse whose payload carries the reason and the
# cursor (an empty hash unless one is given).
def success(success_reason, cursor: {})
  payload = { reason: success_reason, cursor: cursor }

  ServiceResponse.success(payload: payload)
end
# rubocop: disable CodeReuse/ActiveRecord
def iterator_base_scope
model.updated_before(updated_at_before).order(:updated_at, :id)
......
# frozen_string_literal: true
module Analytics
  module CycleAnalytics
    # Precondition checks and ServiceResponse builders shared by services.
    # The including class must provide `model` and `group`.
    module Validations
      # Runs the checks in order: supported model, license, top-level group.
      #
      # Returns the error ServiceResponse of the first failing check, or nil
      # when every check passes.
      def validate
        supported_model = model == Issue || model == MergeRequest
        return error(:invalid_model) unless supported_model

        licensed = group.licensed_feature_available?(:cycle_analytics_for_groups)
        return error(:missing_license) unless licensed

        error(:requires_top_level_group) unless group.root?
      end

      # Builds an error ServiceResponse; the message names the including
      # class and the group id, the payload carries the reason.
      def error(error_reason)
        message = "#{self.class.name} error for group: #{group.id} (#{error_reason})"

        ServiceResponse.error(message: message, payload: { reason: error_reason })
      end

      # Builds a success ServiceResponse; +payload+ is merged on top of the
      # { reason: ... } entry.
      def success(success_reason, payload = {})
        base_payload = { reason: success_reason }

        ServiceResponse.success(payload: base_payload.merge(payload))
      end
    end
  end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Analytics::CycleAnalytics::ConsistencyCheckService, :aggregate_failures do
  let_it_be_with_refind(:group) { create(:group) }
  let_it_be_with_refind(:subgroup) { create(:group, parent: group) }

  let_it_be(:project1) { create(:project, group: group) }
  let_it_be(:project2) { create(:project, group: subgroup) }

  subject(:execution_result) do
    described_class.new(group: group, event_model: event_model).execute
  end

  # Expects the including context to define `event_model`, `record1`,
  # `record2`, `record3` and a stage for the group.
  shared_examples 'consistency check examples' do
    context 'when two records are deleted' do
      before do
        stub_licensed_features(cycle_analytics_for_groups: true)

        # Load the stage events first, then delete two of the three source records
        issuable_model = event_model.issuable_model
        Analytics::CycleAnalytics::DataLoaderService.new(group: group, model: issuable_model).execute

        [record1, record3].each(&:delete)
      end

      it 'cleans up the stage event records' do
        expect(execution_result).to be_success
        expect(execution_result.payload[:reason]).to eq(:group_processed)

        # Only the stage event of the surviving record may remain
        remaining_events = event_model.all
        expect(remaining_events.size).to eq(1)
        expect(remaining_events.first[event_model.issuable_id_column]).to eq(record2.id)
      end
    end

    describe 'validation' do
      context 'when license is missing' do
        before do
          stub_licensed_features(cycle_analytics_for_groups: false)
        end

        it 'fails' do
          expect(execution_result).to be_error
          expect(execution_result.payload[:reason]).to eq(:missing_license)
        end
      end

      context 'when sub-group is given' do
        let(:group) { subgroup }

        before do
          stub_licensed_features(cycle_analytics_for_groups: true)
        end

        it 'fails' do
          expect(execution_result).to be_error
          expect(execution_result.payload[:reason]).to eq(:requires_top_level_group)
        end
      end
    end
  end

  context 'for issue based stage' do
    let(:event_model) { Analytics::CycleAnalytics::IssueStageEvent }

    let!(:record1) { create(:issue, :closed, project: project1) }
    let!(:record2) { create(:issue, :closed, project: project2) }
    let!(:record3) { create(:issue, :closed, project: project2) }

    let!(:stage) do
      create(
        :cycle_analytics_group_stage,
        group: group,
        start_event_identifier: :issue_created,
        end_event_identifier: :issue_closed
      )
    end

    it_behaves_like 'consistency check examples'
  end

  context 'for merge request based stage' do
    let(:event_model) { Analytics::CycleAnalytics::MergeRequestStageEvent }

    let!(:record1) { create(:merge_request, :closed_last_month, project: project1) }
    let!(:record2) { create(:merge_request, :closed_last_month, project: project2) }
    let!(:record3) { create(:merge_request, :closed_last_month, project: project2) }

    let!(:stage) do
      create(
        :cycle_analytics_group_stage,
        group: group,
        start_event_identifier: :merge_request_created,
        end_event_identifier: :merge_request_closed
      )
    end

    it_behaves_like 'consistency check examples'
  end
end
......@@ -12,6 +12,7 @@ module Gitlab
array_scope_table = Arel::Table.new(ARRAY_SCOPE_CTE_NAME)
@columns = columns.map do |column|
column = column.right if column.is_a?(Arel::Nodes::As)
ColumnData.new(column, "array_scope_#{column}", array_scope_table)
end
end
......
......@@ -16,4 +16,13 @@ RSpec.describe Gitlab::Pagination::Keyset::InOperatorOptimization::ArrayScopeCol
it { expect { array_scope_columns }.to raise_error /No array columns were given/ }
end
# Covers select values that contain an Arel AS node instead of a plain column.
context 'when Arel AS node is given as input' do
# The first selected value is an aliased Arel attribute (`id` AS 'id'), the second a plain symbol.
let(:scope) { Issue.select(Issue.arel_table[:id].as('id'), :title) }
let(:columns) { scope.select_values }
it 'works with Arel AS nodes' do
# The aliased column must produce the same aggregated name pattern as the plain one.
expect(array_scope_columns.array_aggregated_column_names).to eq(%w[array_cte_id_array array_cte_title_array])
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment