Commit acd89bf9 authored by Bob Van Landuyt's avatar Bob Van Landuyt Committed by Kamil Trzciński

Mark duplicate jobs in Sidekiq

This marks a job as duplicate when there was already a job in the same
queue with the same arguments.

We do this by storing a key in Redis based on the argumens, worker and
queue when a job gets scheduled. If another job gets scheduled and the
key already exists, we mark the job as duplicate.

When a job starts, we delete its key from Redis.

Later we can use this for dropping jobs from redis if they are
idempotent.
parent c394109a
......@@ -20,6 +20,7 @@ module Gitlab
chain.add Gitlab::SidekiqMiddleware::AdminMode::Server
chain.add Gitlab::SidekiqStatus::ServerMiddleware
chain.add Gitlab::SidekiqMiddleware::WorkerContext::Server
chain.add Gitlab::SidekiqMiddleware::DuplicateJobs::Server
end
end
......@@ -33,6 +34,7 @@ module Gitlab
chain.add Gitlab::SidekiqMiddleware::WorkerContext::Client # needs to be before the Labkit middleware
chain.add Labkit::Middleware::Sidekiq::Client
chain.add Gitlab::SidekiqMiddleware::AdminMode::Client
chain.add Gitlab::SidekiqMiddleware::DuplicateJobs::Client
end
end
end
......
# frozen_string_literal: true
module Gitlab
module SidekiqMiddleware
module DuplicateJobs
class Client
def call(worker_class, job, queue, _redis_pool, &block)
DuplicateJob.new(job, queue).schedule(&block)
end
end
end
end
end
# frozen_string_literal: true
require 'digest'
module Gitlab
module SidekiqMiddleware
module DuplicateJobs
# This class defines an identifier of a job in a queue
# The identifier based on a job's class and arguments.
#
# As strategy decides when to keep track of the job in redis and when to
# remove it.
#
# Storing the deduplication key in redis can be done by calling `check!`
# check returns the `jid` of the job if it was scheduled, or the `jid` of
# the duplicate job if it was already scheduled
#
# When new jobs can be scheduled again, the strategy calls `#delete`.
class DuplicateJob
DUPLICATE_KEY_TTL = 6.hours
attr_reader :existing_jid
def initialize(job, queue_name, strategy: :until_executing)
@job = job
@queue_name = queue_name
@strategy = strategy
end
# This will continue the middleware chain if the job should be scheduled
# It will return false if the job needs to be cancelled
def schedule(&block)
Strategies.for(strategy).new(self).schedule(job, &block)
end
# This will continue the server middleware chain if the job should be
# executed.
# It will return false if the job should not be executed.
def perform(&block)
Strategies.for(strategy).new(self).perform(job, &block)
end
# This method will return the jid that was set in redis
def check!
read_jid = nil
Sidekiq.redis do |redis|
redis.multi do |multi|
redis.set(idempotency_key, jid, ex: DUPLICATE_KEY_TTL, nx: true)
read_jid = redis.get(idempotency_key)
end
end
self.existing_jid = read_jid.value
end
def delete!
Sidekiq.redis do |redis|
redis.del(idempotency_key)
end
end
def duplicate?
raise "Call `#check!` first to check for existing duplicates" unless existing_jid
jid != existing_jid
end
private
attr_reader :queue_name, :strategy, :job
attr_writer :existing_jid
def worker_class_name
job['class']
end
def arguments
job['args']
end
def jid
job['jid']
end
def idempotency_key
@idempotency_key ||= "#{namespace}:#{idempotency_hash}"
end
def idempotency_hash
Digest::SHA256.hexdigest(idempotency_string)
end
def namespace
"#{Gitlab::Redis::Queues::SIDEKIQ_NAMESPACE}:duplicate:#{queue_name}"
end
def idempotency_string
"#{worker_class_name}:#{arguments.join('-')}"
end
end
end
end
end
# frozen_string_literal: true
module Gitlab
module SidekiqMiddleware
module DuplicateJobs
class Server
def call(worker, job, queue, &block)
DuplicateJob.new(job, queue).perform(&block)
end
end
end
end
end
# frozen_string_literal: true
module Gitlab
module SidekiqMiddleware
module DuplicateJobs
module Strategies
UnknownStrategyError = Class.new(StandardError)
STRATEGIES = {
until_executing: UntilExecuting
}.freeze
def self.for(name)
STRATEGIES.fetch(name)
rescue KeyError
raise UnknownStrategyError, "Unknown deduplication strategy #{name}"
end
end
end
end
end
# frozen_string_literal: true
module Gitlab
module SidekiqMiddleware
module DuplicateJobs
module Strategies
# This strategy takes a lock before scheduling the job in a queue and
# removes the lock before the job starts allowing a new job to be queued
# while a job is still executing.
class UntilExecuting
def initialize(duplicate_job)
@duplicate_job = duplicate_job
end
def schedule(job)
if duplicate_job.check! && duplicate_job.duplicate?
job['duplicate-of'] = duplicate_job.existing_jid
end
yield
end
def perform(_job)
duplicate_job.delete!
yield
end
private
attr_reader :duplicate_job
end
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::SidekiqMiddleware::DuplicateJobs::Client, :clean_gitlab_redis_queues do
let(:worker_class) do
Class.new do
def self.name
'TestDeduplicationWorker'
end
include ApplicationWorker
def perform(*args)
end
end
end
before do
stub_const('TestDeduplicationWorker', worker_class)
end
describe '#call' do
it 'adds a correct duplicate tag to the jobs', :aggregate_failures do
TestDeduplicationWorker.bulk_perform_async([['args1'], ['args2'], ['args1']])
job1, job2, job3 = TestDeduplicationWorker.jobs
expect(job1['duplicate-of']).to be_nil
expect(job2['duplicate-of']).to be_nil
expect(job3['duplicate-of']).to eq(job1['jid'])
end
end
end
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gitlab_redis_queues do
subject(:duplicate_job) do
described_class.new(job, queue)
end
let(:job) { { 'class' => 'AuthorizedProjectsWorker', 'args' => [1], 'jid' => '123' } }
let(:queue) { 'authorized_projects' }
let(:idempotency_key) do
hash = Digest::SHA256.hexdigest("#{job['class']}:#{job['args'].join('-')}")
"#{Gitlab::Redis::Queues::SIDEKIQ_NAMESPACE}:duplicate:#{queue}:#{hash}"
end
describe '#schedule' do
it 'calls schedule on the strategy' do
expect do |block|
expect_next_instance_of(Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies::UntilExecuting) do |strategy|
expect(strategy).to receive(:schedule).with(job, &block)
end
duplicate_job.schedule(&block)
end.to yield_control
end
end
describe '#perform' do
it 'calls perform on the strategy' do
expect do |block|
expect_next_instance_of(Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies::UntilExecuting) do |strategy|
expect(strategy).to receive(:perform).with(job, &block)
end
duplicate_job.perform(&block)
end.to yield_control
end
end
describe '#check!' do
context 'when there was no job in the queue yet' do
it { expect(duplicate_job.check!).to eq('123') }
it "adds a key with ttl set to #{described_class::DUPLICATE_KEY_TTL}" do
expect { duplicate_job.check! }
.to change { read_idempotency_key_with_ttl(idempotency_key) }
.from([nil, -2])
.to(['123', be_within(1).of(described_class::DUPLICATE_KEY_TTL)])
end
end
context 'when there was already a job with same arguments in the same queue' do
before do
set_idempotency_key(idempotency_key, 'existing-key')
end
it { expect(duplicate_job.check!).to eq('existing-key') }
it "does not change the existing key's TTL" do
expect { duplicate_job.check! }
.not_to change { read_idempotency_key_with_ttl(idempotency_key) }
.from(['existing-key', -1])
end
it 'sets the existing jid' do
duplicate_job.check!
expect(duplicate_job.existing_jid).to eq('existing-key')
end
end
end
describe '#delete!' do
context "when we didn't track the definition" do
it { expect { duplicate_job.delete! }.not_to raise_error }
end
context 'when the key exists in redis' do
before do
set_idempotency_key(idempotency_key, 'existing-key')
end
it 'removes the key from redis' do
expect { duplicate_job.delete! }
.to change { read_idempotency_key_with_ttl(idempotency_key) }
.from(['existing-key', -1])
.to([nil, -2])
end
end
end
describe '#duplicate?' do
it "raises an error if the check wasn't performed" do
expect { duplicate_job.duplicate? }.to raise_error /Call `#check!` first/
end
it 'returns false if the existing jid equals the job jid' do
duplicate_job.check!
expect(duplicate_job.duplicate?).to be(false)
end
it 'returns false if the existing jid is different from the job jid' do
set_idempotency_key(idempotency_key, 'a different jid')
duplicate_job.check!
expect(duplicate_job.duplicate?).to be(true)
end
end
def set_idempotency_key(key, value = '1')
Sidekiq.redis { |r| r.set(key, value) }
end
def read_idempotency_key_with_ttl(key)
Sidekiq.redis do |redis|
redis.pipelined do |p|
p.get(key)
p.ttl(key)
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::SidekiqMiddleware::DuplicateJobs::Server, :clean_gitlab_redis_queues do
let(:worker_class) do
Class.new do
def self.name
'TestDeduplicationWorker'
end
include ApplicationWorker
def perform(*args)
end
end
end
before do
stub_const('TestDeduplicationWorker', worker_class)
end
around do |example|
Sidekiq::Testing.inline! { example.run }
end
before(:context) do
Sidekiq::Testing.server_middleware do |chain|
chain.add described_class
end
end
after(:context) do
Sidekiq::Testing.server_middleware do |chain|
chain.remove described_class
end
end
describe '#call' do
it 'removes the stored job from redis' do
bare_job = { 'class' => 'TestDeduplicationWorker', 'args' => ['hello'] }
job_definition = Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob.new(bare_job.dup, 'test_deduplication')
expect(Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob)
.to receive(:new).with(a_hash_including(bare_job), 'test_deduplication')
.and_return(job_definition).twice # once in client middleware
expect(job_definition).to receive(:delete!).and_call_original
TestDeduplicationWorker.perform_async('hello')
end
end
end
# frozen_string_literal: true
require 'fast_spec_helper'
describe Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies::UntilExecuting do
let(:fake_duplicate_job) do
instance_double(Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob)
end
subject(:strategy) { described_class.new(fake_duplicate_job) }
describe '#schedule' do
it 'checks for duplicates before yielding' do
expect(fake_duplicate_job).to receive(:check!).ordered.and_return('a jid')
expect(fake_duplicate_job).to receive(:duplicate?).ordered.and_return(false)
expect { |b| strategy.schedule({}, &b) }.to yield_control
end
it 'adds the jid of the existing job to the job hash' do
allow(fake_duplicate_job).to receive(:check!).and_return('the jid')
job_hash = {}
expect(fake_duplicate_job).to receive(:duplicate?).and_return(true)
expect(fake_duplicate_job).to receive(:existing_jid).and_return('the jid')
strategy.schedule(job_hash) {}
expect(job_hash).to include('duplicate-of' => 'the jid')
end
end
describe '#perform' do
it 'deletes the lock before executing' do
expect(fake_duplicate_job).to receive(:delete!).ordered
expect { |b| strategy.perform({}, &b) }.to yield_control
end
end
end
# frozen_string_literal: true
require 'fast_spec_helper'
describe Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies do
describe '.for' do
it 'returns the right class for `until_executing`' do
expect(described_class.for(:until_executing)).to eq(described_class::UntilExecuting)
end
it 'raises an UnknownStrategyError when passing an unknown key' do
expect { described_class.for(:unknown) }.to raise_error(described_class::UnknownStrategyError)
end
end
end
......@@ -46,7 +46,8 @@ describe Gitlab::SidekiqMiddleware do
Gitlab::SidekiqMiddleware::MemoryKiller,
Gitlab::SidekiqMiddleware::RequestStoreMiddleware,
Gitlab::SidekiqMiddleware::WorkerContext::Server,
Gitlab::SidekiqMiddleware::AdminMode::Server
Gitlab::SidekiqMiddleware::AdminMode::Server,
Gitlab::SidekiqMiddleware::DuplicateJobs::Server
]
end
let(:enabled_sidekiq_middlewares) { all_sidekiq_middlewares - disabled_sidekiq_middlewares }
......@@ -117,7 +118,8 @@ describe Gitlab::SidekiqMiddleware do
Gitlab::SidekiqMiddleware::ClientMetrics,
Gitlab::SidekiqMiddleware::WorkerContext::Client,
Labkit::Middleware::Sidekiq::Client,
Gitlab::SidekiqMiddleware::AdminMode::Client
Gitlab::SidekiqMiddleware::AdminMode::Client,
Gitlab::SidekiqMiddleware::DuplicateJobs::Client
]
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment