Commit 4fea71a9 authored by Stan Hu's avatar Stan Hu

Enable logging of Sidekiq bulk job insertions

In https://gitlab.com/gitlab-com/gl-infra/production/-/issues/5808, we
haven't been able tell whether ProjectImportScheduleWorker jobs were
ever scheduled or that they were dropped somehow. This change will
enable logging of the Sidekiq bulk job insertions for these workers.

Changelog: added
parent dfa79015
...@@ -131,17 +131,27 @@ module ApplicationWorker ...@@ -131,17 +131,27 @@ module ApplicationWorker
end end
end end
def log_bulk_perform_async?
@log_bulk_perform_async
end
def log_bulk_perform_async!
@log_bulk_perform_async = true
end
def queue_size def queue_size
Sidekiq::Queue.new(queue).size Sidekiq::Queue.new(queue).size
end end
def bulk_perform_async(args_list) def bulk_perform_async(args_list)
if Feature.enabled?(:sidekiq_push_bulk_in_batches) if log_bulk_perform_async?
in_safe_limit_batches(args_list) do |args_batch, _| Sidekiq.logger.info('class' => self, 'args_list' => args_list, 'args_list_count' => args_list.length, 'message' => 'Inserting multiple jobs')
Sidekiq::Client.push_bulk('class' => self, 'args' => args_batch) end
do_push_bulk(args_list).tap do |job_ids|
if log_bulk_perform_async?
Sidekiq.logger.info('class' => self, 'jid_list' => job_ids, 'jid_list_count' => job_ids.length, 'message' => 'Completed JID insertion')
end end
else
Sidekiq::Client.push_bulk('class' => self, 'args' => args_list)
end end
end end
...@@ -188,6 +198,16 @@ module ApplicationWorker ...@@ -188,6 +198,16 @@ module ApplicationWorker
private private
def do_push_bulk(args_list)
if Feature.enabled?(:sidekiq_push_bulk_in_batches)
in_safe_limit_batches(args_list) do |args_batch, _|
Sidekiq::Client.push_bulk('class' => self, 'args' => args_batch)
end
else
Sidekiq::Client.push_bulk('class' => self, 'args' => args_list)
end
end
def in_safe_limit_batches(args_list, schedule_at = nil, safe_limit = SAFE_PUSH_BULK_LIMIT) def in_safe_limit_batches(args_list, schedule_at = nil, safe_limit = SAFE_PUSH_BULK_LIMIT)
# `schedule_at` could be one of # `schedule_at` could be one of
# - nil. # - nil.
......
...@@ -12,6 +12,7 @@ class ProjectImportScheduleWorker ...@@ -12,6 +12,7 @@ class ProjectImportScheduleWorker
feature_category :source_code_management feature_category :source_code_management
sidekiq_options retry: false sidekiq_options retry: false
loggable_arguments 1 # For the job waiter key loggable_arguments 1 # For the job waiter key
log_bulk_perform_async!
# UpdateAllMirrorsWorker depends on the queue size of this worker: # UpdateAllMirrorsWorker depends on the queue size of this worker:
# https://gitlab.com/gitlab-org/gitlab/-/issues/340630 # https://gitlab.com/gitlab-org/gitlab/-/issues/340630
......
...@@ -341,6 +341,7 @@ RSpec.describe ApplicationWorker do ...@@ -341,6 +341,7 @@ RSpec.describe ApplicationWorker do
it 'enqueues jobs in one go' do it 'enqueues jobs in one go' do
expect(Sidekiq::Client).to( expect(Sidekiq::Client).to(
receive(:push_bulk).with(hash_including('args' => args)).once.and_call_original) receive(:push_bulk).with(hash_including('args' => args)).once.and_call_original)
expect(Sidekiq.logger).not_to receive(:info)
perform_action perform_action
...@@ -349,6 +350,19 @@ RSpec.describe ApplicationWorker do ...@@ -349,6 +350,19 @@ RSpec.describe ApplicationWorker do
end end
end end
shared_examples_for 'logs bulk insertions' do
it 'logs arguments and job IDs' do
worker.log_bulk_perform_async!
expect(Sidekiq.logger).to(
receive(:info).with(hash_including('args_list' => args)).once.and_call_original)
expect(Sidekiq.logger).to(
receive(:info).with(hash_including('jid_list' => anything)).once.and_call_original)
perform_action
end
end
before do before do
stub_const(worker.name, worker) stub_const(worker.name, worker)
end end
...@@ -381,6 +395,7 @@ RSpec.describe ApplicationWorker do ...@@ -381,6 +395,7 @@ RSpec.describe ApplicationWorker do
include_context 'set safe limit beyond the number of jobs to be enqueued' include_context 'set safe limit beyond the number of jobs to be enqueued'
it_behaves_like 'enqueues jobs in one go' it_behaves_like 'enqueues jobs in one go'
it_behaves_like 'logs bulk insertions'
it_behaves_like 'returns job_id of all enqueued jobs' it_behaves_like 'returns job_id of all enqueued jobs'
it_behaves_like 'does not schedule the jobs for any specific time' it_behaves_like 'does not schedule the jobs for any specific time'
end end
...@@ -400,6 +415,7 @@ RSpec.describe ApplicationWorker do ...@@ -400,6 +415,7 @@ RSpec.describe ApplicationWorker do
include_context 'set safe limit beyond the number of jobs to be enqueued' include_context 'set safe limit beyond the number of jobs to be enqueued'
it_behaves_like 'enqueues jobs in one go' it_behaves_like 'enqueues jobs in one go'
it_behaves_like 'logs bulk insertions'
it_behaves_like 'returns job_id of all enqueued jobs' it_behaves_like 'returns job_id of all enqueued jobs'
it_behaves_like 'does not schedule the jobs for any specific time' it_behaves_like 'does not schedule the jobs for any specific time'
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment