Commit 19fecb63 authored by Adam Hegyi

Add aggregated VSA consistency check service

This MR adds a new model to keep track of the Value Stream Analytics
data aggregation progress.

Changelog: added
parent d2f1a542
# frozen_string_literal: true
# Keeps track of the Value Stream Analytics data aggregation progress.
# Each record belongs to a group; records are created for the group's
# root ancestor (see .safe_create_for_group).
class Analytics::CycleAnalytics::Aggregation < ApplicationRecord
  belongs_to :group, optional: false

  # The array columns default to an empty array in the database, so an empty
  # value is a legitimate state. A `presence` validation would reject it
  # (`[].blank?` is true) and make every freshly built record invalid, hence
  # only the length is validated and blank values are let through. The limit
  # of 10 mirrors the CARDINALITY check constraints on the table; nil values
  # are still refused by the NOT NULL constraints.
  validates :incremental_runtimes_in_seconds,
            :incremental_processed_records,
            :last_full_run_runtimes_in_seconds,
            :last_full_run_processed_records,
            length: { maximum: 10 },
            allow_blank: true

  # Makes sure an aggregation record exists for the root ancestor of +group+.
  #
  # The existence check avoids issuing a write in the common case; the insert
  # itself is passed `unique_by: :group_id`, so a concurrent caller inserting
  # the same group between the check and the insert does not raise.
  #
  # @param group [Group] any group in the hierarchy (top-level or subgroup)
  # @return [nil] when the record already exists, otherwise the result of `insert`
  def self.safe_create_for_group(group)
    top_level_group = group.root_ancestor
    return if exists?(group_id: top_level_group.id)

    insert({ group_id: top_level_group.id }, unique_by: :group_id)
  end
end
# frozen_string_literal: true
# Creates the analytics_cycle_analytics_aggregations table, keyed by group_id.
class CreateAnalyticsCycleAnalyticsAggregations < Gitlab::Database::Migration[1.0]
  # Integer array columns; each one is capped at 10 elements by a check constraint.
  ARRAY_COLUMNS = %i[
    incremental_runtimes_in_seconds
    incremental_processed_records
    last_full_run_runtimes_in_seconds
    last_full_run_processed_records
  ].freeze

  # Nullable integer id columns (no foreign keys are declared for them here).
  ID_COLUMNS = %i[
    last_incremental_issues_id
    last_incremental_merge_requests_id
    last_full_run_issues_id
    last_full_run_merge_requests_id
  ].freeze

  # Nullable timestamp-with-time-zone columns, in table order.
  TIMESTAMP_COLUMNS = %i[
    last_incremental_run_at
    last_incremental_issues_updated_at
    last_incremental_merge_requests_updated_at
    last_full_run_at
    last_full_run_issues_updated_at
    last_full_run_mrs_updated_at
    last_consistency_check_updated_at
  ].freeze

  # Columns that get a partial index (enabled rows only), NULLs sorted first.
  INDEXED_COLUMNS = %i[
    last_incremental_run_at
    last_full_run_at
    last_consistency_check_updated_at
  ].freeze

  enable_lock_retries!

  def up
    create_table :analytics_cycle_analytics_aggregations, id: false do |t|
      t.references :group, index: false, null: false, foreign_key: { to_table: :namespaces, on_delete: :cascade }

      ARRAY_COLUMNS.each do |column|
        t.integer column, array: true, default: [], null: false
      end

      ID_COLUMNS.each { |column| t.integer column }
      TIMESTAMP_COLUMNS.each { |column| t.datetime_with_timezone column }

      t.boolean :enabled, default: true, null: false

      INDEXED_COLUMNS.each do |column|
        t.index column,
          where: 'enabled IS TRUE',
          name: "ca_aggregations_#{column}",
          order: { column => 'ASC NULLS FIRST' }
      end

      ARRAY_COLUMNS.each do |column|
        t.check_constraint "CARDINALITY(#{column}) <= 10"
      end
    end

    # The table is created without an id column; group_id becomes the primary key.
    execute("ALTER TABLE analytics_cycle_analytics_aggregations ADD PRIMARY KEY (group_id)")
  end

  def down
    drop_table :analytics_cycle_analytics_aggregations
  end
end
# frozen_string_literal: true
# Backfills analytics_cycle_analytics_aggregations with one row per root
# namespace that has at least one group value stream somewhere in its hierarchy.
class BackfillCycleAnalyticsAggregations < Gitlab::Database::Migration[1.0]
  # Number of value stream rows handled per INSERT statement.
  BATCH_SIZE = 50

  def up
    model = define_batchable_model('analytics_cycle_analytics_group_value_streams')

    model.each_batch(of: BATCH_SIZE) do |relation|
      # For every value stream in the batch, the LATERAL recursive CTE walks
      # parent_id upwards from the value stream's group until it finds the
      # namespace without a parent, and that id is inserted.
      #
      # The target column is named explicitly so the statement does not depend
      # on group_id being the first column of the table. ON CONFLICT DO NOTHING
      # skips root namespaces that already have a row (several value streams
      # can resolve to the same root).
      execute <<~SQL
        WITH records_to_be_inserted AS #{Gitlab::Database::AsWithMaterialized.materialized_if_supported} (
          SELECT root_ancestor.id AS group_id
          FROM (#{relation.select(:group_id).to_sql}) as value_streams,
          LATERAL (
            WITH RECURSIVE "base_and_ancestors" AS (
              (SELECT "namespaces"."id", "namespaces"."parent_id" FROM "namespaces" WHERE "namespaces"."id" = value_streams.group_id)
              UNION
              (SELECT "namespaces"."id", "namespaces"."parent_id" FROM "namespaces", "base_and_ancestors" WHERE "namespaces"."id" = "base_and_ancestors"."parent_id")
            )
            SELECT "namespaces"."id" FROM "base_and_ancestors" as "namespaces" WHERE parent_id IS NULL LIMIT 1
          ) as root_ancestor
        )
        INSERT INTO "analytics_cycle_analytics_aggregations" ("group_id")
        SELECT "group_id" FROM "records_to_be_inserted"
        ON CONFLICT DO NOTHING
      SQL
    end
  end

  def down
    # no-op
  end
end
e147a8281f98ee397d7d9b652ce21b943e4e87c11fca906b72db839e0e2fa360
\ No newline at end of file
c87ca83f592c6688c31182fcd4cf6fe282c00a3c92ebe245b66455f57b50fc32
\ No newline at end of file
...@@ -10036,6 +10036,30 @@ CREATE SEQUENCE allowed_email_domains_id_seq ...@@ -10036,6 +10036,30 @@ CREATE SEQUENCE allowed_email_domains_id_seq
ALTER SEQUENCE allowed_email_domains_id_seq OWNED BY allowed_email_domains.id; ALTER SEQUENCE allowed_email_domains_id_seq OWNED BY allowed_email_domains.id;
-- Value Stream Analytics aggregation progress, keyed by group_id.
-- The four integer[] columns default to an empty array and are each limited
-- to at most 10 elements by the CHECK constraints below.
CREATE TABLE analytics_cycle_analytics_aggregations (
group_id bigint NOT NULL,
incremental_runtimes_in_seconds integer[] DEFAULT '{}'::integer[] NOT NULL,
incremental_processed_records integer[] DEFAULT '{}'::integer[] NOT NULL,
last_full_run_runtimes_in_seconds integer[] DEFAULT '{}'::integer[] NOT NULL,
last_full_run_processed_records integer[] DEFAULT '{}'::integer[] NOT NULL,
last_incremental_issues_id integer,
last_incremental_merge_requests_id integer,
last_full_run_issues_id integer,
last_full_run_merge_requests_id integer,
last_incremental_run_at timestamp with time zone,
last_incremental_issues_updated_at timestamp with time zone,
last_incremental_merge_requests_updated_at timestamp with time zone,
last_full_run_at timestamp with time zone,
last_full_run_issues_updated_at timestamp with time zone,
last_full_run_mrs_updated_at timestamp with time zone,
last_consistency_check_updated_at timestamp with time zone,
enabled boolean DEFAULT true NOT NULL,
CONSTRAINT chk_rails_1ef688e577 CHECK ((cardinality(incremental_runtimes_in_seconds) <= 10)),
CONSTRAINT chk_rails_7810292ec9 CHECK ((cardinality(last_full_run_processed_records) <= 10)),
CONSTRAINT chk_rails_8b9e89687c CHECK ((cardinality(last_full_run_runtimes_in_seconds) <= 10)),
CONSTRAINT chk_rails_e16bf3913a CHECK ((cardinality(incremental_processed_records) <= 10))
);
CREATE TABLE analytics_cycle_analytics_group_stages ( CREATE TABLE analytics_cycle_analytics_group_stages (
id bigint NOT NULL, id bigint NOT NULL,
created_at timestamp with time zone NOT NULL, created_at timestamp with time zone NOT NULL,
...@@ -22928,6 +22952,9 @@ ALTER TABLE ONLY alert_management_http_integrations ...@@ -22928,6 +22952,9 @@ ALTER TABLE ONLY alert_management_http_integrations
ALTER TABLE ONLY allowed_email_domains ALTER TABLE ONLY allowed_email_domains
ADD CONSTRAINT allowed_email_domains_pkey PRIMARY KEY (id); ADD CONSTRAINT allowed_email_domains_pkey PRIMARY KEY (id);
ALTER TABLE ONLY analytics_cycle_analytics_aggregations
ADD CONSTRAINT analytics_cycle_analytics_aggregations_pkey PRIMARY KEY (group_id);
ALTER TABLE ONLY analytics_cycle_analytics_group_stages ALTER TABLE ONLY analytics_cycle_analytics_group_stages
ADD CONSTRAINT analytics_cycle_analytics_group_stages_pkey PRIMARY KEY (id); ADD CONSTRAINT analytics_cycle_analytics_group_stages_pkey PRIMARY KEY (id);
...@@ -25243,6 +25270,12 @@ CREATE INDEX approval_mr_rule_index_merge_request_id ON approval_merge_request_r ...@@ -25243,6 +25270,12 @@ CREATE INDEX approval_mr_rule_index_merge_request_id ON approval_merge_request_r
CREATE UNIQUE INDEX bulk_import_trackers_uniq_relation_by_entity ON bulk_import_trackers USING btree (bulk_import_entity_id, relation); CREATE UNIQUE INDEX bulk_import_trackers_uniq_relation_by_entity ON bulk_import_trackers USING btree (bulk_import_entity_id, relation);
CREATE INDEX ca_aggregations_last_consistency_check_updated_at ON analytics_cycle_analytics_aggregations USING btree (last_consistency_check_updated_at NULLS FIRST) WHERE (enabled IS TRUE);
CREATE INDEX ca_aggregations_last_full_run_at ON analytics_cycle_analytics_aggregations USING btree (last_full_run_at NULLS FIRST) WHERE (enabled IS TRUE);
CREATE INDEX ca_aggregations_last_incremental_run_at ON analytics_cycle_analytics_aggregations USING btree (last_incremental_run_at NULLS FIRST) WHERE (enabled IS TRUE);
CREATE INDEX cadence_create_iterations_automation ON iterations_cadences USING btree (automatic, duration_in_weeks, date((COALESCE(last_run_date, '1970-01-01'::date) + ((duration_in_weeks)::double precision * '7 days'::interval)))) WHERE (duration_in_weeks IS NOT NULL); CREATE INDEX cadence_create_iterations_automation ON iterations_cadences USING btree (automatic, duration_in_weeks, date((COALESCE(last_run_date, '1970-01-01'::date) + ((duration_in_weeks)::double precision * '7 days'::interval)))) WHERE (duration_in_weeks IS NOT NULL);
CREATE INDEX ci_builds_gitlab_monitor_metrics ON ci_builds USING btree (status, created_at, project_id) WHERE ((type)::text = 'Ci::Build'::text); CREATE INDEX ci_builds_gitlab_monitor_metrics ON ci_builds USING btree (status, created_at, project_id) WHERE ((type)::text = 'Ci::Build'::text);
...@@ -30367,6 +30400,9 @@ ALTER TABLE ONLY bulk_imports ...@@ -30367,6 +30400,9 @@ ALTER TABLE ONLY bulk_imports
ALTER TABLE ONLY diff_note_positions ALTER TABLE ONLY diff_note_positions
ADD CONSTRAINT fk_rails_13c7212859 FOREIGN KEY (note_id) REFERENCES notes(id) ON DELETE CASCADE; ADD CONSTRAINT fk_rails_13c7212859 FOREIGN KEY (note_id) REFERENCES notes(id) ON DELETE CASCADE;
ALTER TABLE ONLY analytics_cycle_analytics_aggregations
ADD CONSTRAINT fk_rails_13c8374c7a FOREIGN KEY (group_id) REFERENCES namespaces(id) ON DELETE CASCADE;
ALTER TABLE ONLY users_security_dashboard_projects ALTER TABLE ONLY users_security_dashboard_projects
ADD CONSTRAINT fk_rails_150cd5682c FOREIGN KEY (project_id) REFERENCES projects(id) ON DELETE CASCADE; ADD CONSTRAINT fk_rails_150cd5682c FOREIGN KEY (project_id) REFERENCES projects(id) ON DELETE CASCADE;
...@@ -12,6 +12,8 @@ class Analytics::CycleAnalytics::GroupValueStream < ApplicationRecord ...@@ -12,6 +12,8 @@ class Analytics::CycleAnalytics::GroupValueStream < ApplicationRecord
scope :preload_associated_models, -> { includes(:group, stages: [:group, :end_event_label, :start_event_label]) } scope :preload_associated_models, -> { includes(:group, stages: [:group, :end_event_label, :start_event_label]) }
after_save :ensure_aggregation_record_presence
def custom? def custom?
persisted? || name != Analytics::CycleAnalytics::Stages::BaseService::DEFAULT_VALUE_STREAM_NAME persisted? || name != Analytics::CycleAnalytics::Stages::BaseService::DEFAULT_VALUE_STREAM_NAME
end end
...@@ -19,4 +21,10 @@ class Analytics::CycleAnalytics::GroupValueStream < ApplicationRecord ...@@ -19,4 +21,10 @@ class Analytics::CycleAnalytics::GroupValueStream < ApplicationRecord
def self.build_default_value_stream(group) def self.build_default_value_stream(group)
new(name: Analytics::CycleAnalytics::Stages::BaseService::DEFAULT_VALUE_STREAM_NAME, group: group) new(name: Analytics::CycleAnalytics::Stages::BaseService::DEFAULT_VALUE_STREAM_NAME, group: group)
end end
private
# after_save callback: makes sure an aggregation record exists for this value
# stream's group. Aggregation.safe_create_for_group resolves the group's root
# ancestor itself and returns early when the record is already there.
def ensure_aggregation_record_presence
Analytics::CycleAnalytics::Aggregation.safe_create_for_group(group)
end
end end
...@@ -8,6 +8,7 @@ alert_management_alert_metric_images: :gitlab_main ...@@ -8,6 +8,7 @@ alert_management_alert_metric_images: :gitlab_main
alert_management_alert_user_mentions: :gitlab_main alert_management_alert_user_mentions: :gitlab_main
alert_management_http_integrations: :gitlab_main alert_management_http_integrations: :gitlab_main
allowed_email_domains: :gitlab_main allowed_email_domains: :gitlab_main
analytics_cycle_analytics_aggregations: :gitlab_main
analytics_cycle_analytics_group_stages: :gitlab_main analytics_cycle_analytics_group_stages: :gitlab_main
analytics_cycle_analytics_group_value_streams: :gitlab_main analytics_cycle_analytics_group_value_streams: :gitlab_main
analytics_cycle_analytics_issue_stage_events: :gitlab_main analytics_cycle_analytics_issue_stage_events: :gitlab_main
......
...@@ -18,6 +18,7 @@ RSpec.describe 'Database schema' do ...@@ -18,6 +18,7 @@ RSpec.describe 'Database schema' do
approvals: %w[user_id], approvals: %w[user_id],
approver_groups: %w[target_id], approver_groups: %w[target_id],
approvers: %w[target_id user_id], approvers: %w[target_id user_id],
analytics_cycle_analytics_aggregations: %w[last_full_run_issues_id last_full_run_merge_requests_id last_incremental_issues_id last_incremental_merge_requests_id],
analytics_cycle_analytics_merge_request_stage_events: %w[author_id group_id merge_request_id milestone_id project_id stage_event_hash_id state_id], analytics_cycle_analytics_merge_request_stage_events: %w[author_id group_id merge_request_id milestone_id project_id stage_event_hash_id state_id],
analytics_cycle_analytics_issue_stage_events: %w[author_id group_id issue_id milestone_id project_id stage_event_hash_id state_id], analytics_cycle_analytics_issue_stage_events: %w[author_id group_id issue_id milestone_id project_id stage_event_hash_id state_id],
audit_events: %w[author_id entity_id target_id], audit_events: %w[author_id entity_id target_id],
......
# frozen_string_literal: true
require 'spec_helper'
require_migration!
RSpec.describe BackfillCycleAnalyticsAggregations, :migration do
  let(:migration) { described_class.new }
  let(:aggregations) { table(:analytics_cycle_analytics_aggregations) }
  let(:namespaces) { table(:namespaces) }
  let(:group_value_streams) { table(:analytics_cycle_analytics_group_value_streams) }

  context 'when there are value stream records' do
    it 'inserts a record for each top-level namespace' do
      first_root = namespaces.create!(path: 'aaa', name: 'aaa')
      nested = namespaces.create!(path: 'bbb', name: 'bbb', parent_id: first_root.id)
      second_root = namespaces.create!(path: 'ccc', name: 'ccc')
      # A namespace without any value stream must not get an aggregation row.
      namespaces.create!(path: 'ddd', name: 'ddd')

      # Two value streams per namespace, so several rows resolve to the same root.
      [
        ['for top level group', second_root],
        ['another for top level group', second_root],
        ['for subgroup', nested],
        ['another for subgroup', nested]
      ].each do |stream_name, namespace|
        group_value_streams.create!(name: stream_name, group_id: namespace.id)
      end

      migrate!

      expect(aggregations.pluck(:group_id)).to contain_exactly(first_root.id, second_root.id)
    end
  end

  # No value stream rows exist here, so nothing should be inserted.
  it 'does nothing' do
    expect { migrate! }.not_to change(aggregations, :count)
  end
end
# frozen_string_literal: true
require 'spec_helper'
# Model spec for Analytics::CycleAnalytics::Aggregation: association,
# array-length validations and the .safe_create_for_group class method.
RSpec.describe Analytics::CycleAnalytics::Aggregation, type: :model do
  describe 'associations' do
    it { is_expected.to belong_to(:group).required }
  end

  describe 'validations' do
    it { is_expected.not_to validate_presence_of(:group) }
    it { is_expected.not_to validate_presence_of(:enabled) }

    # Each statistics array may hold at most 10 entries; 11 must be rejected.
    %i[incremental_runtimes_in_seconds incremental_processed_records last_full_run_runtimes_in_seconds last_full_run_processed_records].each do |column|
      it "validates the array length of #{column}" do
        record = described_class.new(column => [1] * 11)

        expect(record).to be_invalid
        expect(record.errors).to have_key(column)
      end
    end
  end

  # Class method, hence the `.` prefix rather than `#`.
  describe '.safe_create_for_group' do
    let_it_be(:group) { create(:group) }
    let_it_be(:subgroup) { create(:group, parent: group) }

    it 'creates the aggregation record' do
      described_class.safe_create_for_group(group)

      record = described_class.find_by(group_id: group)
      expect(record).to be_present
    end

    context 'when non top-level group is given' do
      it 'creates the aggregation record for the top-level group' do
        described_class.safe_create_for_group(subgroup)

        record = described_class.find_by(group_id: group)
        expect(record).to be_present
      end
    end

    context 'when the record is already present' do
      it 'does nothing' do
        described_class.safe_create_for_group(group)

        # Both calls resolve to the same top-level group, which already has a record.
        expect do
          described_class.safe_create_for_group(group)
          described_class.safe_create_for_group(subgroup)
        end.not_to change { described_class.count }
      end
    end
  end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment