Commit 4916c1dd authored by Fabio Pitino's avatar Fabio Pitino Committed by Mayra Cabrera

Split CI minutes resets into different workers

with a feature flag, ClearSharedRunnersMinutesWorker defines the
batches based on ID ranges and spawns a Ci::BatchResetMinutesWorker
per batch. This will parallelize the processing having each worker
taking less and avoiding running for long time.
parent 0a0077c3
...@@ -44,6 +44,8 @@ ...@@ -44,6 +44,8 @@
- 2 - 2
- - chat_notification - - chat_notification
- 2 - 2
- - ci_batch_reset_minutes
- 1
- - container_repository - - container_repository
- 1 - 1
- - create_commit_signature - - create_commit_signature
......
...@@ -85,8 +85,13 @@ module EE ...@@ -85,8 +85,13 @@ module EE
def reset_ci_minutes_in_batches! def reset_ci_minutes_in_batches!
each_batch do |namespaces| each_batch do |namespaces|
namespace_ids = namespaces.pluck(:id) reset_ci_minutes!(namespaces)
reset_ci_minutes!(namespace_ids) end
end
def reset_ci_minutes_for_batch!(from_id, to_id, batch_size: 1000)
where(id: from_id..to_id).each_batch(of: batch_size) do |namespaces|
reset_ci_minutes!(namespaces)
end end
end end
...@@ -94,50 +99,49 @@ module EE ...@@ -94,50 +99,49 @@ module EE
# transaction as the reset of the namespace statistics. If the transaction fails # transaction as the reset of the namespace statistics. If the transaction fails
# none of the changes apply but the numbers still remain consistent with each other. # none of the changes apply but the numbers still remain consistent with each other.
override :reset_ci_minutes! override :reset_ci_minutes!
def reset_ci_minutes!(namespace_ids) def reset_ci_minutes!(namespaces)
transaction do transaction do
recalculate_extra_shared_runners_minutes_limits!(namespace_ids) recalculate_extra_shared_runners_minutes_limits!(namespaces)
reset_shared_runners_seconds!(namespace_ids) reset_shared_runners_seconds!(namespaces)
reset_ci_minutes_notifications!(namespace_ids) reset_ci_minutes_notifications!(namespaces)
end end
true true
rescue ActiveRecord::ActiveRecordError rescue ActiveRecord::ActiveRecordError
# We don't need to print thousands of namespace_ids # We don't need to print thousands of namespace_ids
# in the message if all batches failed. # in the message if all batches failed.
# A small batch would be sufficient for investigation. # A small batch would be sufficient for investigation.
failed_namespace_ids = namespace_ids.first(10) failed_namespace_ids = namespaces.first(10).pluck(:id)
raise EE::Namespace::NamespaceStatisticsNotResetError, raise EE::Namespace::NamespaceStatisticsNotResetError,
"#{namespace_ids.count} namespace shared runner minutes were not reset and the transaction was rolled back. Namespace Ids: #{failed_namespace_ids}" "#{namespaces.size} namespace shared runner minutes were not reset and the transaction was rolled back. Namespace Ids: #{failed_namespace_ids}"
end end
def extra_minutes_left_sql def extra_minutes_left_sql
"GREATEST((namespaces.shared_runners_minutes_limit + namespaces.extra_shared_runners_minutes_limit) - ROUND(namespace_statistics.shared_runners_seconds / 60.0), 0)" "GREATEST((namespaces.shared_runners_minutes_limit + namespaces.extra_shared_runners_minutes_limit) - ROUND(namespace_statistics.shared_runners_seconds / 60.0), 0)"
end end
def recalculate_extra_shared_runners_minutes_limits!(namespace_ids) def recalculate_extra_shared_runners_minutes_limits!(namespaces)
where(id: namespace_ids) namespaces
.with_shared_runners_minutes_limit .with_shared_runners_minutes_limit
.with_extra_shared_runners_minutes_limit .with_extra_shared_runners_minutes_limit
.with_shared_runners_minutes_exceeding_default_limit .with_shared_runners_minutes_exceeding_default_limit
.update_all("extra_shared_runners_minutes_limit = #{extra_minutes_left_sql} FROM namespace_statistics") .update_all("extra_shared_runners_minutes_limit = #{extra_minutes_left_sql} FROM namespace_statistics")
end end
def reset_shared_runners_seconds!(namespace_ids) def reset_shared_runners_seconds!(namespaces)
NamespaceStatistics NamespaceStatistics
.where(namespace: namespace_ids) .where(namespace: namespaces)
.where.not(shared_runners_seconds: 0) .where.not(shared_runners_seconds: 0)
.update_all(shared_runners_seconds: 0, shared_runners_seconds_last_reset: Time.current) .update_all(shared_runners_seconds: 0, shared_runners_seconds_last_reset: Time.current)
::ProjectStatistics ::ProjectStatistics
.where(namespace: namespace_ids) .where(namespace: namespaces)
.where.not(shared_runners_seconds: 0) .where.not(shared_runners_seconds: 0)
.update_all(shared_runners_seconds: 0, shared_runners_seconds_last_reset: Time.current) .update_all(shared_runners_seconds: 0, shared_runners_seconds_last_reset: Time.current)
end end
def reset_ci_minutes_notifications!(namespace_ids) def reset_ci_minutes_notifications!(namespaces)
where(id: namespace_ids) namespaces.update_all(last_ci_minutes_notification_at: nil, last_ci_minutes_usage_notification_level: nil)
.update_all(last_ci_minutes_notification_at: nil, last_ci_minutes_usage_notification_level: nil)
end end
end end
......
...@@ -472,6 +472,13 @@ ...@@ -472,6 +472,13 @@
:resource_boundary: :unknown :resource_boundary: :unknown
:weight: 1 :weight: 1
:idempotent: true :idempotent: true
- :name: ci_batch_reset_minutes
:feature_category: :continuous_integration
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
- :name: create_github_webhook - :name: create_github_webhook
:feature_category: :integrations :feature_category: :integrations
:has_external_dependencies: true :has_external_dependencies: true
......
# frozen_string_literal: true
module Ci
class BatchResetMinutesWorker
include ApplicationWorker
feature_category :continuous_integration
idempotent!
def perform(from_id, to_id)
return unless Feature.enabled?(:ci_parallel_minutes_reset, default_enabled: true)
Namespace.reset_ci_minutes_for_batch!(from_id, to_id)
end
end
end
# frozen_string_literal: true # frozen_string_literal: true
class ClearSharedRunnersMinutesWorker # rubocop:disable Scalability/IdempotentWorker class ClearSharedRunnersMinutesWorker # rubocop:disable Scalability/IdempotentWorker
LEASE_TIMEOUT = 3600
include ApplicationWorker include ApplicationWorker
# rubocop:disable Scalability/CronWorkerContext # rubocop:disable Scalability/CronWorkerContext
# This worker does not perform work scoped to a context # This worker does not perform work scoped to a context
...@@ -11,10 +9,23 @@ class ClearSharedRunnersMinutesWorker # rubocop:disable Scalability/IdempotentWo ...@@ -11,10 +9,23 @@ class ClearSharedRunnersMinutesWorker # rubocop:disable Scalability/IdempotentWo
# rubocop:enable Scalability/CronWorkerContext # rubocop:enable Scalability/CronWorkerContext
feature_category :continuous_integration feature_category :continuous_integration
LEASE_TIMEOUT = 3600
BATCH_SIZE = 100_000
def perform def perform
return unless try_obtain_lease if Feature.enabled?(:ci_parallel_minutes_reset, default_enabled: true)
start_id = Namespace.minimum(:id)
last_id = Namespace.maximum(:id)
(start_id..last_id).step(BATCH_SIZE) do |batch_start_id|
batch_end_id = batch_start_id + BATCH_SIZE - 1
Ci::BatchResetMinutesWorker.perform_async(batch_start_id, batch_end_id)
end
else
return unless try_obtain_lease
Namespace.reset_ci_minutes_in_batches! Namespace.reset_ci_minutes_in_batches!
end
end end
private private
......
---
title: Split CI minutes resets into different workers
merge_request: 29017
author:
type: fixed
...@@ -49,6 +49,33 @@ describe Namespace do ...@@ -49,6 +49,33 @@ describe Namespace do
end end
end end
describe '.reset_ci_minutes_for_batch!' do
let(:batch_size) { 3 }
let!(:namespace_1) { create(:namespace, id: 1) }
let!(:namespace_2) { create(:namespace, id: 2) }
let!(:namespace_3) { create(:namespace, id: 3) }
let!(:namespace_4) { create(:namespace, id: 4) }
let!(:namespace_5) { create(:namespace, id: 5) }
let!(:namespace_6) { create(:namespace, id: 6) }
let!(:namespace_7) { create(:namespace, id: 7) }
it 'resets all ci minutes' do
expect(described_class).to receive(:reset_ci_minutes!).with([namespace_1, namespace_2, namespace_3])
expect(described_class).to receive(:reset_ci_minutes!).with([namespace_4, namespace_5, namespace_6])
expect(described_class).to receive(:reset_ci_minutes!).with([namespace_7])
described_class.reset_ci_minutes_for_batch!(1, 7, batch_size: batch_size)
end
it 'resets ci minutes for the ID range' do
expect(described_class).to receive(:reset_ci_minutes!).with([namespace_2, namespace_3, namespace_4])
expect(described_class).to receive(:reset_ci_minutes!).with([namespace_5, namespace_6])
described_class.reset_ci_minutes_for_batch!(2, 6, batch_size: batch_size)
end
end
describe '.reset_ci_minutes_in_batches!' do describe '.reset_ci_minutes_in_batches!' do
it 'returns when there were no failures' do it 'returns when there were no failures' do
expect { described_class.reset_ci_minutes_in_batches! }.not_to raise_error expect { described_class.reset_ci_minutes_in_batches! }.not_to raise_error
...@@ -67,7 +94,7 @@ describe Namespace do ...@@ -67,7 +94,7 @@ describe Namespace do
describe '.reset_ci_minutes!' do describe '.reset_ci_minutes!' do
it 'returns true if there were no exceptions to the db transaction' do it 'returns true if there were no exceptions to the db transaction' do
result = described_class.reset_ci_minutes!([]) result = described_class.reset_ci_minutes!(Namespace.none)
expect(result).to be true expect(result).to be true
end end
...@@ -77,7 +104,7 @@ describe Namespace do ...@@ -77,7 +104,7 @@ describe Namespace do
allow(described_class).to receive(:transaction).and_raise(ActiveRecord::ActiveRecordError) allow(described_class).to receive(:transaction).and_raise(ActiveRecord::ActiveRecordError)
expect { described_class.reset_ci_minutes!([namespace.id]) }.to raise_error( expect { described_class.reset_ci_minutes!([namespace]) }.to raise_error(
EE::Namespace::NamespaceStatisticsNotResetError, EE::Namespace::NamespaceStatisticsNotResetError,
"1 namespace shared runner minutes were not reset and the transaction was rolled back. Namespace Ids: [#{namespace.id}]") "1 namespace shared runner minutes were not reset and the transaction was rolled back. Namespace Ids: [#{namespace.id}]")
end end
...@@ -86,12 +113,13 @@ describe Namespace do ...@@ -86,12 +113,13 @@ describe Namespace do
describe '.recalculate_extra_shared_runners_minutes_limits!' do describe '.recalculate_extra_shared_runners_minutes_limits!' do
context 'when the namespace had used runner minutes for the month' do context 'when the namespace had used runner minutes for the month' do
let(:namespace) { create(:namespace, shared_runners_minutes_limit: 5000, extra_shared_runners_minutes_limit: 5000) } let(:namespace) { create(:namespace, shared_runners_minutes_limit: 5000, extra_shared_runners_minutes_limit: 5000) }
let(:namespaces) { Namespace.where(id: namespace) }
it 'updates the namespace extra_shared_runners_minutes_limit subtracting used minutes above the shared_runners_minutes_limit' do it 'updates the namespace extra_shared_runners_minutes_limit subtracting used minutes above the shared_runners_minutes_limit' do
minutes_used = 6000 minutes_used = 6000
create(:namespace_statistics, namespace: namespace, shared_runners_seconds: minutes_used * 60) create(:namespace_statistics, namespace: namespace, shared_runners_seconds: minutes_used * 60)
described_class.recalculate_extra_shared_runners_minutes_limits!([namespace.id]) described_class.recalculate_extra_shared_runners_minutes_limits!(namespaces)
expect(namespace.reload.extra_shared_runners_minutes_limit).to eq(4000) expect(namespace.reload.extra_shared_runners_minutes_limit).to eq(4000)
end end
...@@ -106,7 +134,7 @@ describe Namespace do ...@@ -106,7 +134,7 @@ describe Namespace do
end end
subject do subject do
described_class.reset_shared_runners_seconds!([namespace.id]) described_class.reset_shared_runners_seconds!(Namespace.where(id: namespace))
end end
it 'resets NamespaceStatistics shared_runners_seconds and updates the timestamp' do it 'resets NamespaceStatistics shared_runners_seconds and updates the timestamp' do
...@@ -136,7 +164,7 @@ describe Namespace do ...@@ -136,7 +164,7 @@ describe Namespace do
last_ci_minutes_notification_at: Date.yesterday, last_ci_minutes_notification_at: Date.yesterday,
last_ci_minutes_usage_notification_level: 50 ) last_ci_minutes_usage_notification_level: 50 )
subject = described_class.reset_ci_minutes_notifications!([namespace.id]) subject = described_class.reset_ci_minutes_notifications!(Namespace.where(id: namespace))
expect { subject && namespace.reload } expect { subject && namespace.reload }
.to change { namespace.last_ci_minutes_notification_at }.to(nil) .to change { namespace.last_ci_minutes_notification_at }.to(nil)
......
# frozen_string_literal: true
require 'spec_helper'
describe Ci::BatchResetMinutesWorker do
let(:worker) { described_class.new }
describe '#perform' do
let(:first_namespace) do
create(:namespace,
id: 1,
shared_runners_minutes_limit: 100,
extra_shared_runners_minutes_limit: 50,
last_ci_minutes_notification_at: Time.now,
last_ci_minutes_usage_notification_level: 30)
end
let(:last_namespace) do
create(:namespace,
id: 10,
shared_runners_minutes_limit: 100,
extra_shared_runners_minutes_limit: 50,
last_ci_minutes_notification_at: Time.now,
last_ci_minutes_usage_notification_level: 30)
end
let!(:first_namespace_statistics) do
create(:namespace_statistics, namespace: first_namespace, shared_runners_seconds: 120.minutes)
end
let!(:last_namespace_statistics) do
create(:namespace_statistics, namespace: last_namespace, shared_runners_seconds: 90.minutes)
end
include_examples 'an idempotent worker' do
let(:job_args) { [first_namespace.id, last_namespace.id] }
it 'delegates to Namespace method' do
expect(Namespace).to receive(:reset_ci_minutes!).with([first_namespace, last_namespace]).twice
subject
end
it 'resets CI minutes and recalculates purchased minutes for the namespace exceeding the monthly minutes' do
subject
first_namespace.reset
first_namespace_statistics.reset
expect(first_namespace.extra_shared_runners_minutes_limit).to eq 30
expect(first_namespace_statistics.shared_runners_seconds).to eq 0
expect(first_namespace.last_ci_minutes_notification_at).to be_nil
expect(first_namespace.last_ci_minutes_usage_notification_level).to be_nil
end
it 'resets CI minutes but does not recalculate purchased minutes for the namespace not exceeding the monthly minutes' do
subject
last_namespace.reset
last_namespace_statistics.reset
expect(last_namespace.extra_shared_runners_minutes_limit).to eq 50
expect(last_namespace_statistics.shared_runners_seconds).to eq 0
expect(last_namespace.last_ci_minutes_notification_at).to be_nil
expect(last_namespace.last_ci_minutes_usage_notification_level).to be_nil
end
end
context 'when feature flag ci_parallel_minutes_reset is disabled' do
before do
stub_feature_flags(ci_parallel_minutes_reset: false)
end
it 'does not delegate to Namespace method' do
expect(Namespace).not_to receive(:reset_ci_minutes!)
worker.perform(first_namespace.id, last_namespace.id)
end
end
end
end
...@@ -6,126 +6,154 @@ describe ClearSharedRunnersMinutesWorker do ...@@ -6,126 +6,154 @@ describe ClearSharedRunnersMinutesWorker do
let(:worker) { described_class.new } let(:worker) { described_class.new }
describe '#perform' do describe '#perform' do
let(:namespace) { create(:namespace) }
before do
expect_next_instance_of(described_class) do |instance|
expect(instance).to receive(:try_obtain_lease).and_return(true)
end
end
subject { worker.perform } subject { worker.perform }
context 'when project statistics are defined' do context 'when ci_parallel_minutes_reset feature flag is disabled' do
let(:project) { create(:project, namespace: namespace) } let(:namespace) { create(:namespace) }
let(:statistics) { project.statistics }
before do before do
statistics.update(shared_runners_seconds: 100) stub_feature_flags(ci_parallel_minutes_reset: false)
end
it 'clears counters' do
subject
expect(statistics.reload.shared_runners_seconds).to be_zero expect_next_instance_of(described_class) do |instance|
end expect(instance).to receive(:try_obtain_lease).and_return(true)
end
it 'resets timer' do
subject
expect(statistics.reload.shared_runners_seconds_last_reset).to be_like_time(Time.now)
end end
context 'when there are namespaces that were not reset after the reset steps' do context 'when project statistics are defined' do
let(:namespace_ids) { [namespace.id] } let(:project) { create(:project, namespace: namespace) }
let(:statistics) { project.statistics }
before do before do
allow(Namespace).to receive(:each_batch).and_yield(Namespace.all) statistics.update(shared_runners_seconds: 100)
allow(Namespace).to receive(:transaction).and_raise(ActiveRecord::ActiveRecordError)
end end
it 'raises an exception' do it 'clears counters' do
expect { worker.perform }.to raise_error( subject
EE::Namespace::NamespaceStatisticsNotResetError,
"#{namespace_ids.count} namespace shared runner minutes were not reset and the transaction was rolled back. Namespace Ids: #{namespace_ids}")
end
end
end
context 'when namespace statistics are defined' do expect(statistics.reload.shared_runners_seconds).to be_zero
let!(:statistics) { create(:namespace_statistics, namespace: namespace, shared_runners_seconds: 100) } end
it 'clears counters' do it 'resets timer' do
subject subject
expect(statistics.reload.shared_runners_seconds).to be_zero expect(statistics.reload.shared_runners_seconds_last_reset).to be_like_time(Time.now)
end end
it 'resets timer' do context 'when there are namespaces that were not reset after the reset steps' do
subject let(:namespace_ids) { [namespace.id] }
expect(statistics.reload.shared_runners_seconds_last_reset).to be_like_time(Time.now) before do
end allow(Namespace).to receive(:each_batch).and_yield(Namespace.all)
end allow(Namespace).to receive(:transaction).and_raise(ActiveRecord::ActiveRecordError)
end
context 'when namespace has extra shared runner minutes' do it 'raises an exception' do
let!(:namespace) do expect { worker.perform }.to raise_error(
create(:namespace, shared_runners_minutes_limit: 100, extra_shared_runners_minutes_limit: 10 ) EE::Namespace::NamespaceStatisticsNotResetError,
"#{namespace_ids.count} namespace shared runner minutes were not reset and the transaction was rolled back. Namespace Ids: #{namespace_ids}")
end
end
end end
let!(:statistics) do context 'when namespace statistics are defined' do
create(:namespace_statistics, namespace: namespace, shared_runners_seconds: minutes_used * 60) let!(:statistics) { create(:namespace_statistics, namespace: namespace, shared_runners_seconds: 100) }
end
let(:minutes_used) { 0 } it 'clears counters' do
subject
context 'when consumption is below the default quota' do expect(statistics.reload.shared_runners_seconds).to be_zero
let(:minutes_used) { 50 } end
it 'does not modify the extra minutes quota' do it 'resets timer' do
subject subject
expect(namespace.reload.extra_shared_runners_minutes_limit).to eq(10) expect(statistics.reload.shared_runners_seconds_last_reset).to be_like_time(Time.now)
end end
end end
context 'when consumption is above the default quota' do context 'when namespace has extra shared runner minutes' do
context 'when all extra minutes are used' do let!(:namespace) do
let(:minutes_used) { 115 } create(:namespace, shared_runners_minutes_limit: 100, extra_shared_runners_minutes_limit: 10 )
end
let!(:statistics) do
create(:namespace_statistics, namespace: namespace, shared_runners_seconds: minutes_used * 60)
end
let(:minutes_used) { 0 }
context 'when consumption is below the default quota' do
let(:minutes_used) { 50 }
it 'sets extra minutes to 0' do it 'does not modify the extra minutes quota' do
subject subject
expect(namespace.reload.extra_shared_runners_minutes_limit).to eq(0) expect(namespace.reload.extra_shared_runners_minutes_limit).to eq(10)
end end
end end
context 'when some extra minutes are used' do context 'when consumption is above the default quota' do
let(:minutes_used) { 105 } context 'when all extra minutes are used' do
let(:minutes_used) { 115 }
it 'discounts the extra minutes used' do it 'sets extra minutes to 0' do
subject subject
expect(namespace.reload.extra_shared_runners_minutes_limit).to eq(5) expect(namespace.reload.extra_shared_runners_minutes_limit).to eq(0)
end
end end
end
end
[:last_ci_minutes_notification_at, :last_ci_minutes_usage_notification_level].each do |attr| context 'when some extra minutes are used' do
context "when #{attr} is present" do let(:minutes_used) { 105 }
before do
namespace.update_attribute(attr, Time.now) it 'discounts the extra minutes used' do
subject
expect(namespace.reload.extra_shared_runners_minutes_limit).to eq(5)
end
end end
end
it 'nullifies the field' do [:last_ci_minutes_notification_at, :last_ci_minutes_usage_notification_level].each do |attr|
expect(namespace.send(attr)).to be_present context "when #{attr} is present" do
before do
namespace.update_attribute(attr, Time.now)
end
subject it 'nullifies the field' do
expect(namespace.send(attr)).to be_present
subject
expect(namespace.reload.send(attr)).not_to be_present expect(namespace.reload.send(attr)).not_to be_present
end
end end
end end
end end
end end
context 'when ci_parallel_minutes_reset feature flag is enabled' do
subject { worker.perform }
before do
stub_feature_flags(ci_parallel_minutes_reset: true)
[2, 3, 4, 5, 7, 8, 10, 14].each do |id|
create(:namespace, id: id)
end
stub_const("#{described_class}::BATCH_SIZE", 3)
end
it 'runs a worker per batch' do
expect(Ci::BatchResetMinutesWorker).to receive(:perform_async).with(2, 4)
expect(Ci::BatchResetMinutesWorker).to receive(:perform_async).with(5, 7)
expect(Ci::BatchResetMinutesWorker).to receive(:perform_async).with(8, 10)
expect(Ci::BatchResetMinutesWorker).to receive(:perform_async).with(11, 13)
expect(Ci::BatchResetMinutesWorker).to receive(:perform_async).with(14, 16)
subject
end
end
end end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment