Commit c872e7b5 authored by Douglas Barbosa Alexandre's avatar Douglas Barbosa Alexandre

Merge branch '332673-sidekiq-refactor-db-strategy' into 'master'

Refactor database selection in SidekiqServerMiddleware

See merge request gitlab-org/gitlab!63304
parents b58b59e5 18501e37
...@@ -12,6 +12,7 @@ module WorkerAttributes ...@@ -12,6 +12,7 @@ module WorkerAttributes
VALID_URGENCIES = [:high, :low, :throttled].freeze VALID_URGENCIES = [:high, :low, :throttled].freeze
VALID_DATA_CONSISTENCIES = [:always, :sticky, :delayed].freeze VALID_DATA_CONSISTENCIES = [:always, :sticky, :delayed].freeze
DEFAULT_DATA_CONSISTENCY = :always
NAMESPACE_WEIGHTS = { NAMESPACE_WEIGHTS = {
auto_devops: 2, auto_devops: 2,
...@@ -110,7 +111,7 @@ module WorkerAttributes ...@@ -110,7 +111,7 @@ module WorkerAttributes
end end
def get_data_consistency def get_data_consistency
class_attributes[:data_consistency] || :always class_attributes[:data_consistency] || DEFAULT_DATA_CONSISTENCY
end end
def get_data_consistency_feature_flag_enabled? def get_data_consistency_feature_flag_enabled?
......
...@@ -265,7 +265,7 @@ The following metrics are available: ...@@ -265,7 +265,7 @@ The following metrics are available:
| Metric | Type | Since | Description | Labels | | Metric | Type | Since | Description | Labels |
|:--------------------------------- |:--------- |:------------------------------------------------------------- |:-------------------------------------- |:--------------------------------------------------------- | |:--------------------------------- |:--------- |:------------------------------------------------------------- |:-------------------------------------- |:--------------------------------------------------------- |
| `db_load_balancing_hosts` | Gauge | [12.3](https://gitlab.com/gitlab-org/gitlab/-/issues/13630) | Current number of load balancing hosts | | | `db_load_balancing_hosts` | Gauge | [12.3](https://gitlab.com/gitlab-org/gitlab/-/issues/13630) | Current number of load balancing hosts | |
| `sidekiq_load_balancing_count` | Counter | 13.11 | Sidekiq jobs using load balancing with data consistency set to :sticky or :delayed | `queue`, `boundary`, `external_dependencies`, `feature_category`, `job_status`, `urgency`, `data_consistency`, `database_chosen` | | `sidekiq_load_balancing_count` | Counter | 13.11 | Sidekiq jobs using load balancing with data consistency set to :sticky or :delayed | `queue`, `boundary`, `external_dependencies`, `feature_category`, `job_status`, `urgency`, `data_consistency`, `load_balancing_strategy` |
## Database partitioning metrics **(PREMIUM SELF)** ## Database partitioning metrics **(PREMIUM SELF)**
......
...@@ -7,8 +7,21 @@ module Gitlab ...@@ -7,8 +7,21 @@ module Gitlab
JobReplicaNotUpToDate = Class.new(StandardError) JobReplicaNotUpToDate = Class.new(StandardError)
def call(worker, job, _queue) def call(worker, job, _queue)
if requires_primary?(worker.class, job) worker_class = worker.class
strategy = select_load_balancing_strategy(worker_class, job)
# This is consumed by ServerMetrics and StructuredLogger to emit metrics so we only
# make this available when load-balancing is actually utilized.
job['load_balancing_strategy'] = strategy.to_s if load_balancing_available?(worker_class)
case strategy
when :primary, :retry_primary
Session.current.use_primary! Session.current.use_primary!
when :retry_replica
raise JobReplicaNotUpToDate, "Sidekiq job #{worker_class} JID-#{job['jid']} couldn't use the replica."\
" Replica was not up to date."
when :replica
# this means we selected an up-to-date replica, but there is nothing to do in this case.
end end
yield yield
...@@ -23,31 +36,27 @@ module Gitlab ...@@ -23,31 +36,27 @@ module Gitlab
Session.clear_session Session.clear_session
end end
def requires_primary?(worker_class, job) def select_load_balancing_strategy(worker_class, job)
return true unless worker_class.include?(::ApplicationWorker) return :primary unless load_balancing_available?(worker_class)
return true unless worker_class.utilizes_load_balancing_capabilities?
return true unless worker_class.get_data_consistency_feature_flag_enabled?
location = job['database_write_location'] || job['database_replica_location'] location = job['database_write_location'] || job['database_replica_location']
return :primary unless location
return true unless location
job_data_consistency = worker_class.get_data_consistency
job[:data_consistency] = job_data_consistency.to_s
if replica_caught_up?(location) if replica_caught_up?(location)
job[:database_chosen] = 'replica' :replica
false elsif worker_class.get_data_consistency == :delayed
elsif job_data_consistency == :delayed && not_yet_retried?(job) not_yet_retried?(job) ? :retry_replica : :retry_primary
job[:database_chosen] = 'retry'
raise JobReplicaNotUpToDate, "Sidekiq job #{worker_class} JID-#{job['jid']} couldn't use the replica."\
" Replica was not up to date."
else else
job[:database_chosen] = 'primary' :primary
true
end end
end end
def load_balancing_available?(worker_class)
worker_class.include?(::ApplicationWorker) &&
worker_class.utilizes_load_balancing_capabilities? &&
worker_class.get_data_consistency_feature_flag_enabled?
end
def not_yet_retried?(job) def not_yet_retried?(job)
# if `retry_count` is `nil` it indicates that this job was never retried # if `retry_count` is `nil` it indicates that this job was never retried
# the `0` indicates that this is a first retry # the `0` indicates that this is a first retry
......
...@@ -68,7 +68,7 @@ module Gitlab ...@@ -68,7 +68,7 @@ module Gitlab
message = base_message(payload) message = base_message(payload)
payload['database_chosen'] = job[:database_chosen] if job[:database_chosen] payload['load_balancing_strategy'] = job['load_balancing_strategy'] if job['load_balancing_strategy']
if job_exception if job_exception
payload['message'] = "#{message}: fail: #{payload['duration_s']} sec" payload['message'] = "#{message}: fail: #{payload['duration_s']} sec"
......
...@@ -74,10 +74,10 @@ module Gitlab ...@@ -74,10 +74,10 @@ module Gitlab
@metrics[:sidekiq_elasticsearch_requests_total].increment(labels, get_elasticsearch_calls(instrumentation)) @metrics[:sidekiq_elasticsearch_requests_total].increment(labels, get_elasticsearch_calls(instrumentation))
@metrics[:sidekiq_elasticsearch_requests_duration_seconds].observe(labels, get_elasticsearch_time(instrumentation)) @metrics[:sidekiq_elasticsearch_requests_duration_seconds].observe(labels, get_elasticsearch_time(instrumentation))
if ::Gitlab::Database::LoadBalancing.enable? && job[:database_chosen] with_load_balancing_settings(job) do |settings|
load_balancing_labels = { load_balancing_labels = {
database_chosen: job[:database_chosen], load_balancing_strategy: settings['load_balancing_strategy'],
data_consistency: job[:data_consistency] data_consistency: settings['worker_data_consistency']
} }
@metrics[:sidekiq_load_balancing_count].increment(labels.merge(load_balancing_labels), 1) @metrics[:sidekiq_load_balancing_count].increment(labels.merge(load_balancing_labels), 1)
...@@ -105,6 +105,15 @@ module Gitlab ...@@ -105,6 +105,15 @@ module Gitlab
private private
def with_load_balancing_settings(job)
return unless ::Gitlab::Database::LoadBalancing.enable?
keys = %w[load_balancing_strategy worker_data_consistency]
return unless keys.all? { |k| job.key?(k) }
yield job.slice(*keys)
end
def get_thread_cputime def get_thread_cputime
defined?(Process::CLOCK_THREAD_CPUTIME_ID) ? Process.clock_gettime(Process::CLOCK_THREAD_CPUTIME_ID) : 0 defined?(Process::CLOCK_THREAD_CPUTIME_ID) ? Process.clock_gettime(Process::CLOCK_THREAD_CPUTIME_ID) : 0
end end
......
...@@ -5,6 +5,14 @@ require 'spec_helper' ...@@ -5,6 +5,14 @@ require 'spec_helper'
RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do
let(:middleware) { described_class.new } let(:middleware) { described_class.new }
let(:load_balancer) { double.as_null_object }
let(:has_replication_lag) { false }
before do
allow(::Gitlab::Database::LoadBalancing).to receive_message_chain(:proxy, :load_balancer).and_return(load_balancer)
allow(load_balancer).to receive(:select_up_to_date_host).and_return(!has_replication_lag)
end
after do after do
Gitlab::Database::LoadBalancing::Session.clear_session Gitlab::Database::LoadBalancing::Session.clear_session
end end
...@@ -39,7 +47,7 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do ...@@ -39,7 +47,7 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do
end end
end end
shared_examples_for 'replica is up to date' do |location, data_consistency| shared_examples_for 'replica is up to date' do |location|
it 'does not stick to the primary', :aggregate_failures do it 'does not stick to the primary', :aggregate_failures do
expect(middleware).to receive(:replica_caught_up?).with(location).and_return(true) expect(middleware).to receive(:replica_caught_up?).with(location).and_return(true)
...@@ -47,13 +55,7 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do ...@@ -47,13 +55,7 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do
expect(Gitlab::Database::LoadBalancing::Session.current.use_primary?).not_to be_truthy expect(Gitlab::Database::LoadBalancing::Session.current.use_primary?).not_to be_truthy
end end
expect(job[:database_chosen]).to eq('replica') expect(job['load_balancing_strategy']).to eq('replica')
end
it "updates job hash with data_consistency :#{data_consistency}" do
middleware.call(worker, job, double(:queue)) do
expect(job).to include(data_consistency: data_consistency.to_s)
end
end end
end end
...@@ -75,7 +77,7 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do ...@@ -75,7 +77,7 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do
allow(middleware).to receive(:replica_caught_up?).and_return(true) allow(middleware).to receive(:replica_caught_up?).and_return(true)
end end
it_behaves_like 'replica is up to date', '0/D525E3A8', data_consistency it_behaves_like 'replica is up to date', '0/D525E3A8'
end end
context 'when database primary location is set' do context 'when database primary location is set' do
...@@ -85,13 +87,13 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do ...@@ -85,13 +87,13 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do
allow(middleware).to receive(:replica_caught_up?).and_return(true) allow(middleware).to receive(:replica_caught_up?).and_return(true)
end end
it_behaves_like 'replica is up to date', '0/D525E3A8', data_consistency it_behaves_like 'replica is up to date', '0/D525E3A8'
end end
context 'when database location is not set' do context 'when database location is not set' do
let(:job) { { 'job_id' => 'a180b47c-3fd6-41b8-81e9-34da61c3400e' } } let(:job) { { 'job_id' => 'a180b47c-3fd6-41b8-81e9-34da61c3400e' } }
it_behaves_like 'stick to the primary', nil it_behaves_like 'stick to the primary'
end end
end end
...@@ -124,10 +126,7 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do ...@@ -124,10 +126,7 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do
include_examples 'sticks based on data consistency', :delayed include_examples 'sticks based on data consistency', :delayed
context 'when replica is not up to date' do context 'when replica is not up to date' do
before do let(:has_replication_lag) { true }
allow(::Gitlab::Database::LoadBalancing).to receive_message_chain(:proxy, :load_balancer, :release_host)
allow(::Gitlab::Database::LoadBalancing).to receive_message_chain(:proxy, :load_balancer, :select_up_to_date_host).and_return(false)
end
around do |example| around do |example|
with_sidekiq_server_middleware do |chain| with_sidekiq_server_middleware do |chain|
...@@ -143,7 +142,7 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do ...@@ -143,7 +142,7 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do
end.to raise_error(Sidekiq::JobRetry::Skip) end.to raise_error(Sidekiq::JobRetry::Skip)
expect(job['error_class']).to eq('Gitlab::Database::LoadBalancing::SidekiqServerMiddleware::JobReplicaNotUpToDate') expect(job['error_class']).to eq('Gitlab::Database::LoadBalancing::SidekiqServerMiddleware::JobReplicaNotUpToDate')
expect(job[:database_chosen]).to eq('retry') expect(job['load_balancing_strategy']).to eq('retry_replica')
end end
end end
...@@ -154,7 +153,7 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do ...@@ -154,7 +153,7 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do
end.to raise_error(Sidekiq::JobRetry::Skip) end.to raise_error(Sidekiq::JobRetry::Skip)
process_job(job) process_job(job)
expect(job[:database_chosen]).to eq('primary') expect(job['load_balancing_strategy']).to eq('retry_primary')
end end
end end
...@@ -163,7 +162,7 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do ...@@ -163,7 +162,7 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do
stub_feature_flags(sidekiq_load_balancing_rotate_up_to_date_replica: false) stub_feature_flags(sidekiq_load_balancing_rotate_up_to_date_replica: false)
end end
it 'uses different implmentation' do it 'uses different implementation' do
expect(::Gitlab::Database::LoadBalancing).to receive_message_chain(:proxy, :load_balancer, :host, :caught_up?).and_return(false) expect(::Gitlab::Database::LoadBalancing).to receive_message_chain(:proxy, :load_balancer, :host, :caught_up?).and_return(false)
expect do expect do
...@@ -185,9 +184,9 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do ...@@ -185,9 +184,9 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do
include_examples 'stick to the primary' include_examples 'stick to the primary'
it 'updates job hash with primary database chosen', :aggregate_failures do it 'updates job hash with primary database chosen', :aggregate_failures do
expect { |b| middleware.call(worker, job, double(:queue), &b) }.to yield_control middleware.call(worker, job, double(:queue)) do
expect(job['load_balancing_strategy']).to eq('primary')
expect(job[:database_chosen]).to eq('primary') end
end end
end end
end end
......
...@@ -342,7 +342,7 @@ RSpec.describe Gitlab::SidekiqLogging::StructuredLogger do ...@@ -342,7 +342,7 @@ RSpec.describe Gitlab::SidekiqLogging::StructuredLogger do
end end
context 'when the job uses load balancing capabilities' do context 'when the job uses load balancing capabilities' do
let(:expected_payload) { { 'database_chosen' => 'retry' } } let(:expected_payload) { { 'load_balancing_strategy' => 'retry' } }
before do before do
allow(Time).to receive(:now).and_return(timestamp) allow(Time).to receive(:now).and_return(timestamp)
...@@ -354,7 +354,7 @@ RSpec.describe Gitlab::SidekiqLogging::StructuredLogger do ...@@ -354,7 +354,7 @@ RSpec.describe Gitlab::SidekiqLogging::StructuredLogger do
expect(logger).to receive(:info).with(include(expected_payload)).ordered expect(logger).to receive(:info).with(include(expected_payload)).ordered
call_subject(job, 'test_queue') do call_subject(job, 'test_queue') do
job[:database_chosen] = 'retry' job['load_balancing_strategy'] = 'retry'
end end
end end
end end
......
...@@ -109,22 +109,20 @@ RSpec.describe Gitlab::SidekiqMiddleware::ServerMetrics do ...@@ -109,22 +109,20 @@ RSpec.describe Gitlab::SidekiqMiddleware::ServerMetrics do
end end
context 'DB load balancing' do context 'DB load balancing' do
using RSpec::Parameterized::TableSyntax
subject { described_class.new } subject { described_class.new }
let(:queue) { :test } let(:queue) { :test }
let(:worker_class) { worker.class } let(:worker_class) { worker.class }
let(:job) { {} } let(:worker) { TestWorker.new }
let(:job_status) { :done } let(:client_middleware) { Gitlab::Database::LoadBalancing::SidekiqClientMiddleware.new }
let(:labels_with_job_status) { default_labels.merge(job_status: job_status.to_s) } let(:load_balancer) { double.as_null_object }
let(:default_labels) do let(:load_balancing_metric) { double('load balancing metric') }
{ queue: queue.to_s, let(:job) { { "retry" => 3, "job_id" => "a180b47c-3fd6-41b8-81e9-34da61c3400e" } }
worker: worker_class.to_s,
boundary: "", def process_job
external_dependencies: "no", client_middleware.call(worker_class, job, queue, double) do
feature_category: "", worker_class.process_job(job)
urgency: "low" } end
end end
before do before do
...@@ -132,84 +130,93 @@ RSpec.describe Gitlab::SidekiqMiddleware::ServerMetrics do ...@@ -132,84 +130,93 @@ RSpec.describe Gitlab::SidekiqMiddleware::ServerMetrics do
TestWorker.class_eval do TestWorker.class_eval do
include Sidekiq::Worker include Sidekiq::Worker
include WorkerAttributes include WorkerAttributes
def perform(*args)
end end
end end
let(:worker) { TestWorker.new } allow(::Gitlab::Database::LoadBalancing).to receive_message_chain(:proxy, :load_balancer).and_return(load_balancer)
allow(load_balancing_metric).to receive(:increment)
include_context 'server metrics with mocked prometheus'
context 'when load_balancing is enabled' do
let(:load_balancing_metric) { double('load balancing metric') }
include_context 'clear DB Load Balancing configuration'
before do
allow(::Gitlab::Database::LoadBalancing).to receive(:enable?).and_return(true)
allow(Gitlab::Metrics).to receive(:counter).with(:sidekiq_load_balancing_count, anything).and_return(load_balancing_metric) allow(Gitlab::Metrics).to receive(:counter).with(:sidekiq_load_balancing_count, anything).and_return(load_balancing_metric)
end end
describe '#initialize' do around do |example|
it 'sets load_balancing metrics' do with_sidekiq_server_middleware do |chain|
expect(Gitlab::Metrics).to receive(:counter).with(:sidekiq_load_balancing_count, anything).and_return(load_balancing_metric) chain.add Gitlab::Database::LoadBalancing::SidekiqServerMiddleware
chain.add described_class
subject Sidekiq::Testing.inline! { example.run }
end end
end end
describe '#call' do include_context 'server metrics with mocked prometheus'
include_context 'server metrics call' include_context 'server metrics call'
include_context 'clear DB Load Balancing configuration'
context 'when :database_chosen is provided' do shared_context 'worker declaring data consistency' do
where(:database_chosen) do let(:worker_class) { LBTestWorker }
%w[primary retry replica]
end before do
stub_const('LBTestWorker', Class.new(TestWorker))
LBTestWorker.class_eval do
include ApplicationWorker
with_them do data_consistency :delayed
context "when #{params[:database_chosen]} is used" do end
let(:labels_with_load_balancing) do end
labels_with_job_status.merge(database_chosen: database_chosen, data_consistency: 'delayed')
end end
context 'when load_balancing is enabled' do
before do before do
job[:database_chosen] = database_chosen allow(::Gitlab::Database::LoadBalancing).to receive(:enable?).and_return(true)
job[:data_consistency] = 'delayed'
allow(load_balancing_metric).to receive(:increment)
end end
it 'increment sidekiq_load_balancing_count' do describe '#call' do
expect(load_balancing_metric).to receive(:increment).with(labels_with_load_balancing, 1) context 'when worker declares data consistency' do
include_context 'worker declaring data consistency'
described_class.new.call(worker, job, :test) { nil } it 'increments load balancing counter' do
end process_job
end
expect(load_balancing_metric).to have_received(:increment).with(
a_hash_including(
data_consistency: :delayed,
load_balancing_strategy: 'replica'
), 1)
end end
end end
context 'when :database_chosen is not provided' do context 'when worker does not declare data consistency' do
it 'does not increment sidekiq_load_balancing_count' do it 'does not increment load balancing counter' do
expect(load_balancing_metric).not_to receive(:increment) process_job
described_class.new.call(worker, job, :test) { nil } expect(load_balancing_metric).not_to have_received(:increment)
end end
end end
end end
end end
context 'when load_balancing is disabled' do context 'when load_balancing is disabled' do
include_context 'clear DB Load Balancing configuration' include_context 'worker declaring data consistency'
before do before do
allow(::Gitlab::Database::LoadBalancing).to receive(:enable?).and_return(false) allow(::Gitlab::Database::LoadBalancing).to receive(:enable?).and_return(false)
end end
describe '#initialize' do describe '#initialize' do
it 'doesnt set load_balancing metrics' do it 'does not set load_balancing metrics' do
expect(Gitlab::Metrics).not_to receive(:counter).with(:sidekiq_load_balancing_count, anything) expect(Gitlab::Metrics).not_to receive(:counter).with(:sidekiq_load_balancing_count, anything)
subject subject
end end
end end
describe '#call' do
it 'does not increment load balancing counter' do
process_job
expect(load_balancing_metric).not_to have_received(:increment)
end
end
end end
end end
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment