Commit a6a7ee75 authored by Bob Van Landuyt's avatar Bob Van Landuyt

Drop duplicate jobs from Sidekiq when enqueuing

This extends the `UntilExecuting` deduplication strategy to cancel
scheduling jobs when they are already in the queue.

We only deduplicate when the `drop_duplicate_sidekiq_jobs` feature
flag is enabled for now.

When we drop a job, we log that to the Gitlab::SidekiqLogger.
parent 2665e40c
# frozen_string_literal: true
module Gitlab
module SidekiqLogging
class DeduplicationLogger
include Singleton
include LogsJobs
def log(job, deduplication_type)
payload = parse_job(job)
payload['job_status'] = 'deduplicated'
payload['message'] = "#{base_message(payload)}: deduplicated: #{deduplication_type}"
payload['deduplication_type'] = deduplication_type
Sidekiq.logger.info payload
end
end
end
end
# frozen_string_literal: true
module Gitlab
module SidekiqLogging
module LogsJobs
def base_message(payload)
"#{payload['class']} JID-#{payload['jid']}"
end
def parse_job(job)
# Error information from the previous try is in the payload for
# displaying in the Sidekiq UI, but is very confusing in logs!
job = job.except('error_backtrace', 'error_class', 'error_message')
# Add process id params
job['pid'] = ::Process.pid
job.delete('args') unless ENV['SIDEKIQ_LOG_ARGUMENTS']
job['args'] = Gitlab::Utils::LogLimitedArray.log_limited_array(job['args'].map(&:to_s)) if job['args']
job
end
end
end
end
...@@ -6,6 +6,8 @@ require 'active_record/log_subscriber' ...@@ -6,6 +6,8 @@ require 'active_record/log_subscriber'
module Gitlab module Gitlab
module SidekiqLogging module SidekiqLogging
class StructuredLogger class StructuredLogger
include LogsJobs
def call(job, queue) def call(job, queue)
started_time = get_time started_time = get_time
base_payload = parse_job(job) base_payload = parse_job(job)
...@@ -24,10 +26,6 @@ module Gitlab ...@@ -24,10 +26,6 @@ module Gitlab
private private
def base_message(payload)
"#{payload['class']} JID-#{payload['jid']}"
end
def add_instrumentation_keys!(job, output_payload) def add_instrumentation_keys!(job, output_payload)
output_payload.merge!(job.slice(*::Gitlab::InstrumentationHelper::KEYS)) output_payload.merge!(job.slice(*::Gitlab::InstrumentationHelper::KEYS))
end end
...@@ -76,20 +74,6 @@ module Gitlab ...@@ -76,20 +74,6 @@ module Gitlab
payload['completed_at'] = Time.now.utc.to_f payload['completed_at'] = Time.now.utc.to_f
end end
def parse_job(job)
# Error information from the previous try is in the payload for
# displaying in the Sidekiq UI, but is very confusing in logs!
job = job.except('error_backtrace', 'error_class', 'error_message')
# Add process id params
job['pid'] = ::Process.pid
job.delete('args') unless ENV['SIDEKIQ_LOG_ARGUMENTS']
job['args'] = Gitlab::Utils::LogLimitedArray.log_limited_array(job['args'].map(&:to_s)) if job['args']
job
end
def elapsed(t0) def elapsed(t0)
t1 = get_time t1 = get_time
{ {
......
# frozen_string_literal: true
require 'digest'
module Gitlab
module SidekiqMiddleware
module DuplicateJobs
def self.drop_duplicates?
Feature.enabled?(:drop_duplicate_sidekiq_jobs)
end
end
end
end
...@@ -66,6 +66,10 @@ module Gitlab ...@@ -66,6 +66,10 @@ module Gitlab
jid != existing_jid jid != existing_jid
end end
def droppable?
idempotent? && duplicate? && DuplicateJobs.drop_duplicates?
end
private private
attr_reader :queue_name, :strategy, :job attr_reader :queue_name, :strategy, :job
...@@ -98,6 +102,14 @@ module Gitlab ...@@ -98,6 +102,14 @@ module Gitlab
def idempotency_string def idempotency_string
"#{worker_class_name}:#{arguments.join('-')}" "#{worker_class_name}:#{arguments.join('-')}"
end end
def idempotent?
worker_class = worker_class_name.to_s.safe_constantize
return false unless worker_class
return false unless worker_class.respond_to?(:idempotent?)
worker_class.idempotent?
end
end end
end end
end end
......
...@@ -17,6 +17,11 @@ module Gitlab ...@@ -17,6 +17,11 @@ module Gitlab
job['duplicate-of'] = duplicate_job.existing_jid job['duplicate-of'] = duplicate_job.existing_jid
end end
if duplicate_job.droppable?
Gitlab::SidekiqLogging::DeduplicationLogger.instance.log(job, "dropped until executing")
return false
end
yield yield
end end
......
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::SidekiqLogging::DeduplicationLogger do
describe '#log_deduplication' do
let(:job) do
{
'class' => 'TestWorker',
'args' => [1234, 'hello', { 'key' => 'value' }],
'jid' => 'da883554ee4fe414012f5f42',
'correlation_id' => 'cid',
'duplicate-of' => 'other_jid'
}
end
it 'logs a deduplication message to the sidekiq logger' do
expected_payload = {
'job_status' => 'deduplicated',
'message' => "#{job['class']} JID-#{job['jid']}: deduplicated: a fancy strategy",
'deduplication_type' => 'a fancy strategy'
}
expect(Sidekiq.logger).to receive(:info).with(a_hash_including(expected_payload)).and_call_original
described_class.instance.log(job, "a fancy strategy")
end
it "does not modify the job" do
expect { described_class.instance.log(job, "a fancy strategy") }
.not_to change { job }
end
end
end
...@@ -3,6 +3,8 @@ ...@@ -3,6 +3,8 @@
require 'spec_helper' require 'spec_helper'
describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gitlab_redis_queues do describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gitlab_redis_queues do
using RSpec::Parameterized::TableSyntax
subject(:duplicate_job) do subject(:duplicate_job) do
described_class.new(job, queue) described_class.new(job, queue)
end end
...@@ -110,6 +112,36 @@ describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gitlab_r ...@@ -110,6 +112,36 @@ describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gitlab_r
end end
end end
describe 'droppable?' do
where(:idempotent, :duplicate, :feature_enabled) do
# [true, false].repeated_permutation(3)
[[true, true, true],
[true, true, false],
[true, false, true],
[true, false, false],
[false, true, true],
[false, true, false],
[false, false, true],
[false, false, false]]
end
with_them do
before do
allow(AuthorizedProjectsWorker).to receive(:idempotent?).and_return(idempotent)
allow(duplicate_job).to receive(:duplicate?).and_return(duplicate)
stub_feature_flags(drop_duplicate_sidekiq_jobs: feature_enabled)
end
it 'is droppable when all conditions are met' do
if idempotent && duplicate && feature_enabled
expect(duplicate_job).to be_droppable
else
expect(duplicate_job).not_to be_droppable
end
end
end
end
def set_idempotency_key(key, value = '1') def set_idempotency_key(key, value = '1')
Sidekiq.redis { |r| r.set(key, value) } Sidekiq.redis { |r| r.set(key, value) }
end end
......
...@@ -10,14 +10,21 @@ describe Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies::UntilExecuting do ...@@ -10,14 +10,21 @@ describe Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies::UntilExecuting do
subject(:strategy) { described_class.new(fake_duplicate_job) } subject(:strategy) { described_class.new(fake_duplicate_job) }
describe '#schedule' do describe '#schedule' do
before do
allow(Gitlab::SidekiqLogging::DeduplicationLogger.instance).to receive(:log)
end
it 'checks for duplicates before yielding' do it 'checks for duplicates before yielding' do
expect(fake_duplicate_job).to receive(:check!).ordered.and_return('a jid') expect(fake_duplicate_job).to receive(:check!).ordered.and_return('a jid')
expect(fake_duplicate_job).to receive(:duplicate?).ordered.and_return(false) expect(fake_duplicate_job).to receive(:duplicate?).ordered.and_return(false)
expect(fake_duplicate_job).to receive(:droppable?).ordered.and_return(false)
expect { |b| strategy.schedule({}, &b) }.to yield_control expect { |b| strategy.schedule({}, &b) }.to yield_control
end end
it 'adds the jid of the existing job to the job hash' do it 'adds the jid of the existing job to the job hash' do
allow(fake_duplicate_job).to receive(:check!).and_return('the jid') allow(fake_duplicate_job).to receive(:check!).and_return('the jid')
allow(fake_duplicate_job).to receive(:droppable?).and_return(true)
job_hash = {} job_hash = {}
expect(fake_duplicate_job).to receive(:duplicate?).and_return(true) expect(fake_duplicate_job).to receive(:duplicate?).and_return(true)
...@@ -27,6 +34,33 @@ describe Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies::UntilExecuting do ...@@ -27,6 +34,33 @@ describe Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies::UntilExecuting do
expect(job_hash).to include('duplicate-of' => 'the jid') expect(job_hash).to include('duplicate-of' => 'the jid')
end end
context "when the job is droppable" do
before do
allow(fake_duplicate_job).to receive(:check!).and_return('the jid')
allow(fake_duplicate_job).to receive(:duplicate?).and_return(true)
allow(fake_duplicate_job).to receive(:existing_jid).and_return('the jid')
allow(fake_duplicate_job).to receive(:droppable?).and_return(true)
end
it 'drops the job' do
schedule_result = nil
expect(fake_duplicate_job).to receive(:droppable?).and_return(true)
expect { |b| schedule_result = strategy.schedule({}, &b) }.not_to yield_control
expect(schedule_result).to be(false)
end
it 'logs that the job wass dropped' do
fake_logger = instance_double(Gitlab::SidekiqLogging::DeduplicationLogger)
expect(Gitlab::SidekiqLogging::DeduplicationLogger).to receive(:instance).and_return(fake_logger)
expect(fake_logger).to receive(:log).with(a_hash_including({ 'jid' => 'new jid' }), 'dropped until executing')
strategy.schedule({ 'jid' => 'new jid' }) {}
end
end
end end
describe '#perform' do describe '#perform' do
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment