Commit 22e8bc0a authored by Sean McGivern's avatar Sean McGivern

Update Sidekiq to 6.3.1

This means we can remove our patches to use pipelining instead of multi
and to use an atomic Lua scheduler, because both have been upstreamed!

Changelog: other
parent 3bbc4ef3
......@@ -196,7 +196,7 @@ gem 'state_machines-activerecord', '~> 0.8.0'
gem 'acts-as-taggable-on', '~> 7.0'
# Background jobs
gem 'sidekiq', '~> 6.2.2'
gem 'sidekiq', '~> 6.3'
gem 'sidekiq-cron', '~> 1.0'
gem 'redis-namespace', '~> 1.8.1'
gem 'gitlab-sidekiq-fetcher', '0.8.0', require: 'sidekiq-reliable-fetch'
......
......@@ -1172,7 +1172,7 @@ GEM
shellany (0.0.1)
shoulda-matchers (4.0.1)
activesupport (>= 4.2.0)
sidekiq (6.2.2)
sidekiq (6.3.1)
connection_pool (>= 2.2.2)
rack (~> 2.0)
redis (>= 4.2.0)
......@@ -1616,7 +1616,7 @@ DEPENDENCIES
sentry-raven (~> 3.1)
settingslogic (~> 2.0.9)
shoulda-matchers (~> 4.0.1)
sidekiq (~> 6.2.2)
sidekiq (~> 6.3)
sidekiq-cron (~> 1.0)
simple_po_parser (~> 1.1.2)
simplecov (~> 0.18.5)
......
---
name: atomic_sidekiq_scheduler
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/72380
rollout_issue_url:
milestone: '14.5'
type: development
group: group::project management
default_enabled: false
......@@ -29,7 +29,6 @@ use_sidekiq_legacy_memory_killer = !use_sidekiq_daemon_memory_killer
Sidekiq.configure_server do |config|
config.options[:strict] = false
config.options[:queues] = Gitlab::SidekiqConfig.expand_queues(config.options[:queues])
config.options[:scheduled_enq] = Gitlab::SidekiqEnq
Sidekiq.logger.info "Listening on queues #{config.options[:queues].uniq.sort}"
......@@ -115,5 +114,5 @@ Sidekiq.configure_client do |config|
config.client_middleware(&Gitlab::SidekiqMiddleware.client_configurator)
end
Sidekiq::Client.prepend Gitlab::Patch::SidekiqClient
Sidekiq::Cron::Poller.prepend Gitlab::Patch::SidekiqCronPoller
Sidekiq::Scheduled::Poller.prepend Gitlab::Patch::SidekiqPoller
Sidekiq::Cron::Poller.prepend Gitlab::Patch::SidekiqPoller
# frozen_string_literal: true
module Gitlab
module Patch
module SidekiqClient
private
# This is a copy of https://github.com/mperham/sidekiq/blob/v6.2.2/lib/sidekiq/client.rb#L187-L194
# but using `conn.pipelined` instead of `conn.multi`. The multi call isn't needed here because in
# the case of scheduled jobs, only one Redis call is made. For other jobs, we don't really need
# the commands to be atomic.
def raw_push(payloads)
@redis_pool.with do |conn| # rubocop:disable Gitlab/ModuleWithInstanceVariables
conn.pipelined do
atomic_push(conn, payloads)
end
end
true
end
end
end
end
......@@ -2,7 +2,7 @@
module Gitlab
module Patch
module SidekiqCronPoller
module SidekiqPoller
def enqueue
Rails.application.reloader.wrap do
::Gitlab::WithRequestStore.with_request_store do
......
# frozen_string_literal: true
module Gitlab
class SidekiqEnq
LUA_ZPOPBYSCORE = <<~EOS
local key, now = KEYS[1], ARGV[1]
local jobs = redis.call("zrangebyscore", key, "-inf", now, "limit", 0, 1)
if jobs[1] then
redis.call("zrem", key, jobs[1])
return jobs[1]
end
EOS
LUA_ZPOPBYSCORE_SHA = Digest::SHA1.hexdigest(LUA_ZPOPBYSCORE)
def enqueue_jobs(now = Time.now.to_f.to_s, sorted_sets = Sidekiq::Scheduled::SETS)
Rails.application.reloader.wrap do
::Gitlab::WithRequestStore.with_request_store do
if Feature.enabled?(:atomic_sidekiq_scheduler, default_enabled: :yaml)
atomic_find_jobs_and_enqueue(now, sorted_sets)
else
find_jobs_and_enqueue(now, sorted_sets)
end
ensure
::Gitlab::Database::LoadBalancing.release_hosts
end
end
end
private
# This is a copy of https://github.com/mperham/sidekiq/blob/32c55e31659a1e6bd42f98334cca5eef2863de8d/lib/sidekiq/scheduled.rb#L11-L34
#
# It effectively reverts
# https://github.com/mperham/sidekiq/commit/9b75467b33759888753191413eddbc15c37a219e
# because we observe that the extra ZREMs caused by this change can lead to high
# CPU usage on Redis at peak times:
# https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1179
#
def find_jobs_and_enqueue(now, sorted_sets)
# A job's "score" in Redis is the time at which it should be processed.
# Just check Redis for the set of jobs with a timestamp before now.
Sidekiq.redis do |conn|
sorted_sets.each do |sorted_set|
start_time = ::Gitlab::Metrics::System.monotonic_time
jobs = redundant_jobs = 0
Sidekiq.logger.info(message: 'Enqueuing scheduled jobs', status: 'start', sorted_set: sorted_set)
# Get the next item in the queue if it's score (time to execute) is <= now.
# We need to go through the list one at a time to reduce the risk of something
# going wrong between the time jobs are popped from the scheduled queue and when
# they are pushed onto a work queue and losing the jobs.
while job = conn.zrangebyscore(sorted_set, "-inf", now, limit: [0, 1]).first
# Pop item off the queue and add it to the work queue. If the job can't be popped from
# the queue, it's because another process already popped it so we can move on to the
# next one.
if conn.zrem(sorted_set, job)
jobs += 1
Sidekiq::Client.push(Sidekiq.load_json(job))
else
redundant_jobs += 1
end
end
end_time = ::Gitlab::Metrics::System.monotonic_time
Sidekiq.logger.info(message: 'Enqueuing scheduled jobs',
status: 'done',
sorted_set: sorted_set,
jobs_count: jobs,
redundant_jobs_count: redundant_jobs,
duration_s: end_time - start_time)
end
end
end
def atomic_find_jobs_and_enqueue(now, sorted_sets)
Sidekiq.redis do |conn|
sorted_sets.each do |sorted_set|
start_time = ::Gitlab::Metrics::System.monotonic_time
jobs = 0
Sidekiq.logger.info(message: 'Atomically enqueuing scheduled jobs', status: 'start', sorted_set: sorted_set)
while job = redis_eval_lua(conn, LUA_ZPOPBYSCORE, LUA_ZPOPBYSCORE_SHA, keys: [sorted_set], argv: [now])
jobs += 1
Sidekiq::Client.push(Sidekiq.load_json(job))
end
end_time = ::Gitlab::Metrics::System.monotonic_time
Sidekiq.logger.info(message: 'Atomically enqueuing scheduled jobs',
status: 'done',
sorted_set: sorted_set,
jobs_count: jobs,
duration_s: end_time - start_time)
end
end
end
def redis_eval_lua(conn, script, sha, keys: nil, argv: nil)
conn.evalsha(sha, keys: keys, argv: argv)
rescue ::Redis::CommandError => e
if e.message.start_with?('NOSCRIPT')
conn.eval(script, keys: keys, argv: argv)
else
raise
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::SidekiqEnq, :clean_gitlab_redis_queues do
let(:retry_set) { Sidekiq::Scheduled::SETS.first }
let(:schedule_set) { Sidekiq::Scheduled::SETS.last }
around do |example|
freeze_time { example.run }
end
shared_examples 'finds jobs that are due and enqueues them' do
before do
Sidekiq.redis do |redis|
redis.zadd(retry_set, (Time.current - 1.day).to_f.to_s, '{"jid": 1}')
redis.zadd(retry_set, Time.current.to_f.to_s, '{"jid": 2}')
redis.zadd(retry_set, (Time.current + 1.day).to_f.to_s, '{"jid": 3}')
redis.zadd(schedule_set, (Time.current - 1.day).to_f.to_s, '{"jid": 4}')
redis.zadd(schedule_set, Time.current.to_f.to_s, '{"jid": 5}')
redis.zadd(schedule_set, (Time.current + 1.day).to_f.to_s, '{"jid": 6}')
end
end
it 'enqueues jobs that are due' do
expect(Sidekiq::Client).to receive(:push).with({ 'jid' => 1 })
expect(Sidekiq::Client).to receive(:push).with({ 'jid' => 2 })
expect(Sidekiq::Client).to receive(:push).with({ 'jid' => 4 })
expect(Sidekiq::Client).to receive(:push).with({ 'jid' => 5 })
Gitlab::SidekiqEnq.new.enqueue_jobs
Sidekiq.redis do |redis|
expect(redis.zscan_each(retry_set).map(&:first)).to contain_exactly('{"jid": 3}')
expect(redis.zscan_each(schedule_set).map(&:first)).to contain_exactly('{"jid": 6}')
end
end
end
context 'when atomic_sidekiq_scheduler is disabled' do
before do
stub_feature_flags(atomic_sidekiq_scheduler: false)
end
it_behaves_like 'finds jobs that are due and enqueues them'
context 'when ZRANGEBYSCORE returns a job that is already removed by another process' do
before do
Sidekiq.redis do |redis|
redis.zadd(schedule_set, Time.current.to_f.to_s, '{"jid": 1}')
allow(redis).to receive(:zrangebyscore).and_wrap_original do |m, *args, **kwargs|
m.call(*args, **kwargs).tap do |jobs|
redis.zrem(schedule_set, jobs.first) if args[0] == schedule_set && jobs.first
end
end
end
end
it 'calls ZREM but does not enqueue the job' do
Sidekiq.redis do |redis|
expect(redis).to receive(:zrem).with(schedule_set, '{"jid": 1}').twice.and_call_original
end
expect(Sidekiq::Client).not_to receive(:push)
Gitlab::SidekiqEnq.new.enqueue_jobs
end
end
end
context 'when atomic_sidekiq_scheduler is enabled' do
before do
stub_feature_flags(atomic_sidekiq_scheduler: true)
end
context 'when Lua script is not yet loaded' do
before do
Gitlab::Redis::Queues.with { |redis| redis.script(:flush) }
end
it_behaves_like 'finds jobs that are due and enqueues them'
end
context 'when Lua script is already loaded' do
before do
Gitlab::SidekiqEnq.new.enqueue_jobs
end
it_behaves_like 'finds jobs that are due and enqueues them'
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment