Commit 87dfebea authored by Adam Hegyi, committed by Aleksei Lipniagov

Use the new VSA query backend when loading records

parent aa8bd302
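
For context, the commit routes record loading through the aggregated VSA backend when the caller opts in. A minimal usage sketch (not part of the diff; `stage`, `user`, and the time range are assumed fixtures, mirroring the spec changes later in this commit):

    params = { from: Time.new(2019), to: Time.new(2020), current_user: user }

    # Legacy backend: queries issues / merge requests joined with metrics tables
    Gitlab::Analytics::CycleAnalytics::DataCollector
      .new(stage: stage, params: params)
      .records_fetcher

    # New backend: reads from the aggregated stage event tables
    Gitlab::Analytics::CycleAnalytics::DataCollector
      .new(stage: stage, params: params.merge(use_aggregated_data_collector: true))
      .records_fetcher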
......@@ -135,11 +135,7 @@ module Issuables
# rubocop: disable CodeReuse/ActiveRecord
def label_link_query(target_model, label_ids: nil, label_names: nil)
relation = LabelLink
.where(target_type: target_model.name)
.where(LabelLink.arel_table['target_id'].eq(target_model.arel_table['id']))
relation = relation.where(label_id: label_ids) if label_ids
relation = LabelLink.by_target_for_exists_query(target_model.name, target_model.arel_table['id'], label_ids)
relation = relation.joins(:label).where(labels: { name: label_names }) if label_names
relation
......
......@@ -20,6 +20,10 @@ module Analytics
def self.issuable_id_column
:issue_id
end
def self.issuable_model
::Issue
end
end
end
end
......@@ -20,6 +20,10 @@ module Analytics
def self.issuable_id_column
:merge_request_id
end
def self.issuable_model
::MergeRequest
end
end
end
end
......@@ -18,6 +18,10 @@ module Analytics
scope :end_event_is_not_happened_yet, -> { where(end_event_timestamp: nil) }
end
def issuable_id
attributes[self.class.issuable_id_column.to_s]
end
class_methods do
def upsert_data(data)
upsert_values = data.map do |row|
......@@ -26,8 +30,8 @@ module Analytics
:issuable_id,
:group_id,
:project_id,
:author_id,
:milestone_id,
:author_id,
:state_id,
:start_event_timestamp,
:end_event_timestamp
......
......@@ -11,4 +11,16 @@ class LabelLink < ApplicationRecord
validates :label, presence: true, unless: :importing?
scope :for_target, -> (target_id, target_type) { where(target_id: target_id, target_type: target_type) }
# Example: issues that have at least one label within a project
# > Issue.where(project_id: 100) # or any scope on issues
# > .where(LabelLink.by_target_for_exists_query('Issue', Issue.arel_table[:id]).arel.exists)
scope :by_target_for_exists_query, -> (target_type, arel_join_column, label_ids = nil) do
relation = LabelLink
.where(target_type: target_type)
.where(arel_table['target_id'].eq(arel_join_column))
relation = relation.where(label_id: label_ids) if label_ids
relation
end
end
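
A usage sketch for the new scope (not part of the diff; the label lookup is hypothetical), following the comment above:

    # Issues in a project that have at least one of the given labels
    label_ids = Label.where(title: %w[bug feature]).ids # hypothetical label lookup

    Issue.where(project_id: 100)
         .where(LabelLink.by_target_for_exists_query('Issue', Issue.arel_table[:id], label_ids).arel.exists)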
......@@ -10,7 +10,21 @@ RSpec.describe Groups::Analytics::CycleAnalytics::StagesController do
let(:params) { { group_id: group } }
let(:parent) { group }
it_behaves_like 'Value Stream Analytics Stages controller'
context 'when use_vsa_aggregated_tables FF is enabled' do
it_behaves_like 'Value Stream Analytics Stages controller' do
before do
stub_feature_flags(use_vsa_aggregated_tables: true)
end
end
end
context 'when use_vsa_aggregated_tables FF is disabled' do
it_behaves_like 'Value Stream Analytics Stages controller' do
before do
stub_feature_flags(use_vsa_aggregated_tables: false)
end
end
end
end
context 'when params have group_id and value_stream_id' do
......@@ -19,6 +33,20 @@ RSpec.describe Groups::Analytics::CycleAnalytics::StagesController do
let(:params) { { group_id: group, value_stream_id: value_stream.id } }
let(:parent) { group }
it_behaves_like 'Value Stream Analytics Stages controller'
context 'when use_vsa_aggregated_tables FF is enabled' do
it_behaves_like 'Value Stream Analytics Stages controller' do
before do
stub_feature_flags(use_vsa_aggregated_tables: true)
end
end
end
context 'when use_vsa_aggregated_tables FF is disabled' do
it_behaves_like 'Value Stream Analytics Stages controller' do
before do
stub_feature_flags(use_vsa_aggregated_tables: false)
end
end
end
end
end
......@@ -7,6 +7,10 @@ RSpec.describe Gitlab::Analytics::CycleAnalytics::DataCollector do
let(:current_time) { Time.new(2019, 6, 1) }
before do
stub_licensed_features(cycle_analytics_for_groups: true)
end
around do |example|
Timecop.freeze(current_time) { example.run }
end
......@@ -15,6 +19,20 @@ RSpec.describe Gitlab::Analytics::CycleAnalytics::DataCollector do
seconds.fdiv(1.day.to_i).round
end
def aggregate_vsa_data(group)
Analytics::CycleAnalytics::DataLoaderService.new(
group: group,
model: Issue,
updated_at_before: Time.now
).execute
Analytics::CycleAnalytics::DataLoaderService.new(
group: group,
model: MergeRequest,
updated_at_before: Time.now
).execute
end
# Setting up test data for a stage depends on the `start_event_identifier` and
# `end_event_identifier` attributes. Since stages can be customized, the test
# uses two methods for the data preparation: `create_data_for_start_event` and
......@@ -23,7 +41,7 @@ RSpec.describe Gitlab::Analytics::CycleAnalytics::DataCollector do
shared_examples 'custom Value Stream Analytics Stage' do
let(:params) { { from: Time.new(2019), to: Time.new(2020), current_user: user } }
let(:data_collector) { described_class.new(stage: stage, params: params) }
let(:aggregated_data_collector) { described_class.new(stage: stage, params: params.merge(use_vsa_aggregated_tables: true)) }
let(:aggregated_data_collector) { described_class.new(stage: stage, params: params.merge(use_aggregated_data_collector: true)) }
let_it_be(:resource_1_end_time) { Time.new(2019, 3, 15) }
let_it_be(:resource_2_end_time) { Time.new(2019, 3, 10) }
......@@ -86,6 +104,8 @@ RSpec.describe Gitlab::Analytics::CycleAnalytics::DataCollector do
it 'loads serialized records' do
items = data_collector.serialized_records
expect(items.size).to eq(3)
expect(aggregated_data_collector.serialized_records.size).to eq(3) if aggregated_data_collector_enabled
end
context 'when sorting by duration' do
......@@ -149,7 +169,7 @@ RSpec.describe Gitlab::Analytics::CycleAnalytics::DataCollector do
expected_median = (duration_1 + duration_2).fdiv(2)
expect(round_to_days(data_collector.median.seconds)).to eq(round_to_days(expected_median))
expect(round_to_days(aggregated_data_collector.median.seconds)).to eq(round_to_days(expected_median))
expect(round_to_days(aggregated_data_collector.median.seconds)).to eq(round_to_days(expected_median)) if aggregated_data_collector_enabled
end
it 'loads serialized records' do
......@@ -528,19 +548,7 @@ RSpec.describe Gitlab::Analytics::CycleAnalytics::DataCollector do
end
before do
stub_licensed_features(cycle_analytics_for_groups: true)
Analytics::CycleAnalytics::DataLoaderService.new(
group: group,
model: Issue,
updated_at_before: Time.now
).execute
Analytics::CycleAnalytics::DataLoaderService.new(
group: group,
model: MergeRequest,
updated_at_before: Time.now
).execute
aggregate_vsa_data(group)
end
before_all do
......@@ -552,18 +560,17 @@ RSpec.describe Gitlab::Analytics::CycleAnalytics::DataCollector do
let_it_be(:group) { create(:group) }
let_it_be(:project1) { create(:project, :repository, group: group) }
let_it_be(:project2) { create(:project, :repository, group: group) }
let_it_be(:stage) do
create(:cycle_analytics_group_stage,
name: 'My Stage',
group: group,
start_event_identifier: :merge_request_created,
end_event_identifier: :merge_request_merged
)
end
let(:merge_request) { project2.merge_requests.first }
let(:stage) do
Analytics::CycleAnalytics::GroupStage.new(
name: 'My Stage',
group: group,
start_event_identifier: :merge_request_created,
end_event_identifier: :merge_request_merged
)
end
let(:data_collector_params) do
{
created_after: Time.new(2019, 1, 1),
......@@ -601,6 +608,8 @@ RSpec.describe Gitlab::Analytics::CycleAnalytics::DataCollector do
context 'when `project_ids` parameter is given' do
before do
data_collector_params[:project_ids] = [project2.id]
aggregate_vsa_data(group)
end
it_behaves_like 'filter examples'
......@@ -613,6 +622,8 @@ RSpec.describe Gitlab::Analytics::CycleAnalytics::DataCollector do
merge_request.assignees << assignee
data_collector_params[:assignee_username] = [assignee.username]
aggregate_vsa_data(group)
end
it_behaves_like 'filter examples'
......@@ -625,6 +636,8 @@ RSpec.describe Gitlab::Analytics::CycleAnalytics::DataCollector do
merge_request.update!(author: author)
data_collector_params[:author_username] = author.username
aggregate_vsa_data(group)
end
it_behaves_like 'filter examples'
......@@ -641,6 +654,8 @@ RSpec.describe Gitlab::Analytics::CycleAnalytics::DataCollector do
).execute(merge_request)
data_collector_params[:label_name] = [label.name]
aggregate_vsa_data(group)
end
it_behaves_like 'filter examples'
......@@ -657,6 +672,8 @@ RSpec.describe Gitlab::Analytics::CycleAnalytics::DataCollector do
).execute(merge_request)
data_collector_params[:label_name] = ['Any']
aggregate_vsa_data(group)
end
it_behaves_like 'filter examples'
......@@ -674,6 +691,8 @@ RSpec.describe Gitlab::Analytics::CycleAnalytics::DataCollector do
).execute(merge_request)
data_collector_params[:label_name] = [label1.name, label2.name]
aggregate_vsa_data(group)
end
it_behaves_like 'filter examples'
......@@ -686,6 +705,8 @@ RSpec.describe Gitlab::Analytics::CycleAnalytics::DataCollector do
merge_request.update!(milestone: milestone)
data_collector_params[:milestone_title] = milestone.title
aggregate_vsa_data(group)
end
it_behaves_like 'filter examples'
......
......@@ -227,8 +227,22 @@ RSpec.shared_examples 'Value Stream Analytics Stages controller' do
end
it 'accepts sort params' do
expect_next_instance_of(Gitlab::Analytics::CycleAnalytics::Sorting) do |sort|
expect(sort).to receive(:apply).with(:duration, :asc).and_call_original
if Feature.enabled?(:use_vsa_aggregated_tables)
event_1 = create(:cycle_analytics_merge_request_stage_event, merge_request_id: 1, start_event_timestamp: 4.months.ago, end_event_timestamp: Date.today)
event_2 = create(:cycle_analytics_merge_request_stage_event, merge_request_id: 2, start_event_timestamp: 2.months.ago, end_event_timestamp: Date.today)
event_3 = create(:cycle_analytics_merge_request_stage_event, merge_request_id: 3, start_event_timestamp: 3.months.ago, end_event_timestamp: Date.today)
allow_any_instance_of(Gitlab::Analytics::CycleAnalytics::Aggregated::RecordsFetcher).to receive(:query).and_return(Analytics::CycleAnalytics::MergeRequestStageEvent.all)
expect_next_instance_of(Gitlab::Analytics::CycleAnalytics::Aggregated::RecordsFetcher) do |records_fetcher|
records_fetcher.serialized_records do |raw_active_record_scope|
expect(raw_active_record_scope.pluck(:merge_request_id)).to eq([event_2.merge_request_id, event_3.merge_request_id, event_1.merge_request_id])
end
end
else
expect_next_instance_of(Gitlab::Analytics::CycleAnalytics::Sorting) do |sort|
expect(sort).to receive(:apply).with(:duration, :asc).and_call_original
end
end
subject
......@@ -239,9 +253,19 @@ RSpec.shared_examples 'Value Stream Analytics Stages controller' do
context 'pagination' do
it 'exposes pagination headers' do
create_list(:merge_request, 3)
stub_const('Gitlab::Analytics::CycleAnalytics::RecordsFetcher::MAX_RECORDS', 2)
allow_any_instance_of(Gitlab::Analytics::CycleAnalytics::RecordsFetcher).to receive(:query).and_return(MergeRequest.join_metrics.all)
if Feature.enabled?(:use_vsa_aggregated_tables)
create_list(:cycle_analytics_merge_request_stage_event, 3)
stub_const('Gitlab::Analytics::CycleAnalytics::Aggregated::RecordsFetcher::MAX_RECORDS', 2)
else
create_list(:merge_request, 3)
stub_const('Gitlab::Analytics::CycleAnalytics::RecordsFetcher::MAX_RECORDS', 2)
end
if Feature.enabled?(:use_vsa_aggregated_tables)
allow_any_instance_of(Gitlab::Analytics::CycleAnalytics::Aggregated::RecordsFetcher).to receive(:query).and_return(Analytics::CycleAnalytics::MergeRequestStageEvent.all)
else
allow_any_instance_of(Gitlab::Analytics::CycleAnalytics::RecordsFetcher).to receive(:query).and_return(MergeRequest.join_metrics.all)
end
subject
......
......@@ -63,23 +63,12 @@ module Gitlab
def filter_label_names(query)
return query if params[:label_name].blank?
all_label_ids = Issuables::LabelFilter
.new(group: root_ancestor, project: nil, params: { label_name: params[:label_name] })
.find_label_ids(params[:label_name])
return query.none if params[:label_name].size != all_label_ids.size
all_label_ids.each do |label_ids|
relation = LabelLink
.where(target_type: stage.subject_class.name)
.where(LabelLink.arel_table['target_id'].eq(query.model.arel_table[query.model.issuable_id_column]))
relation = relation.where(label_id: label_ids)
query = query.where(relation.arel.exists)
end
query
LabelFilter.new(
stage: stage,
params: params,
project: nil,
group: root_ancestor
).filter(query)
end
def filter_assignees(query)
......
......@@ -30,6 +30,12 @@ module Gitlab
strong_memoize(:count) { limit_count }
end
def records_fetcher
strong_memoize(:records_fetcher) do
RecordsFetcher.new(stage: stage, query: query, params: params)
end
end
private
attr_reader :stage, :params
......
# frozen_string_literal: true
module Gitlab
module Analytics
module CycleAnalytics
module Aggregated
# This class makes it possible to add label filters to stage event tables
class LabelFilter < Issuables::LabelFilter
extend ::Gitlab::Utils::Override
def initialize(stage:, project:, group:, **kwargs)
@stage = stage
super(project: project, group: group, **kwargs)
end
private
attr_reader :stage
override :label_link_query
def label_link_query(target_model, label_ids: nil)
join_column = target_model.arel_table[target_model.issuable_id_column]
LabelLink.by_target_for_exists_query(stage.subject_class.name, join_column, label_ids)
end
end
end
end
end
end
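
A usage sketch for the aggregated label filter (not part of the diff; the `label_name` value is hypothetical, the call shape mirrors the caller change earlier in this diff):

    Gitlab::Analytics::CycleAnalytics::Aggregated::LabelFilter.new(
      stage: stage,
      params: { label_name: ['workflow::in review'] },
      project: nil,
      group: root_ancestor
    ).filter(query) # adds an EXISTS condition on label_links to the stage event query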
# frozen_string_literal: true
module Gitlab
module Analytics
module CycleAnalytics
module Aggregated
class RecordsFetcher
include Gitlab::Utils::StrongMemoize
include StageQueryHelpers
MAX_RECORDS = 20
MAPPINGS = {
Issue => {
serializer_class: AnalyticsIssueSerializer,
includes_for_query: { project: { namespace: [:route] }, author: [] },
columns_for_select: %I[title iid id created_at author_id project_id]
},
MergeRequest => {
serializer_class: AnalyticsMergeRequestSerializer,
includes_for_query: { target_project: [:namespace], author: [] },
columns_for_select: %I[title iid id created_at author_id state_id target_project_id]
}
}.freeze
def initialize(stage:, query:, params: {})
@stage = stage
@query = query
@params = params
@sort = params[:sort] || :end_event
@direction = params[:direction] || :desc
@page = params[:page] || 1
@per_page = MAX_RECORDS
@stage_event_model = query.model
end
def serialized_records
strong_memoize(:serialized_records) do
records = ordered_and_limited_query.select(stage_event_model.arel_table[Arel.star], duration.as('total_time'))
yield records if block_given?
issuables_and_records = load_issuables(records)
preload_associations(issuables_and_records.map(&:first))
issuables_and_records.map do |issuable, record|
project = issuable.project
attributes = issuable.attributes.merge({
project_path: project.path,
namespace_path: project.namespace.route.path,
author: issuable.author,
total_time: record.total_time
})
serializer.represent(attributes)
end
end
end
# rubocop: disable CodeReuse/ActiveRecord
def ordered_and_limited_query
sorting_options = {
end_event: {
asc: -> { query.order(end_event_timestamp: :asc) },
desc: -> { query.order(end_event_timestamp: :desc) }
},
duration: {
asc: -> { query.order(duration.asc) },
desc: -> { query.order(duration.desc) }
}
}
sort_lambda = sorting_options.dig(sort, direction) || sorting_options.dig(:end_event, :desc)
sort_lambda.call
.page(page)
.per(per_page)
.without_count
end
# rubocop: enable CodeReuse/ActiveRecord
private
attr_reader :stage, :query, :sort, :direction, :params, :page, :per_page, :stage_event_model
delegate :subject_class, to: :stage
def load_issuables(stage_event_records)
stage_event_records_by_issuable_id = stage_event_records.index_by(&:issuable_id)
issuable_model = stage_event_model.issuable_model
issuables_by_id = issuable_model.id_in(stage_event_records_by_issuable_id.keys).index_by(&:id)
stage_event_records_by_issuable_id.map do |issuable_id, record|
[issuables_by_id[issuable_id], record] if issuables_by_id[issuable_id]
end.compact
end
def serializer
MAPPINGS.fetch(subject_class).fetch(:serializer_class).new
end
# rubocop: disable CodeReuse/ActiveRecord
def preload_associations(records)
ActiveRecord::Associations::Preloader.new.preload(
records,
MAPPINGS.fetch(subject_class).fetch(:includes_for_query)
)
records
end
# rubocop: enable CodeReuse/ActiveRecord
end
end
end
end
end
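
For reference, a sketch of using the new fetcher directly (not part of the diff; fixtures assumed, mirroring the spec added at the end of this commit):

    fetcher = Gitlab::Analytics::CycleAnalytics::Aggregated::RecordsFetcher.new(
      stage: stage,
      query: Analytics::CycleAnalytics::IssueStageEvent.all,
      params: { sort: :duration, direction: :asc }
    )

    fetcher.serialized_records do |scope|
      # the optional block yields the ordered, paginated stage event scope
    end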
......@@ -23,7 +23,11 @@ module Gitlab
def records_fetcher
strong_memoize(:records_fetcher) do
RecordsFetcher.new(stage: stage, query: query, params: params)
if use_aggregated_data_collector?
aggregated_data_collector.records_fetcher
else
RecordsFetcher.new(stage: stage, query: query, params: params)
end
end
end
......
......@@ -8,23 +8,11 @@ module Gitlab
include StageQueryHelpers
include Gitlab::CycleAnalytics::MetricsTables
MAX_RECORDS = 20
MAPPINGS = {
Issue => {
serializer_class: AnalyticsIssueSerializer,
includes_for_query: { project: { namespace: [:route] }, author: [] },
columns_for_select: %I[title iid id created_at author_id project_id]
},
MergeRequest => {
serializer_class: AnalyticsMergeRequestSerializer,
includes_for_query: { target_project: [:namespace], author: [] },
columns_for_select: %I[title iid id created_at author_id state_id target_project_id]
}
}.freeze
delegate :subject_class, to: :stage
MAX_RECORDS = Gitlab::Analytics::CycleAnalytics::Aggregated::RecordsFetcher::MAX_RECORDS
MAPPINGS = Gitlab::Analytics::CycleAnalytics::Aggregated::RecordsFetcher::MAPPINGS
def initialize(stage:, query:, params: {})
@stage = stage
@query = query
......
......@@ -30,6 +30,10 @@ RSpec.describe 'Value Stream Analytics', :js do
wait_for_all_requests
end
before do
stub_feature_flags(use_vsa_aggregated_tables: false)
end
context 'as an allowed user' do
context 'when project is new' do
before do
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::Analytics::CycleAnalytics::Aggregated::RecordsFetcher do
let_it_be(:project) { create(:project) }
let_it_be(:issue_1) { create(:issue, project: project) }
let_it_be(:issue_2) { create(:issue, project: project) }
let_it_be(:issue_3) { create(:issue, project: project) }
let_it_be(:stage_event_1) { create(:cycle_analytics_issue_stage_event, issue_id: issue_1.id, start_event_timestamp: 2.years.ago, end_event_timestamp: 1.year.ago) } # duration: 1 year
let_it_be(:stage_event_2) { create(:cycle_analytics_issue_stage_event, issue_id: issue_2.id, start_event_timestamp: 5.years.ago, end_event_timestamp: 2.years.ago) } # duration: 3 years
let_it_be(:stage_event_3) { create(:cycle_analytics_issue_stage_event, issue_id: issue_3.id, start_event_timestamp: 6.years.ago, end_event_timestamp: 3.months.ago) } # duration: 5+ years
let_it_be(:stage) { create(:cycle_analytics_project_stage, start_event_identifier: :issue_created, end_event_identifier: :issue_deployed_to_production, project: project) }
let(:params) { {} }
subject(:records_fetcher) do
described_class.new(stage: stage, query: Analytics::CycleAnalytics::IssueStageEvent.all, params: params)
end
shared_examples 'match returned records' do
it 'returns issues in the correct order' do
returned_iids = records_fetcher.serialized_records.pluck(:iid).map(&:to_i)
expect(returned_iids).to eq(expected_issue_ids)
end
end
describe '#serialized_records' do
describe 'sorting' do
context 'when sorting by end event DESC' do
let(:expected_issue_ids) { [issue_3.iid, issue_1.iid, issue_2.iid] }
before do
params[:sort] = :end_event
params[:direction] = :desc
end
it_behaves_like 'match returned records'
end
context 'when sorting by end event ASC' do
let(:expected_issue_ids) { [issue_2.iid, issue_1.iid, issue_3.iid] }
before do
params[:sort] = :end_event
params[:direction] = :asc
end
it_behaves_like 'match returned records'
end
context 'when sorting by duration DESC' do
let(:expected_issue_ids) { [issue_3.iid, issue_2.iid, issue_1.iid] }
before do
params[:sort] = :duration
params[:direction] = :desc
end
it_behaves_like 'match returned records'
end
context 'when sorting by duration ASC' do
let(:expected_issue_ids) { [issue_1.iid, issue_2.iid, issue_3.iid] }
before do
params[:sort] = :duration
params[:direction] = :asc
end
it_behaves_like 'match returned records'
end
end
describe 'pagination' do
let(:expected_issue_ids) { [issue_3.iid] }
before do
params[:sort] = :duration
params[:direction] = :asc
params[:page] = 2
stub_const('Gitlab::Analytics::CycleAnalytics::Aggregated::RecordsFetcher::MAX_RECORDS', 2)
end
it_behaves_like 'match returned records'
end
context 'when passing a block to serialized_records method' do
before do
params[:sort] = :duration
params[:direction] = :asc
end
it 'yields the underlying stage event scope' do
stage_event_records = []
records_fetcher.serialized_records do |scope|
stage_event_records.concat(scope.to_a)
end
expect(stage_event_records.map(&:issue_id)).to eq([issue_1.id, issue_2.id, issue_3.id])
end
end
context 'when the issue record no longer exists' do
it 'skips non-existing issue records' do
create(:cycle_analytics_issue_stage_event, {
issue_id: 0, # non-existing id
start_event_timestamp: 5.months.ago,
end_event_timestamp: 3.months.ago
})
stage_event_count = nil
records_fetcher.serialized_records do |scope|
stage_event_count = scope.to_a.size
end
issue_count = records_fetcher.serialized_records.to_a.size
expect(stage_event_count).to eq(4)
expect(issue_count).to eq(3)
end
end
end
end
......@@ -36,8 +36,8 @@ RSpec.shared_examples 'StageEventModel' do
described_class.issuable_id_column,
:group_id,
:project_id,
:milestone_id,
:author_id,
:milestone_id,
:state_id,
:start_event_timestamp,
:end_event_timestamp
......