Commit 44cf24c1 authored by Thong Kuah

Merge branch 'bvl-context-for-batch-scheduling' into 'master'

Allow scheduling jobs in batch with contexts

See merge request gitlab-org/gitlab!23519
parents 1e7a034e 01b394f0
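
For orientation, here is a minimal usage sketch of the API this MR introduces (not part of the diff; it assumes a worker that includes ApplicationWorker and `projects` being an enumerable of Project records, as in UpdateAllMirrorsWorker below). Each object yields its own job arguments and its own application context, so every Sidekiq job scheduled in the batch carries per-object metadata.

# Usage sketch (illustrative only): schedule one job per project, each
# tagged with that project's application context.
ProjectImportScheduleWorker.bulk_perform_async_with_contexts(
  projects,
  arguments_proc: -> (project) { project.id },
  context_proc: -> (project) { { project: project } }
)

# Delayed variant added by the same change: schedule the batch to run in 10 minutes.
ProjectImportScheduleWorker.bulk_perform_in_with_contexts(
  10.minutes,
  projects,
  arguments_proc: -> (project) { project.id },
  context_proc: -> (project) { { project: project } }
)

The client middleware added further down then looks up the stored context for each job's arguments via context_for_arguments and wraps scheduling in it, so the context ends up in the job payload (e.g. meta.project).
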
......@@ -12,8 +12,46 @@ module WorkerContext
@worker_context || superclass_context
end
def bulk_perform_async_with_contexts(objects, arguments_proc:, context_proc:)
with_batch_contexts(objects, arguments_proc, context_proc) do |arguments|
bulk_perform_async(arguments)
end
end
def bulk_perform_in_with_contexts(delay, objects, arguments_proc:, context_proc:)
with_batch_contexts(objects, arguments_proc, context_proc) do |arguments|
bulk_perform_in(delay, arguments)
end
end
def context_for_arguments(args)
batch_context&.context_for(args)
end
private
BATCH_CONTEXT_KEY = "#{name}_batch_context"
def batch_context
Thread.current[BATCH_CONTEXT_KEY]
end
def batch_context=(value)
Thread.current[BATCH_CONTEXT_KEY] = value
end
def with_batch_contexts(objects, arguments_proc, context_proc)
self.batch_context = Gitlab::BatchWorkerContext.new(
objects,
arguments_proc: arguments_proc,
context_proc: context_proc
)
yield(batch_context.arguments)
ensure
self.batch_context = nil
end
def superclass_context
return unless superclass.include?(WorkerContext)
......
......@@ -47,11 +47,12 @@ class UpdateAllMirrorsWorker
projects = pull_mirrors_batch(freeze_at: now, batch_size: batch_size, offset_at: last).to_a
break if projects.empty?
project_ids = projects.lazy.select(&:mirror?).take(capacity).map(&:id).force
capacity -= project_ids.length
projects_to_schedule = projects.lazy.select(&:mirror?).take(capacity).force
capacity -= projects_to_schedule.size
ProjectImportScheduleWorker.bulk_perform_async(project_ids.map { |id| [id] })
scheduled += project_ids.length
schedule_projects_in_batch(projects_to_schedule)
scheduled += projects_to_schedule.length
# If fewer than `batch_size` projects were returned, we don't need to query again
break if projects.length < batch_size
......@@ -95,6 +96,7 @@ class UpdateAllMirrorsWorker
.mirrors_to_sync(freeze_at)
.reorder('import_state.next_execution_timestamp')
.limit(batch_size)
.with_route
.with_namespace # Used by `project.mirror?`
relation = relation.where('import_state.next_execution_timestamp > ?', offset_at) if offset_at
......@@ -102,4 +104,12 @@ class UpdateAllMirrorsWorker
relation
end
# rubocop: enable CodeReuse/ActiveRecord
def schedule_projects_in_batch(projects)
ProjectImportScheduleWorker.bulk_perform_async_with_contexts(
projects,
arguments_proc: -> (project) { project.id },
context_proc: -> (project) { { project: project } }
)
end
end
# frozen_string_literal: true
module Gitlab
class BatchWorkerContext
def initialize(objects, arguments_proc:, context_proc:)
@objects = objects
@arguments_proc = arguments_proc
@context_proc = context_proc
end
def arguments
context_by_arguments.keys
end
def context_for(arguments)
context_by_arguments[arguments]
end
private
attr_reader :objects, :arguments_proc, :context_proc
def context_by_arguments
@context_by_arguments ||= objects.each_with_object({}) do |object, result|
arguments = Array.wrap(arguments_proc.call(object))
context = Gitlab::ApplicationContext.new(context_proc.call(object))
result[arguments] = context
end
end
end
end
......@@ -29,6 +29,7 @@ module Gitlab
lambda do |chain|
chain.add Gitlab::SidekiqStatus::ClientMiddleware
chain.add Gitlab::SidekiqMiddleware::ClientMetrics
chain.add Gitlab::SidekiqMiddleware::WorkerContext::Client # needs to be before the Labkit middleware
chain.add Labkit::Middleware::Sidekiq::Client
end
end
......
# frozen_string_literal: true
module Gitlab
module SidekiqMiddleware
module WorkerContext
private
def wrap_in_optional_context(context_or_nil, &block)
return yield unless context_or_nil
context_or_nil.use(&block)
end
end
end
end
# frozen_string_literal: true
module Gitlab
module SidekiqMiddleware
module WorkerContext
class Client
include Gitlab::SidekiqMiddleware::WorkerContext
def call(worker_class_or_name, job, _queue, _redis_pool, &block)
worker_class = worker_class_or_name.to_s.safe_constantize
# Mailers can't be constantized like this
return yield unless worker_class
return yield unless worker_class.include?(::ApplicationWorker)
context_for_args = worker_class.context_for_arguments(job['args'])
wrap_in_optional_context(context_for_args, &block)
end
end
end
end
end
......@@ -4,6 +4,8 @@ module Gitlab
module SidekiqMiddleware
module WorkerContext
class Server
include Gitlab::SidekiqMiddleware::WorkerContext
def call(worker, job, _queue, &block)
worker_class = worker.class
......@@ -13,14 +15,6 @@ module Gitlab
# Use the context defined on the class level as a base context
wrap_in_optional_context(worker_class.get_worker_context, &block)
end
private
def wrap_in_optional_context(context, &block)
return yield unless context
context.use(&block)
end
end
end
end
......
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::BatchWorkerContext do
subject(:batch_context) do
described_class.new(
%w(hello world),
arguments_proc: -> (word) { word },
context_proc: -> (word) { { user: build_stubbed(:user, username: word) } }
)
end
describe "#arguments" do
it "returns all the expected arguments in arrays" do
expect(batch_context.arguments).to eq([%w(hello), %w(world)])
end
end
describe "#context_for" do
it "returns the correct application context for the arguments" do
context = batch_context.context_for(%w(world))
expect(context).to be_a(Gitlab::ApplicationContext)
expect(context.to_lazy_hash[:user].call).to eq("world")
end
end
end
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::SidekiqMiddleware::WorkerContext::Client do
let(:worker_class) do
Class.new do
def self.name
'TestWithContextWorker'
end
include ApplicationWorker
def self.job_for_args(args)
jobs.find { |job| job['args'] == args }
end
def perform(*args)
end
end
end
before do
stub_const('TestWithContextWorker', worker_class)
end
describe "#call" do
it 'applies a context for jobs scheduled in batch' do
user_per_job = { 'job1' => build_stubbed(:user, username: 'user-1'),
'job2' => build_stubbed(:user, username: 'user-2') }
TestWithContextWorker.bulk_perform_async_with_contexts(
%w(job1 job2),
arguments_proc: -> (name) { [name, 1, 2, 3] },
context_proc: -> (name) { { user: user_per_job[name] } }
)
job1 = TestWithContextWorker.job_for_args(['job1', 1, 2, 3])
job2 = TestWithContextWorker.job_for_args(['job2', 1, 2, 3])
expect(job1['meta.user']).to eq(user_per_job['job1'].username)
expect(job2['meta.user']).to eq(user_per_job['job2'].username)
end
end
end
......@@ -110,6 +110,14 @@ describe Gitlab::SidekiqMiddleware do
let(:queue) { 'default' }
let(:redis_pool) { Sidekiq.redis_pool }
let(:middleware_expected_args) { [worker_class_arg, job, queue, redis_pool] }
let(:expected_middlewares) do
[
Gitlab::SidekiqStatus::ClientMiddleware,
Gitlab::SidekiqMiddleware::ClientMetrics,
Gitlab::SidekiqMiddleware::WorkerContext::Client,
Labkit::Middleware::Sidekiq::Client
]
end
before do
described_class.client_configurator.call(chain)
......@@ -120,8 +128,9 @@ describe Gitlab::SidekiqMiddleware do
# this will prevent the full middleware chain from being executed.
# This test ensures that this does not happen
it "invokes the chain" do
expect_any_instance_of(Gitlab::SidekiqStatus::ClientMiddleware).to receive(:call).with(*middleware_expected_args).once.and_call_original
expect_any_instance_of(Labkit::Middleware::Sidekiq::Client).to receive(:call).with(*middleware_expected_args).once.and_call_original
expected_middlewares.each do |middleware|
expect_any_instance_of(middleware).to receive(:call).with(*middleware_expected_args).once.ordered.and_call_original
end
expect { |b| chain.invoke(worker_class_arg, job, queue, redis_pool, &b) }.to yield_control.once
end
......
......@@ -5,7 +5,11 @@ require 'spec_helper'
describe WorkerContext do
let(:worker) do
Class.new do
include WorkerContext
def self.name
"TestWorker"
end
include ApplicationWorker
end
end
......@@ -24,6 +28,78 @@ describe WorkerContext do
end
end
shared_examples 'tracking bulk scheduling contexts' do
describe "context contents" do
before do
# stub clearing the contexts, so we can check what's inside
allow(worker).to receive(:batch_context=).and_call_original
allow(worker).to receive(:batch_context=).with(nil)
end
it 'keeps track of the context per key to schedule' do
subject
expect(worker.context_for_arguments(["hello"])).to be_a(Gitlab::ApplicationContext)
end
it 'does not share contexts across threads' do
t1_context = nil
t2_context = nil
Thread.new do
subject
t1_context = worker.context_for_arguments(["hello"])
end.join
Thread.new do
t2_context = worker.context_for_arguments(["hello"])
end.join
expect(t1_context).to be_a(Gitlab::ApplicationContext)
expect(t2_context).to be_nil
end
end
it 'clears the contexts' do
subject
expect(worker.__send__(:batch_context)).to be_nil
end
end
describe '.bulk_perform_async_with_contexts' do
subject do
worker.bulk_perform_async_with_contexts(%w(hello world),
context_proc: -> (_) { { user: build_stubbed(:user) } },
arguments_proc: -> (word) { word })
end
it 'calls bulk_perform_async with the arguments' do
expect(worker).to receive(:bulk_perform_async).with([["hello"], ["world"]])
subject
end
it_behaves_like 'tracking bulk scheduling contexts'
end
describe '.bulk_perform_in_with_contexts' do
subject do
worker.bulk_perform_in_with_contexts(10.minutes,
%w(hello world),
context_proc: -> (_) { { user: build_stubbed(:user) } },
arguments_proc: -> (word) { word })
end
it 'calls bulk_perform_in with the arguments and delay' do
expect(worker).to receive(:bulk_perform_in).with(10.minutes, [["hello"], ["world"]])
subject
end
it_behaves_like 'tracking bulk scheduling contexts'
end
describe '#with_context' do
it 'allows modifying context when the job is running' do
worker.new.with_context(user: build_stubbed(:user, username: 'jane-doe')) do
......