Commit 554fbb2a authored by Kamil Trzciński's avatar Kamil Trzciński

Merge branch 'set-real-next-run-at-for-preventing-duplciate-pipeline-creations' into 'master'

Make pipeline schedule worker resilient

Closes gitlab-com/gl-infra/production#805 and #61955

See merge request gitlab-org/gitlab-ce!28407
parents d16c11ab 6a18a411
...@@ -27,9 +27,13 @@ module Ci ...@@ -27,9 +27,13 @@ module Ci
scope :active, -> { where(active: true) } scope :active, -> { where(active: true) }
scope :inactive, -> { where(active: false) } scope :inactive, -> { where(active: false) }
scope :runnable_schedules, -> { active.where("next_run_at < ?", Time.now) }
scope :preloaded, -> { preload(:owner, :project) }
accepts_nested_attributes_for :variables, allow_destroy: true accepts_nested_attributes_for :variables, allow_destroy: true
alias_attribute :real_next_run, :next_run_at
def owned_by?(current_user) def owned_by?(current_user)
owner == current_user owner == current_user
end end
...@@ -46,8 +50,14 @@ module Ci ...@@ -46,8 +50,14 @@ module Ci
update_attribute(:active, false) update_attribute(:active, false)
end end
##
# The `next_run_at` column is set to the actual execution date of `PipelineScheduleWorker`.
# This way, a schedule like `*/1 * * * *` won't be triggered in a short interval
# when PipelineScheduleWorker runs irregularly by Sidekiq Memory Killer.
def set_next_run_at def set_next_run_at
self.next_run_at = Gitlab::Ci::CronParser.new(cron, cron_timezone).next_time_from(Time.now) self.next_run_at = Gitlab::Ci::CronParser.new(Settings.cron_jobs['pipeline_schedule_worker']['cron'],
Time.zone.name)
.next_time_from(ideal_next_run_at)
end end
def schedule_next_run! def schedule_next_run!
...@@ -56,15 +66,14 @@ module Ci ...@@ -56,15 +66,14 @@ module Ci
update_attribute(:next_run_at, nil) # update without validation update_attribute(:next_run_at, nil) # update without validation
end end
def real_next_run(
worker_cron: Settings.cron_jobs['pipeline_schedule_worker']['cron'],
worker_time_zone: Time.zone.name)
Gitlab::Ci::CronParser.new(worker_cron, worker_time_zone)
.next_time_from(next_run_at)
end
def job_variables def job_variables
variables&.map(&:to_runner_variable) || [] variables&.map(&:to_runner_variable) || []
end end
private
def ideal_next_run_at
Gitlab::Ci::CronParser.new(cron, cron_timezone).next_time_from(Time.now)
end
end end
end end
# frozen_string_literal: true
module Ci
class PipelineScheduleService < BaseService
def execute(schedule)
# Ensure `next_run_at` is set properly before creating a pipeline.
# Otherwise, multiple pipelines could be created in a short interval.
schedule.schedule_next_run!
RunPipelineScheduleWorker.perform_async(schedule.id, schedule.owner.id)
end
end
end
...@@ -3,47 +3,12 @@ ...@@ -3,47 +3,12 @@
class PipelineScheduleWorker class PipelineScheduleWorker
include ApplicationWorker include ApplicationWorker
include CronjobQueue include CronjobQueue
include ::Gitlab::ExclusiveLeaseHelpers
EXCLUSIVE_LOCK_KEY = 'pipeline_schedules:run:lock'
LOCK_TIMEOUT = 50.minutes
# rubocop: disable CodeReuse/ActiveRecord
def perform def perform
in_lock(EXCLUSIVE_LOCK_KEY, ttl: LOCK_TIMEOUT, retries: 1) do Ci::PipelineSchedule.runnable_schedules.preloaded.find_in_batches do |schedules|
Ci::PipelineSchedule.active.where("next_run_at < ?", Time.now) schedules.each do |schedule|
.preload(:owner, :project).find_each do |schedule| Ci::PipelineScheduleService.new(schedule.project, schedule.owner).execute(schedule)
schedule.schedule_next_run!
Ci::CreatePipelineService.new(schedule.project,
schedule.owner,
ref: schedule.ref)
.execute!(:schedule, ignore_skip_ci: true, save_on_errors: true, schedule: schedule)
rescue => e
error(schedule, e)
end
end
end end
# rubocop: enable CodeReuse/ActiveRecord
private
def error(schedule, error)
failed_creation_counter.increment
Rails.logger.error "Failed to create a scheduled pipeline. " \
"schedule_id: #{schedule.id} message: #{error.message}"
Gitlab::Sentry
.track_exception(error,
issue_url: 'https://gitlab.com/gitlab-org/gitlab-ce/issues/41231',
extra: { schedule_id: schedule.id })
end end
def failed_creation_counter
@failed_creation_counter ||=
Gitlab::Metrics.counter(:pipeline_schedule_creation_failed_total,
"Counter of failed attempts of pipeline schedule creation")
end end
end end
...@@ -21,6 +21,30 @@ class RunPipelineScheduleWorker ...@@ -21,6 +21,30 @@ class RunPipelineScheduleWorker
Ci::CreatePipelineService.new(schedule.project, Ci::CreatePipelineService.new(schedule.project,
user, user,
ref: schedule.ref) ref: schedule.ref)
.execute(:schedule, ignore_skip_ci: true, save_on_errors: false, schedule: schedule) .execute!(:schedule, ignore_skip_ci: true, save_on_errors: false, schedule: schedule)
rescue Ci::CreatePipelineService::CreateError
# no-op. This is a user operation error such as corrupted .gitlab-ci.yml.
rescue => e
error(schedule, e)
end
private
def error(schedule, error)
failed_creation_counter.increment
Rails.logger.error "Failed to create a scheduled pipeline. " \
"schedule_id: #{schedule.id} message: #{error.message}"
Gitlab::Sentry
.track_exception(error,
issue_url: 'https://gitlab.com/gitlab-org/gitlab-ce/issues/41231',
extra: { schedule_id: schedule.id })
end
def failed_creation_counter
@failed_creation_counter ||=
Gitlab::Metrics.counter(:pipeline_schedule_creation_failed_total,
"Counter of failed attempts of pipeline schedule creation")
end end
end end
---
title: Make pipeline schedule worker resilient
merge_request: 28407
author:
type: performance
...@@ -7,6 +7,16 @@ FactoryBot.define do ...@@ -7,6 +7,16 @@ FactoryBot.define do
description "pipeline schedule" description "pipeline schedule"
project project
trait :every_minute do
cron '*/1 * * * *'
cron_timezone Gitlab::Ci::CronParser::VALID_SYNTAX_SAMPLE_TIME_ZONE
end
trait :hourly do
cron '* */1 * * *'
cron_timezone Gitlab::Ci::CronParser::VALID_SYNTAX_SAMPLE_TIME_ZONE
end
trait :nightly do trait :nightly do
cron '0 1 * * *' cron '0 1 * * *'
cron_timezone Gitlab::Ci::CronParser::VALID_SYNTAX_SAMPLE_TIME_ZONE cron_timezone Gitlab::Ci::CronParser::VALID_SYNTAX_SAMPLE_TIME_ZONE
......
...@@ -225,7 +225,7 @@ describe 'Pipeline Schedules', :js do ...@@ -225,7 +225,7 @@ describe 'Pipeline Schedules', :js do
context 'when active is true and next_run_at is NULL' do context 'when active is true and next_run_at is NULL' do
before do before do
create(:ci_pipeline_schedule, project: project, owner: user).tap do |pipeline_schedule| create(:ci_pipeline_schedule, project: project, owner: user).tap do |pipeline_schedule|
pipeline_schedule.update_attribute(:cron, nil) # Consequently next_run_at will be nil pipeline_schedule.update_attribute(:next_run_at, nil) # Consequently next_run_at will be nil
end end
end end
......
...@@ -48,32 +48,116 @@ describe Ci::PipelineSchedule do ...@@ -48,32 +48,116 @@ describe Ci::PipelineSchedule do
end end
end end
describe '.runnable_schedules' do
subject { described_class.runnable_schedules }
let!(:pipeline_schedule) do
Timecop.freeze(1.day.ago) do
create(:ci_pipeline_schedule, :hourly)
end
end
it 'returns the runnable schedule' do
is_expected.to eq([pipeline_schedule])
end
context 'when there are no runnable schedules' do
let!(:pipeline_schedule) { }
it 'returns an empty array' do
is_expected.to be_empty
end
end
end
describe '.preloaded' do
subject { described_class.preloaded }
before do
create_list(:ci_pipeline_schedule, 3)
end
it 'preloads the associations' do
subject
query = ActiveRecord::QueryRecorder.new { subject.each(&:project) }
expect(query.count).to eq(2)
end
end
describe '#set_next_run_at' do describe '#set_next_run_at' do
let!(:pipeline_schedule) { create(:ci_pipeline_schedule, :nightly) } let(:pipeline_schedule) { create(:ci_pipeline_schedule, :nightly) }
let(:ideal_next_run_at) { pipeline_schedule.send(:ideal_next_run_at) }
context 'when creates new pipeline schedule' do
let(:expected_next_run_at) do let(:expected_next_run_at) do
Gitlab::Ci::CronParser.new(pipeline_schedule.cron, pipeline_schedule.cron_timezone) Gitlab::Ci::CronParser.new(Settings.cron_jobs['pipeline_schedule_worker']['cron'], Time.zone.name)
.next_time_from(ideal_next_run_at)
end
let(:cron_worker_next_run_at) do
Gitlab::Ci::CronParser.new(Settings.cron_jobs['pipeline_schedule_worker']['cron'], Time.zone.name)
.next_time_from(Time.now) .next_time_from(Time.now)
end end
context 'when creates new pipeline schedule' do
it 'updates next_run_at automatically' do it 'updates next_run_at automatically' do
expect(described_class.last.next_run_at).to eq(expected_next_run_at) expect(pipeline_schedule.next_run_at).to eq(expected_next_run_at)
end end
end end
context 'when updates cron of exsisted pipeline schedule' do context 'when PipelineScheduleWorker runs at a specific interval' do
let(:new_cron) { '0 0 1 1 *' } before do
allow(Settings).to receive(:cron_jobs) do
{
'pipeline_schedule_worker' => {
'cron' => '0 1 2 3 *'
}
}
end
end
let(:expected_next_run_at) do it "updates next_run_at to the sidekiq worker's execution time" do
Gitlab::Ci::CronParser.new(new_cron, pipeline_schedule.cron_timezone) expect(pipeline_schedule.next_run_at.min).to eq(0)
.next_time_from(Time.now) expect(pipeline_schedule.next_run_at.hour).to eq(1)
expect(pipeline_schedule.next_run_at.day).to eq(2)
expect(pipeline_schedule.next_run_at.month).to eq(3)
end end
end
context 'when pipeline schedule runs every minute' do
let(:pipeline_schedule) { create(:ci_pipeline_schedule, :every_minute) }
it "updates next_run_at to the sidekiq worker's execution time" do
expect(pipeline_schedule.next_run_at).to eq(cron_worker_next_run_at)
end
end
context 'when there are two different pipeline schedules in different time zones' do
let(:pipeline_schedule_1) { create(:ci_pipeline_schedule, :weekly, cron_timezone: 'Eastern Time (US & Canada)') }
let(:pipeline_schedule_2) { create(:ci_pipeline_schedule, :weekly, cron_timezone: 'UTC') }
it 'sets different next_run_at' do
expect(pipeline_schedule_1.next_run_at).not_to eq(pipeline_schedule_2.next_run_at)
end
end
context 'when there are two different pipeline schedules in the same time zones' do
let(:pipeline_schedule_1) { create(:ci_pipeline_schedule, :weekly, cron_timezone: 'UTC') }
let(:pipeline_schedule_2) { create(:ci_pipeline_schedule, :weekly, cron_timezone: 'UTC') }
it 'sets the sames next_run_at' do
expect(pipeline_schedule_1.next_run_at).to eq(pipeline_schedule_2.next_run_at)
end
end
context 'when updates cron of exsisted pipeline schedule' do
let(:new_cron) { '0 0 1 1 *' }
it 'updates next_run_at automatically' do it 'updates next_run_at automatically' do
pipeline_schedule.update!(cron: new_cron) pipeline_schedule.update!(cron: new_cron)
expect(described_class.last.next_run_at).to eq(expected_next_run_at) expect(pipeline_schedule.next_run_at).to eq(expected_next_run_at)
end end
end end
end end
...@@ -83,10 +167,11 @@ describe Ci::PipelineSchedule do ...@@ -83,10 +167,11 @@ describe Ci::PipelineSchedule do
context 'when reschedules after 10 days from now' do context 'when reschedules after 10 days from now' do
let(:future_time) { 10.days.from_now } let(:future_time) { 10.days.from_now }
let(:ideal_next_run_at) { pipeline_schedule.send(:ideal_next_run_at) }
let(:expected_next_run_at) do let(:expected_next_run_at) do
Gitlab::Ci::CronParser.new(pipeline_schedule.cron, pipeline_schedule.cron_timezone) Gitlab::Ci::CronParser.new(Settings.cron_jobs['pipeline_schedule_worker']['cron'], Time.zone.name)
.next_time_from(future_time) .next_time_from(ideal_next_run_at)
end end
it 'points to proper next_run_at' do it 'points to proper next_run_at' do
...@@ -99,38 +184,6 @@ describe Ci::PipelineSchedule do ...@@ -99,38 +184,6 @@ describe Ci::PipelineSchedule do
end end
end end
describe '#real_next_run' do
subject do
described_class.last.real_next_run(worker_cron: worker_cron,
worker_time_zone: worker_time_zone)
end
context 'when GitLab time_zone is UTC' do
before do
allow(Time).to receive(:zone)
.and_return(ActiveSupport::TimeZone[worker_time_zone])
end
let(:worker_time_zone) { 'UTC' }
context 'when cron_timezone is Eastern Time (US & Canada)' do
before do
create(:ci_pipeline_schedule, :nightly,
cron_timezone: 'Eastern Time (US & Canada)')
end
let(:worker_cron) { '0 1 2 3 *' }
it 'returns the next time worker executes' do
expect(subject.min).to eq(0)
expect(subject.hour).to eq(1)
expect(subject.day).to eq(2)
expect(subject.month).to eq(3)
end
end
end
end
describe '#job_variables' do describe '#job_variables' do
let!(:pipeline_schedule) { create(:ci_pipeline_schedule) } let!(:pipeline_schedule) { create(:ci_pipeline_schedule) }
......
# frozen_string_literal: true
require 'spec_helper'
describe Ci::PipelineScheduleService do
let(:project) { create(:project) }
let(:user) { create(:user) }
let(:service) { described_class.new(project, user) }
describe '#execute' do
subject { service.execute(schedule) }
let(:schedule) { create(:ci_pipeline_schedule, project: project, owner: user) }
it 'schedules next run' do
expect(schedule).to receive(:schedule_next_run!)
subject
end
it 'runs RunPipelineScheduleWorker' do
expect(RunPipelineScheduleWorker)
.to receive(:perform_async).with(schedule.id, schedule.owner.id)
subject
end
end
end
...@@ -41,16 +41,6 @@ describe PipelineScheduleWorker do ...@@ -41,16 +41,6 @@ describe PipelineScheduleWorker do
it_behaves_like 'successful scheduling' it_behaves_like 'successful scheduling'
context 'when exclusive lease has already been taken by the other instance' do
before do
stub_exclusive_lease_taken(described_class::EXCLUSIVE_LOCK_KEY, timeout: described_class::LOCK_TIMEOUT)
end
it 'raises an error and does not start creating pipelines' do
expect { subject }.to raise_error(Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError)
end
end
context 'when the latest commit contains [ci skip]' do context 'when the latest commit contains [ci skip]' do
before do before do
allow_any_instance_of(Ci::Pipeline) allow_any_instance_of(Ci::Pipeline)
...@@ -77,47 +67,19 @@ describe PipelineScheduleWorker do ...@@ -77,47 +67,19 @@ describe PipelineScheduleWorker do
stub_ci_pipeline_yaml_file(YAML.dump(rspec: { variables: 'rspec' } )) stub_ci_pipeline_yaml_file(YAML.dump(rspec: { variables: 'rspec' } ))
end end
it 'creates a failed pipeline with the reason' do it 'does not creates a new pipeline' do
expect { subject }.to change { project.ci_pipelines.count }.by(1) expect { subject }.not_to change { project.ci_pipelines.count }
expect(Ci::Pipeline.last).to be_config_error
expect(Ci::Pipeline.last.yaml_errors).not_to be_nil
end end
end end
end end
context 'when the schedule is not runnable by the user' do context 'when the schedule is not runnable by the user' do
before do
expect(Gitlab::Sentry)
.to receive(:track_exception)
.with(Ci::CreatePipelineService::CreateError,
issue_url: 'https://gitlab.com/gitlab-org/gitlab-ce/issues/41231',
extra: { schedule_id: pipeline_schedule.id } ).once
end
it 'does not deactivate the schedule' do it 'does not deactivate the schedule' do
subject subject
expect(pipeline_schedule.reload.active).to be_truthy expect(pipeline_schedule.reload.active).to be_truthy
end end
it 'increments Prometheus counter' do
expect(Gitlab::Metrics)
.to receive(:counter)
.with(:pipeline_schedule_creation_failed_total, "Counter of failed attempts of pipeline schedule creation")
.and_call_original
subject
end
it 'logging a pipeline error' do
expect(Rails.logger)
.to receive(:error)
.with(a_string_matching("Insufficient permissions to create a new pipeline"))
.and_call_original
subject
end
it 'does not create a pipeline' do it 'does not create a pipeline' do
expect { subject }.not_to change { project.ci_pipelines.count } expect { subject }.not_to change { project.ci_pipelines.count }
end end
...@@ -131,21 +93,6 @@ describe PipelineScheduleWorker do ...@@ -131,21 +93,6 @@ describe PipelineScheduleWorker do
before do before do
stub_ci_pipeline_yaml_file(nil) stub_ci_pipeline_yaml_file(nil)
project.add_maintainer(user) project.add_maintainer(user)
expect(Gitlab::Sentry)
.to receive(:track_exception)
.with(Ci::CreatePipelineService::CreateError,
issue_url: 'https://gitlab.com/gitlab-org/gitlab-ce/issues/41231',
extra: { schedule_id: pipeline_schedule.id } ).once
end
it 'logging a pipeline error' do
expect(Rails.logger)
.to receive(:error)
.with(a_string_matching("Missing .gitlab-ci.yml file"))
.and_call_original
subject
end end
it 'does not create a pipeline' do it 'does not create a pipeline' do
......
...@@ -32,7 +32,37 @@ describe RunPipelineScheduleWorker do ...@@ -32,7 +32,37 @@ describe RunPipelineScheduleWorker do
it 'calls the Service' do it 'calls the Service' do
expect(Ci::CreatePipelineService).to receive(:new).with(project, user, ref: pipeline_schedule.ref).and_return(create_pipeline_service) expect(Ci::CreatePipelineService).to receive(:new).with(project, user, ref: pipeline_schedule.ref).and_return(create_pipeline_service)
expect(create_pipeline_service).to receive(:execute).with(:schedule, ignore_skip_ci: true, save_on_errors: false, schedule: pipeline_schedule) expect(create_pipeline_service).to receive(:execute!).with(:schedule, ignore_skip_ci: true, save_on_errors: false, schedule: pipeline_schedule)
worker.perform(pipeline_schedule.id, user.id)
end
end
context 'when database statement timeout happens' do
before do
allow(Ci::CreatePipelineService).to receive(:new) { raise ActiveRecord::StatementInvalid }
expect(Gitlab::Sentry)
.to receive(:track_exception)
.with(ActiveRecord::StatementInvalid,
issue_url: 'https://gitlab.com/gitlab-org/gitlab-ce/issues/41231',
extra: { schedule_id: pipeline_schedule.id } ).once
end
it 'increments Prometheus counter' do
expect(Gitlab::Metrics)
.to receive(:counter)
.with(:pipeline_schedule_creation_failed_total, "Counter of failed attempts of pipeline schedule creation")
.and_call_original
worker.perform(pipeline_schedule.id, user.id)
end
it 'logging a pipeline error' do
expect(Rails.logger)
.to receive(:error)
.with(a_string_matching('ActiveRecord::StatementInvalid'))
.and_call_original
worker.perform(pipeline_schedule.id, user.id) worker.perform(pipeline_schedule.id, user.id)
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment