Commit 6869975b authored by Fabio Pitino's avatar Fabio Pitino Committed by Bob Van Landuyt

Split CI minutes resets into different workers

with a feature flag, ClearSharedRunnersMinutesWorker defines the
batches based on ID ranges and spawns a Ci::BatchResetMinutesWorker
per batch. This will parallelize the processing having each worker
taking less and avoiding running for long time.
parent af527d07
......@@ -135,11 +135,6 @@ class Namespace < ApplicationRecord
name = host.delete_suffix(gitlab_host)
Namespace.where(parent_id: nil).by_path(name)
end
# overridden in ee
def reset_ci_minutes!(namespace_id)
false
end
end
def default_branch_protection
......
......@@ -21,6 +21,9 @@ class ProjectStatistics < ApplicationRecord
scope :for_project_ids, ->(project_ids) { where(project_id: project_ids) }
scope :for_namespaces, -> (namespaces) { where(namespace: namespaces) }
scope :with_any_ci_minutes_used, -> { where.not(shared_runners_seconds: 0) }
def total_repository_size
repository_size + lfs_objects_size
end
......
......@@ -34,15 +34,12 @@ module EE
scope :include_gitlab_subscription, -> { includes(:gitlab_subscription) }
scope :join_gitlab_subscription, -> { joins("LEFT OUTER JOIN gitlab_subscriptions ON gitlab_subscriptions.namespace_id=namespaces.id") }
scope :with_shared_runners_minutes_limit, -> { where("namespaces.shared_runners_minutes_limit > 0") }
scope :with_extra_shared_runners_minutes_limit, -> { where("namespaces.extra_shared_runners_minutes_limit > 0") }
scope :with_shared_runners_minutes_exceeding_default_limit, -> do
where('namespace_statistics.namespace_id = namespaces.id')
.where('namespace_statistics.shared_runners_seconds > (namespaces.shared_runners_minutes_limit * 60)')
end
scope :with_ci_minutes_notification_sent, -> do
where('last_ci_minutes_notification_at IS NOT NULL OR last_ci_minutes_usage_notification_level IS NOT NULL')
scope :requiring_ci_extra_minutes_recalculation, -> do
joins(:namespace_statistics)
.where('namespaces.shared_runners_minutes_limit > 0')
.where('namespaces.extra_shared_runners_minutes_limit > 0')
.where('namespace_statistics.shared_runners_seconds > (namespaces.shared_runners_minutes_limit * 60)')
end
scope :with_feature_available_in_plan, -> (feature) do
......@@ -77,72 +74,9 @@ module EE
class_methods do
extend ::Gitlab::Utils::Override
NamespaceStatisticsNotResetError = Class.new(StandardError)
def plans_with_feature(feature)
LICENSE_PLANS_TO_NAMESPACE_PLANS.values_at(*License.plans_with_feature(feature))
end
def reset_ci_minutes_in_batches!
each_batch do |namespaces|
reset_ci_minutes!(namespaces)
end
end
def reset_ci_minutes_for_batch!(from_id, to_id, batch_size: 1000)
where(id: from_id..to_id).each_batch(of: batch_size) do |namespaces|
reset_ci_minutes!(namespaces)
end
end
# ensure that recalculation of extra shared runners minutes occurs in the same
# transaction as the reset of the namespace statistics. If the transaction fails
# none of the changes apply but the numbers still remain consistent with each other.
override :reset_ci_minutes!
def reset_ci_minutes!(namespaces)
transaction do
recalculate_extra_shared_runners_minutes_limits!(namespaces)
reset_shared_runners_seconds!(namespaces)
reset_ci_minutes_notifications!(namespaces)
end
true
rescue ActiveRecord::ActiveRecordError
# We don't need to print thousands of namespace_ids
# in the message if all batches failed.
# A small batch would be sufficient for investigation.
failed_namespace_ids = namespaces.first(10).pluck(:id)
raise EE::Namespace::NamespaceStatisticsNotResetError,
"#{namespaces.size} namespace shared runner minutes were not reset and the transaction was rolled back. Namespace Ids: #{failed_namespace_ids}"
end
def extra_minutes_left_sql
"GREATEST((namespaces.shared_runners_minutes_limit + namespaces.extra_shared_runners_minutes_limit) - ROUND(namespace_statistics.shared_runners_seconds / 60.0), 0)"
end
def recalculate_extra_shared_runners_minutes_limits!(namespaces)
namespaces
.with_shared_runners_minutes_limit
.with_extra_shared_runners_minutes_limit
.with_shared_runners_minutes_exceeding_default_limit
.update_all("extra_shared_runners_minutes_limit = #{extra_minutes_left_sql} FROM namespace_statistics")
end
def reset_shared_runners_seconds!(namespaces)
NamespaceStatistics
.where(namespace: namespaces)
.where.not(shared_runners_seconds: 0)
.update_all(shared_runners_seconds: 0, shared_runners_seconds_last_reset: Time.current)
::ProjectStatistics
.where(namespace: namespaces)
.where.not(shared_runners_seconds: 0)
.update_all(shared_runners_seconds: 0, shared_runners_seconds_last_reset: Time.current)
end
def reset_ci_minutes_notifications!(namespaces)
namespaces.update_all(last_ci_minutes_notification_at: nil, last_ci_minutes_usage_notification_level: nil)
end
end
override :move_dir
......
......@@ -5,6 +5,9 @@ class NamespaceStatistics < ApplicationRecord
validates :namespace, presence: true
scope :for_namespaces, -> (namespaces) { where(namespace: namespaces) }
scope :with_any_ci_minutes_used, -> { where.not(shared_runners_seconds: 0) }
def shared_runners_minutes(include_extra: true)
minutes = shared_runners_seconds.to_i / 60
......
# frozen_string_literal: true
module Ci
module Minutes
class BatchResetService
BatchNotResetError = Class.new(StandardError)
BATCH_SIZE = 1000.freeze
def execute!(ids_range: nil, batch_size: BATCH_SIZE)
relation = Namespace
relation = relation.id_in(ids_range) if ids_range
relation.each_batch(of: batch_size) do |namespaces|
reset_ci_minutes!(namespaces)
end
end
private
# ensure that recalculation of extra shared runners minutes occurs in the same
# transaction as the reset of the namespace statistics. If the transaction fails
# none of the changes apply but the numbers still remain consistent with each other.
def reset_ci_minutes!(namespaces)
Namespace.transaction do
recalculate_extra_shared_runners_minutes_limits!(namespaces)
reset_shared_runners_seconds!(namespaces)
reset_ci_minutes_notifications!(namespaces)
end
rescue ActiveRecord::ActiveRecordError
# We don't need to print a thousand of namespace_ids
# in the message if all batches failed.
# A small batch would be sufficient for investigation.
failed_namespace_ids = namespaces.limit(10).ids # rubocop: disable CodeReuse/ActiveRecord
raise BatchNotResetError.new(
"#{namespaces.size} namespace shared runner minutes were not reset and the transaction was rolled back. Namespace Ids: #{failed_namespace_ids}")
end
def recalculate_extra_shared_runners_minutes_limits!(namespaces)
namespaces
.requiring_ci_extra_minutes_recalculation
.update_all("extra_shared_runners_minutes_limit = #{extra_minutes_left_sql} FROM namespace_statistics")
end
def extra_minutes_left_sql
"GREATEST((namespaces.shared_runners_minutes_limit + namespaces.extra_shared_runners_minutes_limit) - ROUND(namespace_statistics.shared_runners_seconds / 60.0), 0)"
end
def reset_shared_runners_seconds!(namespaces)
NamespaceStatistics
.for_namespaces(namespaces)
.with_any_ci_minutes_used
.update_all(shared_runners_seconds: 0, shared_runners_seconds_last_reset: Time.current)
::ProjectStatistics
.for_namespaces(namespaces)
.with_any_ci_minutes_used
.update_all(shared_runners_seconds: 0, shared_runners_seconds_last_reset: Time.current)
end
def reset_ci_minutes_notifications!(namespaces)
namespaces.update_all(
last_ci_minutes_notification_at: nil,
last_ci_minutes_usage_notification_level: nil)
end
end
end
end
......@@ -10,7 +10,7 @@ module Ci
def perform(from_id, to_id)
return unless Feature.enabled?(:ci_parallel_minutes_reset, default_enabled: true)
Namespace.reset_ci_minutes_for_batch!(from_id, to_id)
::Ci::Minutes::BatchResetService.new.execute!(ids_range: (from_id..to_id))
end
end
end
......@@ -24,7 +24,7 @@ class ClearSharedRunnersMinutesWorker # rubocop:disable Scalability/IdempotentWo
else
return unless try_obtain_lease
Namespace.reset_ci_minutes_in_batches!
Ci::Minutes::BatchResetService.new.execute!
end
end
......
......@@ -49,129 +49,6 @@ describe Namespace do
end
end
describe '.reset_ci_minutes_for_batch!' do
let(:batch_size) { 3 }
let!(:namespace_1) { create(:namespace, id: 1) }
let!(:namespace_2) { create(:namespace, id: 2) }
let!(:namespace_3) { create(:namespace, id: 3) }
let!(:namespace_4) { create(:namespace, id: 4) }
let!(:namespace_5) { create(:namespace, id: 5) }
let!(:namespace_6) { create(:namespace, id: 6) }
let!(:namespace_7) { create(:namespace, id: 7) }
it 'resets all ci minutes' do
expect(described_class).to receive(:reset_ci_minutes!).with([namespace_1, namespace_2, namespace_3])
expect(described_class).to receive(:reset_ci_minutes!).with([namespace_4, namespace_5, namespace_6])
expect(described_class).to receive(:reset_ci_minutes!).with([namespace_7])
described_class.reset_ci_minutes_for_batch!(1, 7, batch_size: batch_size)
end
it 'resets ci minutes for the ID range' do
expect(described_class).to receive(:reset_ci_minutes!).with([namespace_2, namespace_3, namespace_4])
expect(described_class).to receive(:reset_ci_minutes!).with([namespace_5, namespace_6])
described_class.reset_ci_minutes_for_batch!(2, 6, batch_size: batch_size)
end
end
describe '.reset_ci_minutes_in_batches!' do
it 'returns when there were no failures' do
expect { described_class.reset_ci_minutes_in_batches! }.not_to raise_error
end
it 'raises an exception when with a list of namespace ids to investigate if there were any failures' do
failed_namespace = create(:namespace)
allow(described_class).to receive(:transaction).and_raise(ActiveRecord::ActiveRecordError)
expect { described_class.reset_ci_minutes_in_batches! }.to raise_error(
EE::Namespace::NamespaceStatisticsNotResetError,
"1 namespace shared runner minutes were not reset and the transaction was rolled back. Namespace Ids: [#{failed_namespace.id}]")
end
end
describe '.reset_ci_minutes!' do
it 'returns true if there were no exceptions to the db transaction' do
result = described_class.reset_ci_minutes!(Namespace.none)
expect(result).to be true
end
it 'raises an exception if anything in the transaction rolled back' do
namespace = create(:namespace)
allow(described_class).to receive(:transaction).and_raise(ActiveRecord::ActiveRecordError)
expect { described_class.reset_ci_minutes!([namespace]) }.to raise_error(
EE::Namespace::NamespaceStatisticsNotResetError,
"1 namespace shared runner minutes were not reset and the transaction was rolled back. Namespace Ids: [#{namespace.id}]")
end
end
describe '.recalculate_extra_shared_runners_minutes_limits!' do
context 'when the namespace had used runner minutes for the month' do
let(:namespace) { create(:namespace, shared_runners_minutes_limit: 5000, extra_shared_runners_minutes_limit: 5000) }
let(:namespaces) { Namespace.where(id: namespace) }
it 'updates the namespace extra_shared_runners_minutes_limit subtracting used minutes above the shared_runners_minutes_limit' do
minutes_used = 6000
create(:namespace_statistics, namespace: namespace, shared_runners_seconds: minutes_used * 60)
described_class.recalculate_extra_shared_runners_minutes_limits!(namespaces)
expect(namespace.reload.extra_shared_runners_minutes_limit).to eq(4000)
end
end
end
describe '.reset_shared_runners_seconds!' do
let(:namespace) do
create(:namespace,
shared_runners_minutes_limit: 5000,
extra_shared_runners_minutes_limit: 5000)
end
subject do
described_class.reset_shared_runners_seconds!(Namespace.where(id: namespace))
end
it 'resets NamespaceStatistics shared_runners_seconds and updates the timestamp' do
namespace_statistics = create(:namespace_statistics,
namespace: namespace,
shared_runners_seconds: 360000 )
expect { subject && namespace_statistics.reload }
.to change { namespace_statistics.shared_runners_seconds }.to(0)
.and change { namespace_statistics.shared_runners_seconds_last_reset }
end
it 'resets ProjectStatistics shared_runners_seconds and updates the timestamp' do
project_statistics = create(:project_statistics,
namespace: namespace,
shared_runners_seconds: 120)
expect { subject && project_statistics.reload }
.to change { project_statistics.shared_runners_seconds }.to(0)
.and change { project_statistics.shared_runners_seconds_last_reset }
end
end
describe 'reset_ci_minutes_notifications!' do
it 'updates the last_ci_minutes_notification_at and last_ci_minutes_usage_notification_level flags' do
namespace = create(:namespace,
last_ci_minutes_notification_at: Date.yesterday,
last_ci_minutes_usage_notification_level: 50 )
subject = described_class.reset_ci_minutes_notifications!(Namespace.where(id: namespace))
expect { subject && namespace.reload }
.to change { namespace.last_ci_minutes_notification_at }.to(nil)
.and change { namespace.last_ci_minutes_usage_notification_level }.to(nil)
end
end
describe '#use_elasticsearch?' do
let(:namespace) { create :namespace }
......
# frozen_string_literal: true
require 'spec_helper'
describe Ci::Minutes::BatchResetService do
let(:service) { described_class.new }
describe '#execute!' do
def create_namespace_with_project(id, seconds_used)
namespace = create(:namespace,
id: id,
shared_runners_minutes_limit: 100,
extra_shared_runners_minutes_limit: 50,
last_ci_minutes_notification_at: Time.now,
last_ci_minutes_usage_notification_level: 30)
create(:namespace_statistics, namespace: namespace, shared_runners_seconds: seconds_used)
create(:project, namespace: namespace).tap do |project|
create(:project_statistics,
project: project,
namespace: namespace,
shared_runners_seconds: seconds_used)
end
namespace
end
subject { service.execute!(ids_range: ids_range, batch_size: 3) }
let!(:namespace_1) { create_namespace_with_project(1, 120.minutes) }
let!(:namespace_2) { create_namespace_with_project(2, 120.minutes) }
let!(:namespace_3) { create_namespace_with_project(3, 120.minutes) }
let!(:namespace_4) { create_namespace_with_project(4, 90.minutes) }
let!(:namespace_5) { create_namespace_with_project(5, 90.minutes) }
let!(:namespace_6) { create_namespace_with_project(6, 90.minutes) }
context 'when ID range is provided' do
let(:ids_range) { (1..5) }
let(:namespaces_exceeding_minutes) { [namespace_1, namespace_2, namespace_3] }
let(:namespaces_not_exceeding_minutes) { [namespace_4, namespace_5] }
it 'resets minutes in batches for the given range' do
expect(service).to receive(:reset_ci_minutes!).with([namespace_1, namespace_2, namespace_3])
expect(service).to receive(:reset_ci_minutes!).with([namespace_4, namespace_5])
subject
end
it 'resets CI minutes and recalculates purchased minutes for the namespace exceeding the monthly minutes' do
subject
namespaces_exceeding_minutes.each do |namespace|
namespace.reset
expect(namespace.extra_shared_runners_minutes_limit).to eq 30
expect(namespace.namespace_statistics.shared_runners_seconds).to eq 0
expect(namespace.namespace_statistics.shared_runners_seconds_last_reset).to be_present
expect(ProjectStatistics.find_by(namespace: namespace).shared_runners_seconds).to eq 0
expect(ProjectStatistics.find_by(namespace: namespace).shared_runners_seconds_last_reset).to be_present
expect(namespace.last_ci_minutes_notification_at).to be_nil
expect(namespace.last_ci_minutes_usage_notification_level).to be_nil
end
end
it 'resets CI minutes but does not recalculate purchased minutes for the namespace not exceeding the monthly minutes' do
subject
namespaces_not_exceeding_minutes.each do |namespace|
namespace.reset
expect(namespace.extra_shared_runners_minutes_limit).to eq 50
expect(namespace.namespace_statistics.shared_runners_seconds).to eq 0
expect(namespace.namespace_statistics.shared_runners_seconds_last_reset).to be_present
expect(ProjectStatistics.find_by(namespace: namespace).shared_runners_seconds).to eq 0
expect(ProjectStatistics.find_by(namespace: namespace).shared_runners_seconds_last_reset).to be_present
expect(namespace.last_ci_minutes_notification_at).to be_nil
expect(namespace.last_ci_minutes_usage_notification_level).to be_nil
end
end
end
context 'when ID range is not provided' do
let(:ids_range) { nil }
it 'resets minutes in batches for all namespaces' do
expect(service).to receive(:reset_ci_minutes!).with([namespace_1, namespace_2, namespace_3])
expect(service).to receive(:reset_ci_minutes!).with([namespace_4, namespace_5, namespace_6])
subject
end
end
context 'when an ActiveRecordError is raised' do
let(:ids_range) { nil }
before do
allow(Namespace).to receive(:transaction).and_raise(ActiveRecord::ActiveRecordError)
end
it 'decorates the error with more information' do
expect { subject }
.to raise_error(
Ci::Minutes::BatchResetService::BatchNotResetError,
'3 namespace shared runner minutes were not reset and the transaction was rolled back. Namespace Ids: [1, 2, 3]')
end
end
end
end
......@@ -32,35 +32,50 @@ describe Ci::BatchResetMinutesWorker do
create(:namespace_statistics, namespace: last_namespace, shared_runners_seconds: 90.minutes)
end
include_examples 'an idempotent worker' do
let(:job_args) { [first_namespace.id, last_namespace.id] }
it 'delegates to Ci::Minutes::BatchResetService' do
expect_next_instance_of(Ci::Minutes::BatchResetService) do |service|
expect(service)
.to receive(:execute!)
.with(ids_range: ((first_namespace.id)..(last_namespace.id)))
end
it 'delegates to Namespace method' do
expect(Namespace).to receive(:reset_ci_minutes!).with([first_namespace, last_namespace]).twice
worker.perform(first_namespace.id, last_namespace.id)
end
it_behaves_like 'an idempotent worker' do
let(:job_args) { [first_namespace.id, last_namespace.id] }
shared_examples 'resets CI minutes and notification' do
it 'resets CI minutes used and notification data' do
subject
namespace.reset
expect(namespace.namespace_statistics.shared_runners_seconds).to eq 0
expect(namespace.last_ci_minutes_notification_at).to be_nil
expect(namespace.last_ci_minutes_usage_notification_level).to be_nil
end
end
it_behaves_like 'resets CI minutes and notification' do
let(:namespace) { first_namespace }
end
it_behaves_like 'resets CI minutes and notification' do
let(:namespace) { last_namespace }
end
it 'resets CI minutes and recalculates purchased minutes for the namespace exceeding the monthly minutes' do
it 'recalculates purchased minutes for the namespace exceeding the monthly minutes' do
subject
first_namespace.reset
first_namespace_statistics.reset
expect(first_namespace.extra_shared_runners_minutes_limit).to eq 30
expect(first_namespace_statistics.shared_runners_seconds).to eq 0
expect(first_namespace.last_ci_minutes_notification_at).to be_nil
expect(first_namespace.last_ci_minutes_usage_notification_level).to be_nil
expect(first_namespace.reset.extra_shared_runners_minutes_limit).to eq 30
end
it 'resets CI minutes but does not recalculate purchased minutes for the namespace not exceeding the monthly minutes' do
it 'does not recalculate purchased minutes for the namespace not exceeding the monthly minutes' do
subject
last_namespace.reset
last_namespace_statistics.reset
expect(last_namespace.extra_shared_runners_minutes_limit).to eq 50
expect(last_namespace_statistics.shared_runners_seconds).to eq 0
expect(last_namespace.last_ci_minutes_notification_at).to be_nil
expect(last_namespace.last_ci_minutes_usage_notification_level).to be_nil
expect(last_namespace.reset.extra_shared_runners_minutes_limit).to eq 50
end
end
......@@ -69,8 +84,8 @@ describe Ci::BatchResetMinutesWorker do
stub_feature_flags(ci_parallel_minutes_reset: false)
end
it 'does not delegate to Namespace method' do
expect(Namespace).not_to receive(:reset_ci_minutes!)
it 'does not call Ci::Minutes::BatchResetService' do
expect(Ci::Minutes::BatchResetService).not_to receive(:new)
worker.perform(first_namespace.id, last_namespace.id)
end
......
......@@ -49,7 +49,7 @@ describe ClearSharedRunnersMinutesWorker do
it 'raises an exception' do
expect { worker.perform }.to raise_error(
EE::Namespace::NamespaceStatisticsNotResetError,
Ci::Minutes::BatchResetService::BatchNotResetError,
"#{namespace_ids.count} namespace shared runner minutes were not reset and the transaction was rolled back. Namespace Ids: #{namespace_ids}")
end
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment