Commit 2e31e87d authored by nmilojevic1's avatar nmilojevic1

Allow idempotent jobs to use load balancing

- Improve specs
- Add Lua script for atomic increment of wal_location
- Add support for multiple databases for deduplication
- Compare LSN diff using PostgreSQL
parent adf91a36
...@@ -58,10 +58,7 @@ module ApplicationWorker ...@@ -58,10 +58,7 @@ module ApplicationWorker
Gitlab::SidekiqConfig::WorkerRouter.queue_name_from_worker_name(self) Gitlab::SidekiqConfig::WorkerRouter.queue_name_from_worker_name(self)
end end
override :validate_worker_attributes!
def validate_worker_attributes! def validate_worker_attributes!
super
# Since the delayed data_consistency will use sidekiq built in retry mechanism, it is required that this mechanism # Since the delayed data_consistency will use sidekiq built in retry mechanism, it is required that this mechanism
# is not disabled. # is not disabled.
if retry_disabled? && get_data_consistency == :delayed if retry_disabled? && get_data_consistency == :delayed
...@@ -81,6 +78,13 @@ module ApplicationWorker ...@@ -81,6 +78,13 @@ module ApplicationWorker
end end
end end
# Re-validate worker attributes whenever data_consistency is declared:
# per the check in validate_worker_attributes!, :delayed consistency
# relies on Sidekiq's built-in retry mechanism, so retries must not be
# disabled for such workers.
override :data_consistency
def data_consistency(data_consistency, feature_flag: nil)
super
# Validation runs here (after the attribute is set by super) rather than
# inside WorkerAttributes, so idempotency alone no longer forbids
# load-balanced data consistency.
validate_worker_attributes!
end
def perform_async(*args) def perform_async(*args)
# Worker execution for workers with data_consistency set to :delayed or :sticky # Worker execution for workers with data_consistency set to :delayed or :sticky
# will be delayed to give replication enough time to complete # will be delayed to give replication enough time to complete
......
...@@ -92,17 +92,6 @@ module WorkerAttributes ...@@ -92,17 +92,6 @@ module WorkerAttributes
set_class_attribute(:data_consistency_feature_flag, feature_flag) if feature_flag set_class_attribute(:data_consistency_feature_flag, feature_flag) if feature_flag
set_class_attribute(:data_consistency, data_consistency) set_class_attribute(:data_consistency, data_consistency)
validate_worker_attributes!
end
def validate_worker_attributes!
# Since the deduplication should always take into account the latest binary replication pointer into account,
# not the first one, the deduplication will not work with sticky or delayed.
# Follow up issue to improve this: https://gitlab.com/gitlab-org/gitlab/-/issues/325291
if idempotent? && utilizes_load_balancing_capabilities?
raise ArgumentError, "Class can't be marked as idempotent if data_consistency is not set to :always"
end
end end
# If data_consistency is not set to :always, worker will try to utilize load balancing capabilities and use the replica # If data_consistency is not set to :always, worker will try to utilize load balancing capabilities and use the replica
...@@ -147,8 +136,6 @@ module WorkerAttributes ...@@ -147,8 +136,6 @@ module WorkerAttributes
def idempotent! def idempotent!
set_class_attribute(:idempotent, true) set_class_attribute(:idempotent, true)
validate_worker_attributes!
end end
def idempotent? def idempotent?
......
# Feature flag gating preservation of the latest WAL locations for
# deduplicated (idempotent) Sidekiq jobs; disabled by default during rollout.
---
name: preserve_latest_wal_locations_for_idempotent_jobs
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/66280
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/338350
milestone: '14.3'
type: development
group: group::memory
default_enabled: false
...@@ -192,6 +192,19 @@ module Gitlab ...@@ -192,6 +192,19 @@ module Gitlab
row['system_identifier'] row['system_identifier']
end end
# Computes the difference in bytes between two WAL locations (LSNs)
# by delegating to PostgreSQL's pg_wal_lsn_diff() function.
# Returns the numeric difference, or nil when the query yields no row.
def pg_wal_lsn_diff(location1, location2)
  quoted_location1 = connection.quote(location1)
  quoted_location2 = connection.quote(location2)

  sql = "SELECT pg_wal_lsn_diff(#{quoted_location1}, #{quoted_location2}) AS result"
  first_row = connection.select_all(sql).first

  first_row['result'] if first_row
end
# @param [ActiveRecord::Connection] ar_connection # @param [ActiveRecord::Connection] ar_connection
# @return [String] # @return [String]
def get_write_location(ar_connection) def get_write_location(ar_connection)
......
...@@ -57,7 +57,7 @@ module Gitlab ...@@ -57,7 +57,7 @@ module Gitlab
end end
def get_wal_locations(job) def get_wal_locations(job)
job['wal_locations'] || legacy_wal_location(job) job['dedup_wal_locations'] || job['wal_locations'] || legacy_wal_location(job)
end end
# Already scheduled jobs could still contain legacy database write location. # Already scheduled jobs could still contain legacy database write location.
......
...@@ -69,6 +69,7 @@ module Gitlab ...@@ -69,6 +69,7 @@ module Gitlab
message = base_message(payload) message = base_message(payload)
payload['load_balancing_strategy'] = job['load_balancing_strategy'] if job['load_balancing_strategy'] payload['load_balancing_strategy'] = job['load_balancing_strategy'] if job['load_balancing_strategy']
payload['dedup_wal_locations'] = job['dedup_wal_locations'] if job['dedup_wal_locations'].present?
if job_exception if job_exception
payload['message'] = "#{message}: fail: #{payload['duration_s']} sec" payload['message'] = "#{message}: fail: #{payload['duration_s']} sec"
......
...@@ -17,10 +17,26 @@ module Gitlab ...@@ -17,10 +17,26 @@ module Gitlab
# #
# When new jobs can be scheduled again, the strategy calls `#delete`. # When new jobs can be scheduled again, the strategy calls `#delete`.
class DuplicateJob class DuplicateJob
include Gitlab::Utils::StrongMemoize
DUPLICATE_KEY_TTL = 6.hours DUPLICATE_KEY_TTL = 6.hours
WAL_LOCATION_TTL = 60.seconds
MAX_REDIS_RETRIES = 5
DEFAULT_STRATEGY = :until_executing DEFAULT_STRATEGY = :until_executing
STRATEGY_NONE = :none STRATEGY_NONE = :none
# Lua script run atomically by Redis to record the newest WAL location
# for a key. The key holds a two-element list [wal_location, lsn_offset]:
#   - first write (LINDEX returns false): push both values and set a TTL
#   - subsequent writes: overwrite both elements only when the incoming
#     offset is greater than the stored one, i.e. the new WAL location
#     is further ahead.
LUA_SET_WAL_SCRIPT = <<~EOS
local key, wal, offset, ttl = KEYS[1], ARGV[1], tonumber(ARGV[2]), ARGV[3]
local existing_offset = redis.call("LINDEX", key, -1)
if existing_offset == false then
redis.call("RPUSH", key, wal, offset)
redis.call("EXPIRE", key, ttl)
elseif offset > tonumber(existing_offset) then
redis.call("LSET", key, 0, wal)
redis.call("LSET", key, -1, offset)
end
EOS
attr_reader :existing_jid attr_reader :existing_jid
def initialize(job, queue_name) def initialize(job, queue_name)
...@@ -44,22 +60,59 @@ module Gitlab ...@@ -44,22 +60,59 @@ module Gitlab
# This method will return the jid that was set in redis # This method will return the jid that was set in redis
def check!(expiry = DUPLICATE_KEY_TTL) def check!(expiry = DUPLICATE_KEY_TTL)
read_jid = nil read_jid = nil
read_wal_locations = {}
Sidekiq.redis do |redis| Sidekiq.redis do |redis|
redis.multi do |multi| redis.multi do |multi|
redis.set(idempotency_key, jid, ex: expiry, nx: true) redis.set(idempotency_key, jid, ex: expiry, nx: true)
read_wal_locations = check_existing_wal_locations!(redis, expiry)
read_jid = redis.get(idempotency_key) read_jid = redis.get(idempotency_key)
end end
end end
job['idempotency_key'] = idempotency_key job['idempotency_key'] = idempotency_key
# We need to fetch values since the read_wal_locations and read_jid were obtained inside transaction, under redis.multi command.
self.existing_wal_locations = read_wal_locations.transform_values(&:value)
self.existing_jid = read_jid.value self.existing_jid = read_jid.value
end end
# Records, per database connection, this job's WAL location in Redis
# whenever it is newer (greater LSN offset) than the value already
# stored. The compare-and-set is delegated to LUA_SET_WAL_SCRIPT so it
# happens atomically server-side; all evals are batched in one MULTI.
def update_latest_wal_location!
return unless job_wal_locations.present?

Sidekiq.redis do |redis|
redis.multi do
job_wal_locations.each do |connection_name, location|
# ARGV: [wal location, offset of this location relative to the
# existing job's location, TTL for first write]
redis.eval(LUA_SET_WAL_SCRIPT, keys: [wal_location_key(connection_name)], argv: [location, pg_wal_lsn_diff(connection_name).to_i, WAL_LOCATION_TTL])
end
end
end
end
# Reads back, per connection, the latest WAL location recorded by
# #update_latest_wal_location!. Returns {} when the job carries no WAL
# locations. Memoized via StrongMemoize. Note: LINDEX calls issued
# inside redis.multi return futures, so values are resolved with
# #value after the MULTI block completes; nil entries (missing keys)
# are dropped by compact.
def latest_wal_locations
return {} unless job_wal_locations.present?

strong_memoize(:latest_wal_locations) do
read_wal_locations = {}

Sidekiq.redis do |redis|
redis.multi do
job_wal_locations.keys.each do |connection_name|
read_wal_locations[connection_name] = redis.lindex(wal_location_key(connection_name), 0)
end
end
end
read_wal_locations.transform_values(&:value).compact
end
end
def delete! def delete!
Sidekiq.redis do |redis| Sidekiq.redis do |redis|
redis.multi do |multi|
redis.del(idempotency_key) redis.del(idempotency_key)
delete_wal_locations!(redis)
end
end end
end end
...@@ -93,6 +146,7 @@ module Gitlab ...@@ -93,6 +146,7 @@ module Gitlab
private private
attr_accessor :existing_wal_locations
attr_reader :queue_name, :job attr_reader :queue_name, :job
attr_writer :existing_jid attr_writer :existing_jid
...@@ -100,6 +154,10 @@ module Gitlab ...@@ -100,6 +154,10 @@ module Gitlab
@worker_klass ||= worker_class_name.to_s.safe_constantize @worker_klass ||= worker_class_name.to_s.safe_constantize
end end
# Asks the database identified by connection_name for the LSN diff
# between this job's WAL location and the location recorded by the
# first (existing) job. Supports multiple databases for deduplication.
def pg_wal_lsn_diff(connection_name)
Gitlab::Database::DATABASES[connection_name].pg_wal_lsn_diff(job_wal_locations[connection_name], existing_wal_locations[connection_name])
end
def strategy def strategy
return DEFAULT_STRATEGY unless worker_klass return DEFAULT_STRATEGY unless worker_klass
return DEFAULT_STRATEGY unless worker_klass.respond_to?(:idempotent?) return DEFAULT_STRATEGY unless worker_klass.respond_to?(:idempotent?)
...@@ -120,6 +178,20 @@ module Gitlab ...@@ -120,6 +178,20 @@ module Gitlab
job['jid'] job['jid']
end end
# WAL locations carried on the job payload, keyed by connection name.
# Empty hash when the feature flag is off or the payload has none.
def job_wal_locations
  preserve_wal_location? ? (job['wal_locations'] || {}) : {}
end
# Redis key under which the first (existing) job's WAL location is kept.
def existing_wal_location_key(connection_name)
  [idempotency_key, connection_name, 'existing_wal_location'].join(':')
end
# Redis key under which the latest observed WAL location is kept.
def wal_location_key(connection_name)
  [idempotency_key, connection_name, 'wal_location'].join(':')
end
def idempotency_key def idempotency_key
@idempotency_key ||= job['idempotency_key'] || "#{namespace}:#{idempotency_hash}" @idempotency_key ||= job['idempotency_key'] || "#{namespace}:#{idempotency_hash}"
end end
...@@ -135,6 +207,29 @@ module Gitlab ...@@ -135,6 +207,29 @@ module Gitlab
def idempotency_string def idempotency_string
"#{worker_class_name}:#{Sidekiq.dump_json(arguments)}" "#{worker_class_name}:#{Sidekiq.dump_json(arguments)}"
end end
# Removes both WAL-tracking keys (latest and existing) for every
# connection the job carries locations for.
def delete_wal_locations!(redis)
  job_wal_locations.each_key do |connection_name|
    redis.del(wal_location_key(connection_name))
    redis.del(existing_wal_location_key(connection_name))
  end
end
# Called from inside the caller's redis.multi (see #check!): stores this
# job's WAL locations under the "existing" keys only if absent (NX),
# then reads back whichever value won. Returns a hash of Redis futures
# keyed by connection name — callers must resolve them with #value
# after the MULTI completes.
def check_existing_wal_locations!(redis, expiry)
read_wal_locations = {}

job_wal_locations.each do |connection_name, location|
key = existing_wal_location_key(connection_name)
redis.set(key, location, ex: expiry, nx: true)
read_wal_locations[connection_name] = redis.get(key)
end

read_wal_locations
end
# Feature-flag gate for the whole WAL-location preservation mechanism;
# when disabled, job_wal_locations is {} and all WAL handling is skipped.
def preserve_wal_location?
Feature.enabled?(:preserve_latest_wal_locations_for_idempotent_jobs, default_enabled: :yaml)
end
end end
end end
end end
......
...@@ -14,6 +14,8 @@ module Gitlab ...@@ -14,6 +14,8 @@ module Gitlab
job['duplicate-of'] = duplicate_job.existing_jid job['duplicate-of'] = duplicate_job.existing_jid
if duplicate_job.idempotent? if duplicate_job.idempotent?
duplicate_job.update_latest_wal_location!
Gitlab::SidekiqLogging::DeduplicationLogger.instance.log( Gitlab::SidekiqLogging::DeduplicationLogger.instance.log(
job, "dropped #{strategy_name}", duplicate_job.options) job, "dropped #{strategy_name}", duplicate_job.options)
return false return false
...@@ -23,8 +25,16 @@ module Gitlab ...@@ -23,8 +25,16 @@ module Gitlab
yield yield
end end
# Shared hook invoked (via super) by the strategy perform methods:
# copies the latest deduplication WAL locations onto the job hash so
# downstream consumers (e.g. the load-balancing middleware reading
# 'dedup_wal_locations') can use them.
def perform(job)
update_job_wal_location!(job)
end
private private
# Writes the latest WAL locations into the job hash under
# 'dedup_wal_locations', but only when there is something to record.
def update_job_wal_location!(job)
  locations = duplicate_job.latest_wal_locations
  job['dedup_wal_locations'] = locations if locations.present?
end
def deduplicatable_job? def deduplicatable_job?
!duplicate_job.scheduled? || duplicate_job.options[:including_scheduled] !duplicate_job.scheduled? || duplicate_job.options[:including_scheduled]
end end
......
...@@ -8,9 +8,14 @@ module Gitlab ...@@ -8,9 +8,14 @@ module Gitlab
# removes the lock after the job has executed preventing a new job to be queued # removes the lock after the job has executed preventing a new job to be queued
# while a job is still executing. # while a job is still executing.
class UntilExecuted < Base class UntilExecuted < Base
extend ::Gitlab::Utils::Override
include DeduplicatesWhenScheduling include DeduplicatesWhenScheduling
def perform(_job) override :perform
def perform(job)
super
yield yield
duplicate_job.delete! duplicate_job.delete!
......
...@@ -8,9 +8,13 @@ module Gitlab ...@@ -8,9 +8,13 @@ module Gitlab
# removes the lock before the job starts allowing a new job to be queued # removes the lock before the job starts allowing a new job to be queued
# while a job is still executing. # while a job is still executing.
class UntilExecuting < Base class UntilExecuting < Base
extend ::Gitlab::Utils::Override
include DeduplicatesWhenScheduling include DeduplicatesWhenScheduling
def perform(_job) override :perform
def perform(job)
super
duplicate_job.delete! duplicate_job.delete!
yield yield
......
...@@ -97,6 +97,16 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do ...@@ -97,6 +97,16 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do
it_behaves_like 'replica is up to date', 'replica' it_behaves_like 'replica is up to date', 'replica'
end end
context 'when deduplication wal location is set' do
let(:job) { { 'job_id' => 'a180b47c-3fd6-41b8-81e9-34da61c3400e', 'dedup_wal_locations' => wal_locations } }
before do
allow(load_balancer).to receive(:select_up_to_date_host).with(wal_locations[:main]).and_return(true)
end
it_behaves_like 'replica is up to date', 'replica'
end
context 'when legacy wal location is set' do context 'when legacy wal location is set' do
let(:job) { { 'job_id' => 'a180b47c-3fd6-41b8-81e9-34da61c3400e', 'database_write_location' => '0/D525E3A8' } } let(:job) { { 'job_id' => 'a180b47c-3fd6-41b8-81e9-34da61c3400e', 'database_write_location' => '0/D525E3A8' } }
......
...@@ -7,6 +7,10 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies::UntilExecut ...@@ -7,6 +7,10 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies::UntilExecut
describe '#perform' do describe '#perform' do
let(:proc) { -> {} } let(:proc) { -> {} }
before do
allow(fake_duplicate_job).to receive(:latest_wal_locations).and_return( {} )
end
it 'deletes the lock after executing' do it 'deletes the lock after executing' do
expect(proc).to receive(:call).ordered expect(proc).to receive(:call).ordered
expect(fake_duplicate_job).to receive(:delete!).ordered expect(fake_duplicate_job).to receive(:delete!).ordered
......
...@@ -7,6 +7,10 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies::UntilExecut ...@@ -7,6 +7,10 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies::UntilExecut
describe '#perform' do describe '#perform' do
let(:proc) { -> {} } let(:proc) { -> {} }
before do
allow(fake_duplicate_job).to receive(:latest_wal_locations).and_return( {} )
end
it 'deletes the lock before executing' do it 'deletes the lock before executing' do
expect(fake_duplicate_job).to receive(:delete!).ordered expect(fake_duplicate_job).to receive(:delete!).ordered
expect(proc).to receive(:call).ordered expect(proc).to receive(:call).ordered
......
...@@ -39,6 +39,7 @@ RSpec.shared_examples 'deduplicating jobs when scheduling' do |strategy_name| ...@@ -39,6 +39,7 @@ RSpec.shared_examples 'deduplicating jobs when scheduling' do |strategy_name|
allow(fake_duplicate_job).to receive(:scheduled?).and_return(false) allow(fake_duplicate_job).to receive(:scheduled?).and_return(false)
allow(fake_duplicate_job).to receive(:check!).and_return('the jid') allow(fake_duplicate_job).to receive(:check!).and_return('the jid')
allow(fake_duplicate_job).to receive(:idempotent?).and_return(true) allow(fake_duplicate_job).to receive(:idempotent?).and_return(true)
allow(fake_duplicate_job).to receive(:update_latest_wal_location!)
allow(fake_duplicate_job).to receive(:options).and_return({}) allow(fake_duplicate_job).to receive(:options).and_return({})
job_hash = {} job_hash = {}
...@@ -63,6 +64,7 @@ RSpec.shared_examples 'deduplicating jobs when scheduling' do |strategy_name| ...@@ -63,6 +64,7 @@ RSpec.shared_examples 'deduplicating jobs when scheduling' do |strategy_name|
.with(Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob::DUPLICATE_KEY_TTL) .with(Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob::DUPLICATE_KEY_TTL)
.and_return('the jid')) .and_return('the jid'))
allow(fake_duplicate_job).to receive(:idempotent?).and_return(true) allow(fake_duplicate_job).to receive(:idempotent?).and_return(true)
allow(fake_duplicate_job).to receive(:update_latest_wal_location!)
job_hash = {} job_hash = {}
expect(fake_duplicate_job).to receive(:duplicate?).and_return(true) expect(fake_duplicate_job).to receive(:duplicate?).and_return(true)
...@@ -83,6 +85,7 @@ RSpec.shared_examples 'deduplicating jobs when scheduling' do |strategy_name| ...@@ -83,6 +85,7 @@ RSpec.shared_examples 'deduplicating jobs when scheduling' do |strategy_name|
allow(fake_duplicate_job).to( allow(fake_duplicate_job).to(
receive(:check!).with(time_diff.to_i).and_return('the jid')) receive(:check!).with(time_diff.to_i).and_return('the jid'))
allow(fake_duplicate_job).to receive(:idempotent?).and_return(true) allow(fake_duplicate_job).to receive(:idempotent?).and_return(true)
allow(fake_duplicate_job).to receive(:update_latest_wal_location!)
job_hash = {} job_hash = {}
expect(fake_duplicate_job).to receive(:duplicate?).and_return(true) expect(fake_duplicate_job).to receive(:duplicate?).and_return(true)
...@@ -105,6 +108,13 @@ RSpec.shared_examples 'deduplicating jobs when scheduling' do |strategy_name| ...@@ -105,6 +108,13 @@ RSpec.shared_examples 'deduplicating jobs when scheduling' do |strategy_name|
allow(fake_duplicate_job).to receive(:options).and_return({}) allow(fake_duplicate_job).to receive(:options).and_return({})
allow(fake_duplicate_job).to receive(:existing_jid).and_return('the jid') allow(fake_duplicate_job).to receive(:existing_jid).and_return('the jid')
allow(fake_duplicate_job).to receive(:idempotent?).and_return(true) allow(fake_duplicate_job).to receive(:idempotent?).and_return(true)
allow(fake_duplicate_job).to receive(:update_latest_wal_location!)
end
it 'updates latest wal location' do
expect(fake_duplicate_job).to receive(:update_latest_wal_location!)
strategy.schedule({ 'jid' => 'new jid' }) {}
end end
it 'drops the job' do it 'drops the job' do
...@@ -136,4 +146,46 @@ RSpec.shared_examples 'deduplicating jobs when scheduling' do |strategy_name| ...@@ -136,4 +146,46 @@ RSpec.shared_examples 'deduplicating jobs when scheduling' do |strategy_name|
end end
end end
end end
describe '#perform' do
let(:proc) { -> {} }
let(:job) { { 'jid' => 'new jid', 'wal_locations' => { 'main' => '0/1234', 'ci' => '0/1234' } } }
let(:wal_locations) do
{
main: '0/D525E3A8',
ci: 'AB/12345'
}
end
before do
allow(fake_duplicate_job).to receive(:delete!)
allow(fake_duplicate_job).to receive(:latest_wal_locations).and_return( wal_locations )
end
it 'updates job hash with dedup_wal_locations' do
strategy.perform(job) do
proc.call
end
expect(job['dedup_wal_locations']).to eq(wal_locations)
end
shared_examples 'does not update job hash' do
it 'does not update job hash with dedup_wal_locations' do
strategy.perform(job) do
proc.call
end
expect(job).not_to include('dedup_wal_locations')
end
end
context 'when latest_wal_location is empty' do
before do
allow(fake_duplicate_job).to receive(:latest_wal_locations).and_return( {} )
end
include_examples 'does not update job hash'
end
end
end end
...@@ -35,16 +35,6 @@ RSpec.describe WorkerAttributes do ...@@ -35,16 +35,6 @@ RSpec.describe WorkerAttributes do
end end
end end
context 'when job is idempotent' do
context 'when data_consistency is not :always' do
it 'raise exception' do
worker.idempotent!
expect { worker.data_consistency(:sticky) }
.to raise_error("Class can't be marked as idempotent if data_consistency is not set to :always")
end
end
context 'when feature_flag is provided' do context 'when feature_flag is provided' do
before do before do
stub_feature_flags(test_feature_flag: false) stub_feature_flags(test_feature_flag: false)
...@@ -59,24 +49,6 @@ RSpec.describe WorkerAttributes do ...@@ -59,24 +49,6 @@ RSpec.describe WorkerAttributes do
end end
end end
end end
end
describe '.idempotent!' do
it 'sets `idempotent` attribute of the worker class to true' do
worker.idempotent!
expect(worker.send(:class_attributes)[:idempotent]).to eq(true)
end
context 'when data consistency is not :always' do
it 'raise exception' do
worker.data_consistency(:sticky)
expect { worker.idempotent! }
.to raise_error("Class can't be marked as idempotent if data_consistency is not set to :always")
end
end
end
describe '.idempotent?' do describe '.idempotent?' do
subject(:idempotent?) { worker.idempotent? } subject(:idempotent?) { worker.idempotent? }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment