Commit 19d7626c authored by Grzegorz Bizon's avatar Grzegorz Bizon

Add a service responsible for updating build state

This commit adds support for asychronous migration of live trace chunks
to either database or an object store upon a final build update sent
from a runner. In case of async migration scheduled we respond with 202
ACCEPTED to a runner and expect it to retry within a given interval.
parent 47cea0c2
......@@ -26,6 +26,8 @@ module Ci
fog: 3
}
scope :live, -> { redis }
class << self
def all_stores
@all_stores ||= self.data_stores.keys
......@@ -129,6 +131,7 @@ module Ci
current_data = data
old_store_class = current_store
## TODO allow final chunk update if build pending state exists
unless current_data&.bytesize.to_i == CHUNK_SIZE
raise FailedToPersistDataError, 'Data is not fulfilled in a bucket'
end
......
# frozen_string_literal: true
module Ci
class UpdateBuildStateService
Result = Struct.new(:status, keyword_init: true)
ACCEPT_TIMEOUT = 5.minutes.freeze
attr_reader :build, :params, :metrics
def initialize(build, params, metrics = ::Gitlab::Ci::Trace::Metrics.new)
@build = build
@params = params
@metrics = metrics
end
def execute
overwrite_trace! if has_trace?
if accept_request?
accept_build_state!
else
update_build_state!
end
end
private
def build_state
params.dig(:state).to_s
end
def has_trace?
params.dig(:trace).present?
end
def has_checksum?
params.dig(:checksum).present?
end
def has_chunks?
build.trace_chunks.any?
end
def live_chunks_pending?
build.trace_chunks.live.any?
end
def build_running?
build_state == 'running'
end
def accept_available?
!build_running? && has_checksum? && chunks_migration_enabled?
end
def accept_request?
accept_available? && live_chunks_pending?
end
def chunks_migration_enabled?
Feature.enabled?(:ci_enable_live_trace, build.project)
end
def accept_build_state!
# TODO, persist ci_build_state if not present (find or create)
build.trace_chunks.live.find_each do |chunk|
chunk.schedule_to_persist!
end
metrics.increment_trace_operation(operation: :accepted)
Result.new(status: 202)
end
def overwrite_trace!
metrics.increment_trace_operation(operation: :overwrite)
# TODO, disable in FF
build.trace.set(params[:trace])
end
def update_build_state!
if accept_available? && has_chunks?
metrics.increment_trace_operation(operation: :finalized)
end
case build_state
when 'running'
build.touch if build.needs_touch?
Result.new(status: 200)
when 'success'
build.success!
Result.new(status: 200)
when 'failed'
build.drop!(params[:failure_reason] || :unknown_failure)
Result.new(status: 200)
else
Result.new(status: 400)
end
end
end
end
......@@ -159,29 +159,26 @@ module API
end
desc 'Updates a job' do
http_codes [[200, 'Job was updated'], [403, 'Forbidden']]
http_codes [[200, 'Job was updated'],
[202, 'Update accepted'],
[400, 'Unknown parameters'],
[403, 'Forbidden']]
end
params do
requires :token, type: String, desc: %q(Runners's authentication token)
requires :id, type: Integer, desc: %q(Job's ID)
optional :trace, type: String, desc: %q(Job's full trace)
optional :state, type: String, desc: %q(Job's status: success, failed)
optional :checksum, type: String, desc: %q(Job's trace CRC32 checksum)
optional :failure_reason, type: String, desc: %q(Job's failure_reason)
end
put '/:id' do
job = authenticate_job!
job.trace.set(params[:trace]) if params[:trace]
Gitlab::Metrics.add_event(:update_build)
case params[:state].to_s
when 'running'
job.touch if job.needs_touch?
when 'success'
job.success!
when 'failed'
job.drop!(params[:failure_reason] || :unknown_failure)
::Ci::UpdateBuildStateService.new(job, params).then do |service|
service.execute.then { |result| status result.status }
end
end
......
......@@ -6,7 +6,8 @@ module Gitlab
class Metrics
extend Gitlab::Utils::StrongMemoize
OPERATIONS = [:mutated].freeze
OPERATIONS = [:appended, :mutated, :overwrite, :accepted,
:finalized].freeze
def increment_trace_operation(operation: :unknown)
unless OPERATIONS.include?(operation)
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Ci::UpdateBuildStateService do
let(:project) { create(:project) }
let(:pipeline) { create(:ci_pipeline, project: project) }
let(:build) { create(:ci_build, :running, pipeline: pipeline) }
let(:metrics) { spy('metrics') }
subject { described_class.new(build, params, metrics) }
before do
stub_feature_flags(ci_enable_live_trace: true)
end
context 'when build does not have checksum' do
context 'when state has changed' do
let(:params) { { state: 'success' } }
it 'updates a state of a running build' do
subject.execute
expect(build).to be_success
end
it 'returns 200 OK status' do
result = subject.execute
expect(result.status).to eq 200
end
it 'does not increment finalized trace metric' do
subject.execute
expect(metrics)
.not_to have_received(:increment_trace_operation)
.with(operation: :finalized)
end
end
context 'when it is a heartbeat request' do
let(:params) { { state: 'success' } }
it 'updates a build timestamp' do
expect { subject.execute }.to change { build.updated_at }
end
end
context 'when request payload carries a trace' do
let(:params) { { state: 'success', trace: 'overwritten' } }
it 'overwrites a trace and updates trace operation metric' do
result = subject.execute
expect(build.trace.raw).to eq 'overwritten'
expect(result.status).to eq 200
expect(metrics)
.to have_received(:increment_trace_operation)
.with(operation: :overwrite)
end
end
context 'when state is unknown' do
let(:params) { { state: 'unknown' } }
it 'responds with 400 bad request' do
result = subject.execute
expect(result.status).to eq 400
expect(build).to be_running
end
end
end
context 'when build has a checksum' do
let(:params) { { checksum: 'crc32:12345678', state: 'success' } }
context 'when build trace has been migrated' do
before do
create(:ci_build_trace_chunk, :database_with_data, build: build)
end
it 'updates a build state' do
subject.execute
expect(build).to be_success
end
it 'responds with 200 OK status' do
result = subject.execute
expect(result.status).to eq 200
end
it 'increments trace finalized operation metric' do
subject.execute
expect(metrics)
.to have_received(:increment_trace_operation)
.with(operation: :finalized)
end
end
context 'when build trace has not been migrated yet' do
before do
create(:ci_build_trace_chunk, :redis_with_data, build: build)
end
it 'does not update a build state' do
subject.execute
expect(build).to be_running
end
it 'responds with 202 accepted' do
result = subject.execute
expect(result.status).to eq 202
end
it 'schedules live chunks for migration' do
expect(Ci::BuildTraceChunkFlushWorker)
.to receive(:perform_async)
.with(build.trace_chunks.first.id)
subject.execute
end
it 'increments trace accepted operation metric' do
subject.execute
expect(metrics)
.to have_received(:increment_trace_operation)
.with(operation: :accepted)
end
context 'when live traces are disabled' do
before do
stub_feature_flags(ci_enable_live_trace: false)
end
it 'responds with 200 OK' do
result = subject.execute
expect(result.status).to eq 200
end
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment