Commit 2a1f2b7a authored by Matthias Käppler's avatar Matthias Käppler Committed by Fabio Pitino

Refine load_balancing_strategy metric label

parent b1163762
...@@ -5,27 +5,29 @@ module Gitlab ...@@ -5,27 +5,29 @@ module Gitlab
module LoadBalancing module LoadBalancing
class SidekiqClientMiddleware class SidekiqClientMiddleware
def call(worker_class, job, _queue, _redis_pool) def call(worker_class, job, _queue, _redis_pool)
# Mailers can't be constantized
worker_class = worker_class.to_s.safe_constantize worker_class = worker_class.to_s.safe_constantize
mark_data_consistency_location(worker_class, job) if load_balancing_enabled?(worker_class)
job['worker_data_consistency'] = worker_class.get_data_consistency
set_data_consistency_location!(job) unless location_already_provided?(job)
else
job['worker_data_consistency'] = ::WorkerAttributes::DEFAULT_DATA_CONSISTENCY
end
yield yield
end end
private private
def mark_data_consistency_location(worker_class, job) def load_balancing_enabled?(worker_class)
# Mailers can't be constantized worker_class &&
return unless worker_class worker_class.include?(::ApplicationWorker) &&
return unless worker_class.include?(::ApplicationWorker) worker_class.utilizes_load_balancing_capabilities? &&
return unless worker_class.get_data_consistency_feature_flag_enabled? worker_class.get_data_consistency_feature_flag_enabled?
end
return if location_already_provided?(job)
job['worker_data_consistency'] = worker_class.get_data_consistency
return unless worker_class.utilizes_load_balancing_capabilities?
def set_data_consistency_location!(job)
if Session.current.use_primary? if Session.current.use_primary?
job['database_write_location'] = load_balancer.primary_write_location job['database_write_location'] = load_balancer.primary_write_location
else else
......
...@@ -10,17 +10,14 @@ module Gitlab ...@@ -10,17 +10,14 @@ module Gitlab
worker_class = worker.class worker_class = worker.class
strategy = select_load_balancing_strategy(worker_class, job) strategy = select_load_balancing_strategy(worker_class, job)
# This is consumed by ServerMetrics and StructuredLogger to emit metrics so we only job['load_balancing_strategy'] = strategy.to_s
# make this available when load-balancing is actually utilized.
job['load_balancing_strategy'] = strategy.to_s if load_balancing_available?(worker_class)
case strategy if use_primary?(strategy)
when :primary, :retry_primary
Session.current.use_primary! Session.current.use_primary!
when :retry_replica elsif strategy == :retry
raise JobReplicaNotUpToDate, "Sidekiq job #{worker_class} JID-#{job['jid']} couldn't use the replica."\ raise JobReplicaNotUpToDate, "Sidekiq job #{worker_class} JID-#{job['jid']} couldn't use the replica."\
" Replica was not up to date." " Replica was not up to date."
when :replica else
# this means we selected an up-to-date replica, but there is nothing to do in this case. # this means we selected an up-to-date replica, but there is nothing to do in this case.
end end
...@@ -36,17 +33,24 @@ module Gitlab ...@@ -36,17 +33,24 @@ module Gitlab
Session.clear_session Session.clear_session
end end
def use_primary?(strategy)
strategy.start_with?('primary')
end
def select_load_balancing_strategy(worker_class, job) def select_load_balancing_strategy(worker_class, job)
return :primary unless load_balancing_available?(worker_class) return :primary unless load_balancing_available?(worker_class)
location = job['database_write_location'] || job['database_replica_location'] location = job['database_write_location'] || job['database_replica_location']
return :primary unless location return :primary_no_wal unless location
if replica_caught_up?(location) if replica_caught_up?(location)
:replica # Happy case: we can read from a replica.
elsif worker_class.get_data_consistency == :delayed retried_before?(worker_class, job) ? :replica_retried : :replica
not_yet_retried?(job) ? :retry_replica : :retry_primary elsif can_retry?(worker_class, job)
# Optimistic case: The worker allows retries and we have retries left.
:retry
else else
# Sad case: we need to fall back to the primary.
:primary :primary
end end
end end
...@@ -57,6 +61,14 @@ module Gitlab ...@@ -57,6 +61,14 @@ module Gitlab
worker_class.get_data_consistency_feature_flag_enabled? worker_class.get_data_consistency_feature_flag_enabled?
end end
def can_retry?(worker_class, job)
worker_class.get_data_consistency == :delayed && not_yet_retried?(job)
end
def retried_before?(worker_class, job)
worker_class.get_data_consistency == :delayed && !not_yet_retried?(job)
end
def not_yet_retried?(job) def not_yet_retried?(job)
# if `retry_count` is `nil` it indicates that this job was never retried # if `retry_count` is `nil` it indicates that this job was never retried
# the `0` indicates that this is a first retry # the `0` indicates that this is a first retry
......
...@@ -5,12 +5,27 @@ require 'spec_helper' ...@@ -5,12 +5,27 @@ require 'spec_helper'
RSpec.describe Gitlab::Database::LoadBalancing::SidekiqClientMiddleware do RSpec.describe Gitlab::Database::LoadBalancing::SidekiqClientMiddleware do
let(:middleware) { described_class.new } let(:middleware) { described_class.new }
let(:load_balancer) { double.as_null_object }
let(:worker_class) { 'TestDataConsistencyWorker' }
let(:job) { { "job_id" => "a180b47c-3fd6-41b8-81e9-34da61c3400e" } }
before do
skip_feature_flags_yaml_validation
skip_default_enabled_yaml_check
allow(::Gitlab::Database::LoadBalancing).to receive_message_chain(:proxy, :load_balancer).and_return(load_balancer)
end
after do after do
Gitlab::Database::LoadBalancing::Session.clear_session Gitlab::Database::LoadBalancing::Session.clear_session
end end
def run_middleware
middleware.call(worker_class, job, nil, nil) {}
end
describe '#call' do describe '#call' do
shared_context 'data consistency worker class' do |data_consistency, feature_flag| shared_context 'data consistency worker class' do |data_consistency, feature_flag|
let(:expected_consistency) { data_consistency }
let(:worker_class) do let(:worker_class) do
Class.new do Class.new do
def self.name def self.name
...@@ -31,13 +46,23 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqClientMiddleware do ...@@ -31,13 +46,23 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqClientMiddleware do
end end
end end
shared_examples_for 'job data consistency' do
it "sets job data consistency" do
run_middleware
expect(job['worker_data_consistency']).to eq(expected_consistency)
end
end
shared_examples_for 'does not pass database locations' do shared_examples_for 'does not pass database locations' do
it 'does not pass database locations', :aggregate_failures do it 'does not pass database locations', :aggregate_failures do
middleware.call(worker_class, job, double(:queue), redis_pool) { 10 } run_middleware
expect(job['database_replica_location']).to be_nil expect(job['database_replica_location']).to be_nil
expect(job['database_write_location']).to be_nil expect(job['database_write_location']).to be_nil
end end
include_examples 'job data consistency'
end end
shared_examples_for 'mark data consistency location' do |data_consistency| shared_examples_for 'mark data consistency location' do |data_consistency|
...@@ -45,7 +70,9 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqClientMiddleware do ...@@ -45,7 +70,9 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqClientMiddleware do
let(:location) { '0/D525E3A8' } let(:location) { '0/D525E3A8' }
context 'when feature flag load_balancing_for_sidekiq is disabled' do context 'when feature flag is disabled' do
let(:expected_consistency) { :always }
before do before do
stub_feature_flags(load_balancing_for_test_data_consistency_worker: false) stub_feature_flags(load_balancing_for_test_data_consistency_worker: false)
end end
...@@ -59,12 +86,14 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqClientMiddleware do ...@@ -59,12 +86,14 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqClientMiddleware do
end end
it 'passes database_replica_location' do it 'passes database_replica_location' do
expect(middleware).to receive_message_chain(:load_balancer, :host, "database_replica_location").and_return(location) expect(load_balancer).to receive_message_chain(:host, "database_replica_location").and_return(location)
middleware.call(worker_class, job, double(:queue), redis_pool) { 10 } run_middleware
expect(job['database_replica_location']).to eq(location) expect(job['database_replica_location']).to eq(location)
end end
include_examples 'job data consistency'
end end
context 'when write was performed' do context 'when write was performed' do
...@@ -73,12 +102,14 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqClientMiddleware do ...@@ -73,12 +102,14 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqClientMiddleware do
end end
it 'passes primary write location', :aggregate_failures do it 'passes primary write location', :aggregate_failures do
expect(middleware).to receive_message_chain(:load_balancer, :primary_write_location).and_return(location) expect(load_balancer).to receive(:primary_write_location).and_return(location)
middleware.call(worker_class, job, double(:queue), redis_pool) { 10 } run_middleware
expect(job['database_write_location']).to eq(location) expect(job['database_write_location']).to eq(location)
end end
include_examples 'job data consistency'
end end
end end
...@@ -89,7 +120,7 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqClientMiddleware do ...@@ -89,7 +120,7 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqClientMiddleware do
end end
it 'does not set database locations again' do it 'does not set database locations again' do
middleware.call(worker_class, job, double(:queue), redis_pool) { 10 } run_middleware
expect(job[provided_database_location]).to eq(old_location) expect(job[provided_database_location]).to eq(old_location)
expect(job[other_location]).to be_nil expect(job[other_location]).to be_nil
...@@ -101,8 +132,8 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqClientMiddleware do ...@@ -101,8 +132,8 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqClientMiddleware do
let(:job) { { "job_id" => "a180b47c-3fd6-41b8-81e9-34da61c3400e", provided_database_location => old_location } } let(:job) { { "job_id" => "a180b47c-3fd6-41b8-81e9-34da61c3400e", provided_database_location => old_location } }
before do before do
allow(middleware).to receive_message_chain(:load_balancer, :primary_write_location).and_return(new_location) allow(load_balancer).to receive(:primary_write_location).and_return(new_location)
allow(middleware).to receive_message_chain(:load_balancer, :database_replica_location).and_return(new_location) allow(load_balancer).to receive(:database_replica_location).and_return(new_location)
end end
context "when write was performed" do context "when write was performed" do
...@@ -114,24 +145,16 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqClientMiddleware do ...@@ -114,24 +145,16 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqClientMiddleware do
end end
end end
let(:queue) { 'default' }
let(:redis_pool) { Sidekiq.redis_pool }
let(:worker_class) { 'TestDataConsistencyWorker' }
let(:job) { { "job_id" => "a180b47c-3fd6-41b8-81e9-34da61c3400e" } }
before do
skip_feature_flags_yaml_validation
skip_default_enabled_yaml_check
end
context 'when worker cannot be constantized' do context 'when worker cannot be constantized' do
let(:worker_class) { 'ActionMailer::MailDeliveryJob' } let(:worker_class) { 'ActionMailer::MailDeliveryJob' }
let(:expected_consistency) { :always }
include_examples 'does not pass database locations' include_examples 'does not pass database locations'
end end
context 'when worker class does not include ApplicationWorker' do context 'when worker class does not include ApplicationWorker' do
let(:worker_class) { ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper } let(:worker_class) { ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper }
let(:expected_consistency) { :always }
include_examples 'does not pass database locations' include_examples 'does not pass database locations'
end end
......
...@@ -6,11 +6,16 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do ...@@ -6,11 +6,16 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do
let(:middleware) { described_class.new } let(:middleware) { described_class.new }
let(:load_balancer) { double.as_null_object } let(:load_balancer) { double.as_null_object }
let(:has_replication_lag) { false }
let(:worker) { worker_class.new }
let(:job) { { "retry" => 3, "job_id" => "a180b47c-3fd6-41b8-81e9-34da61c3400e", 'database_replica_location' => '0/D525E3A8' } }
before do before do
skip_feature_flags_yaml_validation
skip_default_enabled_yaml_check
allow(::Gitlab::Database::LoadBalancing).to receive_message_chain(:proxy, :load_balancer).and_return(load_balancer) allow(::Gitlab::Database::LoadBalancing).to receive_message_chain(:proxy, :load_balancer).and_return(load_balancer)
allow(load_balancer).to receive(:select_up_to_date_host).and_return(!has_replication_lag)
replication_lag!(false)
end end
after do after do
...@@ -39,24 +44,34 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do ...@@ -39,24 +44,34 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do
end end
end end
shared_examples_for 'stick to the primary' do shared_examples_for 'load balancing strategy' do |strategy|
it "sets load balancing strategy to #{strategy}" do
run_middleware do
expect(job['load_balancing_strategy']).to eq(strategy)
end
end
end
shared_examples_for 'stick to the primary' do |expected_strategy|
it 'sticks to the primary' do it 'sticks to the primary' do
middleware.call(worker, job, double(:queue)) do run_middleware do
expect(Gitlab::Database::LoadBalancing::Session.current.use_primary?).to be_truthy expect(Gitlab::Database::LoadBalancing::Session.current.use_primary?).to be_truthy
end end
end end
include_examples 'load balancing strategy', expected_strategy
end end
shared_examples_for 'replica is up to date' do |location| shared_examples_for 'replica is up to date' do |location, expected_strategy|
it 'does not stick to the primary', :aggregate_failures do it 'does not stick to the primary', :aggregate_failures do
expect(middleware).to receive(:replica_caught_up?).with(location).and_return(true) expect(middleware).to receive(:replica_caught_up?).with(location).and_return(true)
middleware.call(worker, job, double(:queue)) do run_middleware do
expect(Gitlab::Database::LoadBalancing::Session.current.use_primary?).not_to be_truthy expect(Gitlab::Database::LoadBalancing::Session.current.use_primary?).not_to be_truthy
end end
expect(job['load_balancing_strategy']).to eq('replica')
end end
include_examples 'load balancing strategy', expected_strategy
end end
shared_examples_for 'sticks based on data consistency' do |data_consistency| shared_examples_for 'sticks based on data consistency' do |data_consistency|
...@@ -67,7 +82,7 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do ...@@ -67,7 +82,7 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do
stub_feature_flags(load_balancing_for_test_data_consistency_worker: false) stub_feature_flags(load_balancing_for_test_data_consistency_worker: false)
end end
include_examples 'stick to the primary' include_examples 'stick to the primary', 'primary'
end end
context 'when database replica location is set' do context 'when database replica location is set' do
...@@ -77,7 +92,7 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do ...@@ -77,7 +92,7 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do
allow(middleware).to receive(:replica_caught_up?).and_return(true) allow(middleware).to receive(:replica_caught_up?).and_return(true)
end end
it_behaves_like 'replica is up to date', '0/D525E3A8' it_behaves_like 'replica is up to date', '0/D525E3A8', 'replica'
end end
context 'when database primary location is set' do context 'when database primary location is set' do
...@@ -87,46 +102,35 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do ...@@ -87,46 +102,35 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do
allow(middleware).to receive(:replica_caught_up?).and_return(true) allow(middleware).to receive(:replica_caught_up?).and_return(true)
end end
it_behaves_like 'replica is up to date', '0/D525E3A8' it_behaves_like 'replica is up to date', '0/D525E3A8', 'replica'
end end
context 'when database location is not set' do context 'when database location is not set' do
let(:job) { { 'job_id' => 'a180b47c-3fd6-41b8-81e9-34da61c3400e' } } let(:job) { { 'job_id' => 'a180b47c-3fd6-41b8-81e9-34da61c3400e' } }
it_behaves_like 'stick to the primary' it_behaves_like 'stick to the primary', 'primary_no_wal'
end end
end end
let(:queue) { 'default' }
let(:redis_pool) { Sidekiq.redis_pool }
let(:worker) { worker_class.new }
let(:job) { { "retry" => 3, "job_id" => "a180b47c-3fd6-41b8-81e9-34da61c3400e", 'database_replica_location' => '0/D525E3A8' } }
let(:block) { 10 }
before do
skip_feature_flags_yaml_validation
skip_default_enabled_yaml_check
allow(middleware).to receive(:clear)
allow(Gitlab::Database::LoadBalancing::Session.current).to receive(:performed_write?).and_return(true)
end
context 'when worker class does not include ApplicationWorker' do context 'when worker class does not include ApplicationWorker' do
let(:worker) { ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper.new } let(:worker) { ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper.new }
include_examples 'stick to the primary' include_examples 'stick to the primary', 'primary'
end end
context 'when worker data consistency is :always' do context 'when worker data consistency is :always' do
include_context 'data consistency worker class', :always, :load_balancing_for_test_data_consistency_worker include_context 'data consistency worker class', :always, :load_balancing_for_test_data_consistency_worker
include_examples 'stick to the primary' include_examples 'stick to the primary', 'primary'
end end
context 'when worker data consistency is :delayed' do context 'when worker data consistency is :delayed' do
include_examples 'sticks based on data consistency', :delayed include_examples 'sticks based on data consistency', :delayed
context 'when replica is not up to date' do context 'when replica is not up to date' do
let(:has_replication_lag) { true } before do
replication_lag!(true)
end
around do |example| around do |example|
with_sidekiq_server_middleware do |chain| with_sidekiq_server_middleware do |chain|
...@@ -136,24 +140,34 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do ...@@ -136,24 +140,34 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do
end end
context 'when job is executed first' do context 'when job is executed first' do
it 'raise an error and retries', :aggregate_failures do it 'raises an error and retries', :aggregate_failures do
expect do expect do
process_job(job) process_job(job)
end.to raise_error(Sidekiq::JobRetry::Skip) end.to raise_error(Sidekiq::JobRetry::Skip)
expect(job['error_class']).to eq('Gitlab::Database::LoadBalancing::SidekiqServerMiddleware::JobReplicaNotUpToDate') expect(job['error_class']).to eq('Gitlab::Database::LoadBalancing::SidekiqServerMiddleware::JobReplicaNotUpToDate')
expect(job['load_balancing_strategy']).to eq('retry_replica')
end end
include_examples 'load balancing strategy', 'retry'
end end
context 'when job is retried' do context 'when job is retried' do
it 'stick to the primary', :aggregate_failures do before do
expect do expect do
process_job(job) process_job(job)
end.to raise_error(Sidekiq::JobRetry::Skip) end.to raise_error(Sidekiq::JobRetry::Skip)
end
process_job(job) context 'and replica still lagging behind' do
expect(job['load_balancing_strategy']).to eq('retry_primary') include_examples 'stick to the primary', 'primary'
end
context 'and replica is now up-to-date' do
before do
replication_lag!(false)
end
it_behaves_like 'replica is up to date', '0/D525E3A8', 'replica_retried'
end end
end end
end end
...@@ -167,20 +181,24 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do ...@@ -167,20 +181,24 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do
allow(middleware).to receive(:replica_caught_up?).and_return(false) allow(middleware).to receive(:replica_caught_up?).and_return(false)
end end
include_examples 'stick to the primary' include_examples 'stick to the primary', 'primary'
it 'updates job hash with primary database chosen', :aggregate_failures do
middleware.call(worker, job, double(:queue)) do
expect(job['load_balancing_strategy']).to eq('primary')
end
end
end end
end end
end end
def process_job(job) def process_job(job)
Sidekiq::JobRetry.new.local(worker_class, job, queue) do Sidekiq::JobRetry.new.local(worker_class, job, 'default') do
worker_class.process_job(job) worker_class.process_job(job)
end end
end end
def run_middleware
middleware.call(worker, job, double(:queue)) { yield }
rescue described_class::JobReplicaNotUpToDate
# we silence errors here that cause the job to retry
end
def replication_lag!(exists)
allow(load_balancer).to receive(:select_up_to_date_host).and_return(!exists)
end
end end
...@@ -260,7 +260,7 @@ RSpec.describe Gitlab::SidekiqMiddleware::ServerMetrics do ...@@ -260,7 +260,7 @@ RSpec.describe Gitlab::SidekiqMiddleware::ServerMetrics do
context 'when worker declares data consistency' do context 'when worker declares data consistency' do
include_context 'worker declaring data consistency' include_context 'worker declaring data consistency'
it 'increments load balancing counter' do it 'increments load balancing counter with defined data consistency' do
process_job process_job
expect(load_balancing_metric).to have_received(:increment).with( expect(load_balancing_metric).to have_received(:increment).with(
...@@ -272,10 +272,14 @@ RSpec.describe Gitlab::SidekiqMiddleware::ServerMetrics do ...@@ -272,10 +272,14 @@ RSpec.describe Gitlab::SidekiqMiddleware::ServerMetrics do
end end
context 'when worker does not declare data consistency' do context 'when worker does not declare data consistency' do
it 'does not increment load balancing counter' do it 'increments load balancing counter with default data consistency' do
process_job process_job
expect(load_balancing_metric).not_to have_received(:increment) expect(load_balancing_metric).to have_received(:increment).with(
a_hash_including(
data_consistency: :always,
load_balancing_strategy: 'primary'
), 1)
end end
end end
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment