Commit 8aa9f254 authored by Tiger

Remove expired agent activity events as new events are created

Whenever an activity event is created for an agent, a worker is
scheduled to remove all but the most recent 200 events.

If the agent does not yet have 200 events, the worker still runs;
however, it will not delete any records.

Events may be created frequently, so to prevent many workers running
unnecessarily they are scheduled with a delay of up to an hour, and
then de-duplicated until executed.

Changelog: added
parent dfdb363a
......@@ -5,6 +5,7 @@ module Clusters
self.table_name = 'cluster_agents'
INACTIVE_AFTER = 1.hour.freeze
ACTIVITY_EVENT_LIMIT = 200
belongs_to :created_by_user, class_name: 'User', optional: true
belongs_to :project, class_name: '::Project' # Otherwise, it will load ::Clusters::Project
......@@ -39,5 +40,12 @@ module Clusters
# Whether at least one active token of this agent has a `last_used_at`
# later than INACTIVE_AFTER ago.
def connected?
  active_tokens = agent_tokens.active
  recently_used = active_tokens.where("last_used_at > ?", INACTIVE_AFTER.ago)

  recently_used.exists?
end
# Returns the `recorded_at` of the event found after skipping the first
# ACTIVITY_EVENT_LIMIT - 1 activity events, i.e. the last event that still
# fits within the limit.
def activity_event_deletion_cutoff
  # No explicit ordering here: the association defines the order.
  events_from_limit = activity_events.offset(ACTIVITY_EVENT_LIMIT - 1)

  events_from_limit.pick(:recorded_at)
end
end
end
......@@ -56,12 +56,13 @@ module Clusters
end
# Logs an `agent_connected` activity event (level `info`) for this token's
# agent, stamped with the given `recorded_at` and linked to this token.
#
# NOTE(review): this span looks like a diff hunk whose +/- markers were lost.
# The `agent.activity_events.create!(` opener and the lone `)` are presumably
# the removed lines, replaced by the CreateActivityEventService call that ends
# in `.execute` — confirm against the committed file.
def log_activity_event!(recorded_at)
agent.activity_events.create!(
Clusters::Agents::CreateActivityEventService.new( # rubocop: disable CodeReuse/ServiceClass
agent,
kind: :agent_connected,
level: :info,
recorded_at: recorded_at,
agent_token: self
)
).execute
end
end
end
......@@ -3,6 +3,7 @@
module Clusters
module Agents
class ActivityEvent < ApplicationRecord
include EachBatch
include NullifyIfBlank
self.table_name = 'agent_activity_events'
......@@ -12,6 +13,7 @@ module Clusters
belongs_to :agent_token, class_name: 'Clusters::AgentToken'
scope :in_timeline_order, -> { order(recorded_at: :desc, id: :desc) }
scope :recorded_before, -> (cutoff) { where('recorded_at < ?', cutoff) }
validates :recorded_at, :kind, :level, presence: true
......
......@@ -30,13 +30,14 @@ module Clusters
end
# Logs a `token_created` activity event (level `info`) for the token's agent,
# recorded at the token's creation time and attributed to `current_user`.
#
# NOTE(review): this span looks like a diff hunk whose +/- markers were lost.
# The `token.agent.activity_events.create!(` opener and the lone `)` are
# presumably the removed lines, replaced by the CreateActivityEventService
# call that ends in `.execute` — confirm against the committed file.
def log_activity_event!(token)
token.agent.activity_events.create!(
Clusters::Agents::CreateActivityEventService.new(
token.agent,
kind: :token_created,
level: :info,
recorded_at: token.created_at,
user: current_user,
agent_token: token
)
).execute
end
end
end
......
# frozen_string_literal: true
module Clusters
  module Agents
    # Creates an activity event for an agent and schedules the worker that
    # removes the agent's expired events.
    class CreateActivityEventService
      # agent  - object whose `activity_events` association receives the event.
      # params - attributes passed unchanged to `activity_events.create!`.
      def initialize(agent, **params)
        @agent = agent
        @params = params
      end

      # Creates the event (raising if `create!` fails), enqueues the cleanup
      # worker for the agent and returns `ServiceResponse.success`.
      def execute
        agent.activity_events.create!(params)
        enqueue_cleanup

        ServiceResponse.success
      end

      private

      attr_reader :agent, :params

      # Schedules DeleteExpiredEventsWorker for this agent.
      def enqueue_cleanup
        DeleteExpiredEventsWorker.perform_at(schedule_cleanup_at, agent.id)
      end

      # One hour from now, with the minute replaced by `agent.id % 60` so the
      # scheduled minute is fixed per agent.
      def schedule_cleanup_at
        an_hour_ahead = 1.hour.from_now

        an_hour_ahead.change(min: agent.id % 60)
      end
    end
  end
end
# frozen_string_literal: true
module Clusters
  module Agents
    # Deletes an agent's activity events that were recorded before the
    # agent's deletion cutoff.
    class DeleteExpiredEventsService
      # agent - object responding to `activity_events` and
      #         `activity_event_deletion_cutoff`.
      def initialize(agent)
        @agent = agent
      end

      # Removes, batch by batch, every event recorded strictly before the
      # cutoff. Returns nil without touching the events relation when the
      # agent has no cutoff.
      def execute
        cutoff = remove_events_before

        # No cutoff means there is nothing to compare against; skip the
        # query instead of relying on a comparison with NULL matching no rows.
        return if cutoff.nil?

        agent.activity_events
          .recorded_before(cutoff)
          .each_batch { |batch| batch.delete_all }
      end

      private

      attr_reader :agent

      # The timestamp before which events are considered expired.
      def remove_events_before
        agent.activity_event_deletion_cutoff
      end
    end
  end
end
......@@ -129,6 +129,15 @@
:weight: 2
:idempotent:
:tags: []
- :name: cluster_agent:clusters_agents_delete_expired_events
:worker_name: Clusters::Agents::DeleteExpiredEventsWorker
:feature_category: :kubernetes_management
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: container_repository:cleanup_container_repository
:worker_name: CleanupContainerRepositoryWorker
:feature_category: :container_registry
......
# frozen_string_literal: true
module Clusters
  module Agents
    # Background worker that runs DeleteExpiredEventsService for one agent.
    class DeleteExpiredEventsWorker
      include ApplicationWorker
      include ClusterAgentQueue

      deduplicate :until_executed, including_scheduled: true
      idempotent!

      data_consistency :always

      # agent_id - ID of the Clusters::Agent whose expired events are removed.
      #
      # Does nothing (returns nil) when no agent with that ID exists.
      def perform(agent_id)
        agent = Clusters::Agent.find_by_id(agent_id)
        return unless agent

        Clusters::Agents::DeleteExpiredEventsService.new(agent).execute
      end
    end
  end
end
# frozen_string_literal: true
# Concern for cluster agent workers: when included, it declares the
# `cluster_agent` queue namespace and the `kubernetes_management` feature
# category on the including class.
module ClusterAgentQueue
extend ActiveSupport::Concern
included do
queue_namespace :cluster_agent
feature_category :kubernetes_management
end
end
......@@ -75,6 +75,8 @@
- 1
- - ci_upstream_projects_subscriptions_cleanup
- 1
- - cluster_agent
- 1
- - container_repository
- 1
- - create_commit_signature
......
......@@ -116,4 +116,19 @@ RSpec.describe Clusters::Agent do
it { is_expected.to be_truthy }
end
end
# Three events recorded 1, 2 and 3 hours ago, with the limit stubbed down to
# 2: the cutoff is expected to be the timestamp of event2.
describe '#activity_event_deletion_cutoff' do
let_it_be(:agent) { create(:cluster_agent) }
let_it_be(:event1) { create(:agent_activity_event, agent: agent, recorded_at: 1.hour.ago) }
let_it_be(:event2) { create(:agent_activity_event, agent: agent, recorded_at: 2.hours.ago) }
let_it_be(:event3) { create(:agent_activity_event, agent: agent, recorded_at: 3.hours.ago) }
subject { agent.activity_event_deletion_cutoff }
before do
# Shrink the limit so the example needs only a handful of records.
stub_const("#{described_class}::ACTIVITY_EVENT_LIMIT", 2)
end
it { is_expected.to be_like_time(event2.recorded_at) }
end
end
......@@ -16,11 +16,10 @@ RSpec.describe Clusters::Agents::ActivityEvent do
let_it_be(:agent) { create(:cluster_agent) }
describe '.in_timeline_order' do
let(:recorded_at) { 1.hour.ago }
let!(:event1) { create(:agent_activity_event, agent: agent, recorded_at: recorded_at) }
let!(:event2) { create(:agent_activity_event, agent: agent, recorded_at: Time.current) }
let!(:event3) { create(:agent_activity_event, agent: agent, recorded_at: recorded_at) }
let_it_be(:recorded_at) { 1.hour.ago }
let_it_be(:event1) { create(:agent_activity_event, agent: agent, recorded_at: recorded_at) }
let_it_be(:event2) { create(:agent_activity_event, agent: agent, recorded_at: Time.current) }
let_it_be(:event3) { create(:agent_activity_event, agent: agent, recorded_at: recorded_at) }
subject { described_class.in_timeline_order }
......@@ -28,5 +27,19 @@ RSpec.describe Clusters::Agents::ActivityEvent do
is_expected.to eq([event2, event3, event1])
end
end
# Uses event2's own timestamp as the cutoff; only event3 is expected back, so
# an event recorded exactly at the cutoff is not matched.
describe '.recorded_before' do
let_it_be(:event1) { create(:agent_activity_event, agent: agent, recorded_at: 1.hour.ago) }
let_it_be(:event2) { create(:agent_activity_event, agent: agent, recorded_at: 2.hours.ago) }
let_it_be(:event3) { create(:agent_activity_event, agent: agent, recorded_at: 3.hours.ago) }
let(:cutoff) { event2.recorded_at }
subject { described_class.recorded_before(cutoff) }
it 'returns only events recorded before the cutoff' do
is_expected.to contain_exactly(event3)
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
# Covers the two effects of #execute: the event record is created with the
# given attributes, and the cleanup worker is scheduled for the agent.
RSpec.describe Clusters::Agents::CreateActivityEventService do
let_it_be(:agent) { create(:cluster_agent) }
let_it_be(:token) { create(:cluster_agent_token, agent: agent) }
let_it_be(:user) { create(:user) }
describe '#execute' do
# Attributes handed to the service as keyword arguments.
let(:params) do
{
kind: :token_created,
level: :info,
recorded_at: token.created_at,
user: user,
agent_token: token
}
end
subject { described_class.new(agent, **params).execute }
it 'creates an activity event record' do
expect { subject }.to change(agent.activity_events, :count).from(0).to(1)
event = agent.activity_events.last
expect(event).to have_attributes(
kind: 'token_created',
level: 'info',
recorded_at: token.reload.created_at,
user: user,
agent_token_id: token.id
)
end
it 'schedules the cleanup worker' do
# NOTE(review): the expected time is computed here and again inside the
# service; if the two calls straddle an hour boundary the values would
# differ — consider freezing time. Confirm whether this matters in practice.
expect(Clusters::Agents::DeleteExpiredEventsWorker).to receive(:perform_at)
.with(1.hour.from_now.change(min: agent.id % 60), agent.id)
subject
end
end
end
# frozen_string_literal: true
require 'spec_helper'
# Exercises #execute against five events recorded 1 to 5 hours ago, with the
# agent's deletion cutoff stubbed per context.
RSpec.describe Clusters::Agents::DeleteExpiredEventsService do
let_it_be(:agent) { create(:cluster_agent) }
describe '#execute' do
let_it_be(:event1) { create(:agent_activity_event, agent: agent, recorded_at: 1.hour.ago) }
let_it_be(:event2) { create(:agent_activity_event, agent: agent, recorded_at: 2.hours.ago) }
let_it_be(:event3) { create(:agent_activity_event, agent: agent, recorded_at: 3.hours.ago) }
let_it_be(:event4) { create(:agent_activity_event, agent: agent, recorded_at: 4.hours.ago) }
let_it_be(:event5) { create(:agent_activity_event, agent: agent, recorded_at: 5.hours.ago) }
# Default cutoff is older than every event above, so nothing qualifies.
let(:deletion_cutoff) { 1.day.ago }
subject { described_class.new(agent).execute }
before do
# The cutoff calculation is stubbed; only the deletion itself is under test.
allow(agent).to receive(:activity_event_deletion_cutoff).and_return(deletion_cutoff)
end
it 'does not delete events if the limit has not been reached' do
expect { subject }.not_to change(agent.activity_events, :count)
end
context 'there are more events than the limit' do
# With event3's timestamp as the cutoff, event4 and event5 are older and
# expected to be removed; event3 itself is kept.
let(:deletion_cutoff) { event3.recorded_at }
it 'removes events to remain at the limit, keeping the most recent' do
expect { subject }.to change(agent.activity_events, :count).from(5).to(3)
expect(agent.activity_events).to contain_exactly(event1, event2, event3)
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
# Checks that #perform delegates to DeleteExpiredEventsService for an
# existing agent and is a no-op for an unknown agent ID.
RSpec.describe Clusters::Agents::DeleteExpiredEventsWorker do
let(:agent) { create(:cluster_agent) }
describe '#perform' do
let(:agent_id) { agent.id }
# Stand-in for the service instance, so no real deletion runs.
let(:deletion_service) { double(execute: true) }
subject { described_class.new.perform(agent_id) }
it 'calls the deletion service' do
expect(deletion_service).to receive(:execute).once
expect(Clusters::Agents::DeleteExpiredEventsService).to receive(:new)
.with(agent).and_return(deletion_service)
subject
end
context 'agent no longer exists' do
# An ID that is not expected to match any agent record.
let(:agent_id) { -1 }
it 'completes without raising an error' do
expect { subject }.not_to raise_error
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
# Includes the concern in an anonymous worker class and checks the resulting
# queue name and feature category.
RSpec.describe ClusterAgentQueue do
let(:worker) do
Class.new do
# Anonymous classes have no name; provide one for the worker to use.
def self.name
'ExampleWorker'
end
include ApplicationWorker
include ClusterAgentQueue
end
end
it { expect(worker.queue).to eq('cluster_agent:example') }
it { expect(worker.get_feature_category).to eq(:kubernetes_management) }
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment