Commit c310003d authored by Bob Van Landuyt's avatar Bob Van Landuyt Committed by Alessio Caiazza

Clear some fields from Cron workers

Cron workers, regardless from how they are schedules, can't have a
context pointing to a user, project or a namespace. Since they don't
run with arguments, they are never scoped to a single resource.

So clear the context, to make sure that every job that gets scheduled
from within a cron worker doesn't inherit it's context.

This also adds a `with_context` helper to the workers to facilitate
scheduling a context for workers that are spun off from a cron worker.
parent 869fc479
...@@ -7,14 +7,14 @@ class Import::BaseController < ApplicationController ...@@ -7,14 +7,14 @@ class Import::BaseController < ApplicationController
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def find_already_added_projects(import_type) def find_already_added_projects(import_type)
current_user.created_projects.where(import_type: import_type).includes(:import_state) current_user.created_projects.where(import_type: import_type).with_import_state
end end
# rubocop: enable CodeReuse/ActiveRecord # rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def find_jobs(import_type) def find_jobs(import_type)
current_user.created_projects current_user.created_projects
.includes(:import_state) .with_import_state
.where(import_type: import_type) .where(import_type: import_type)
.to_json(only: [:id], methods: [:import_status]) .to_json(only: [:id], methods: [:import_status])
end end
......
...@@ -82,7 +82,7 @@ class Import::BitbucketServerController < Import::BaseController ...@@ -82,7 +82,7 @@ class Import::BitbucketServerController < Import::BaseController
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def filter_added_projects(import_type, import_sources) def filter_added_projects(import_type, import_sources)
current_user.created_projects.where(import_type: import_type, import_source: import_sources).includes(:import_state) current_user.created_projects.where(import_type: import_type, import_source: import_sources).with_import_state
end end
# rubocop: enable CodeReuse/ActiveRecord # rubocop: enable CodeReuse/ActiveRecord
......
...@@ -87,7 +87,7 @@ class Import::ManifestController < Import::BaseController ...@@ -87,7 +87,7 @@ class Import::ManifestController < Import::BaseController
group.all_projects group.all_projects
.where(import_type: 'manifest') .where(import_type: 'manifest')
.where(creator_id: current_user) .where(creator_id: current_user)
.includes(:import_state) .with_import_state
end end
# rubocop: enable CodeReuse/ActiveRecord # rubocop: enable CodeReuse/ActiveRecord
......
...@@ -12,16 +12,14 @@ class ContributedProjectsFinder < UnionFinder ...@@ -12,16 +12,14 @@ class ContributedProjectsFinder < UnionFinder
# visible by this user. # visible by this user.
# #
# Returns an ActiveRecord::Relation. # Returns an ActiveRecord::Relation.
# rubocop: disable CodeReuse/ActiveRecord
def execute(current_user = nil) def execute(current_user = nil)
# Do not show contributed projects if the user profile is private. # Do not show contributed projects if the user profile is private.
return Project.none unless can_read_profile?(current_user) return Project.none unless can_read_profile?(current_user)
segments = all_projects(current_user) segments = all_projects(current_user)
find_union(segments, Project).includes(:namespace).order_id_desc find_union(segments, Project).with_namespace.order_id_desc
end end
# rubocop: enable CodeReuse/ActiveRecord
private private
......
...@@ -17,15 +17,13 @@ class PersonalProjectsFinder < UnionFinder ...@@ -17,15 +17,13 @@ class PersonalProjectsFinder < UnionFinder
# min_access_level: integer # min_access_level: integer
# #
# Returns an ActiveRecord::Relation. # Returns an ActiveRecord::Relation.
# rubocop: disable CodeReuse/ActiveRecord
def execute(current_user = nil) def execute(current_user = nil)
return Project.none unless can?(current_user, :read_user_profile, @user) return Project.none unless can?(current_user, :read_user_profile, @user)
segments = all_projects(current_user) segments = all_projects(current_user)
find_union(segments, Project).includes(:namespace).order_updated_desc find_union(segments, Project).with_namespace.order_updated_desc
end end
# rubocop: enable CodeReuse/ActiveRecord
private private
......
...@@ -411,6 +411,8 @@ class Project < ApplicationRecord ...@@ -411,6 +411,8 @@ class Project < ApplicationRecord
scope :with_project_feature, -> { joins('LEFT JOIN project_features ON projects.id = project_features.project_id') } scope :with_project_feature, -> { joins('LEFT JOIN project_features ON projects.id = project_features.project_id') }
scope :inc_routes, -> { includes(:route, namespace: :route) } scope :inc_routes, -> { includes(:route, namespace: :route) }
scope :with_statistics, -> { includes(:statistics) } scope :with_statistics, -> { includes(:statistics) }
scope :with_namespace, -> { includes(:namespace) }
scope :with_import_state, -> { includes(:import_state) }
scope :with_service, ->(service) { joins(service).eager_load(service) } scope :with_service, ->(service) { joins(service).eager_load(service) }
scope :with_shared_runners, -> { where(shared_runners_enabled: true) } scope :with_shared_runners, -> { where(shared_runners_enabled: true) }
scope :with_container_registry, -> { where(container_registry_enabled: true) } scope :with_container_registry, -> { where(container_registry_enabled: true) }
......
...@@ -9,6 +9,7 @@ module ApplicationWorker ...@@ -9,6 +9,7 @@ module ApplicationWorker
include Sidekiq::Worker # rubocop:disable Cop/IncludeSidekiqWorker include Sidekiq::Worker # rubocop:disable Cop/IncludeSidekiqWorker
include WorkerAttributes include WorkerAttributes
include WorkerContext
included do included do
set_queue set_queue
......
...@@ -8,5 +8,6 @@ module CronjobQueue ...@@ -8,5 +8,6 @@ module CronjobQueue
included do included do
queue_namespace :cronjob queue_namespace :cronjob
sidekiq_options retry: false sidekiq_options retry: false
worker_context project: nil, namespace: nil, user: nil
end end
end end
# frozen_string_literal: true
module WorkerContext
extend ActiveSupport::Concern
class_methods do
def worker_context(attributes)
@worker_context = Gitlab::ApplicationContext.new(attributes)
end
def get_worker_context
@worker_context || superclass_context
end
private
def superclass_context
return unless superclass.include?(WorkerContext)
superclass.get_worker_context
end
end
def with_context(context, &block)
Gitlab::ApplicationContext.new(context).use(&block)
end
end
...@@ -9,14 +9,14 @@ class ProjectImportScheduleWorker ...@@ -9,14 +9,14 @@ class ProjectImportScheduleWorker
feature_category :importers feature_category :importers
sidekiq_options retry: false sidekiq_options retry: false
# rubocop: disable CodeReuse/ActiveRecord
def perform(project_id) def perform(project_id)
return if Gitlab::Database.read_only? return if Gitlab::Database.read_only?
import_state = ProjectImportState.find_by(project_id: project_id) project = Project.with_route.with_import_state.with_namespace.find_by_id(project_id)
raise ImportStateNotFound unless import_state raise ImportStateNotFound unless project&.import_state
import_state.schedule with_context(project: project) do
project.import_state.schedule
end
end end
# rubocop: enable CodeReuse/ActiveRecord
end end
...@@ -14,24 +14,22 @@ class UpdateAllMirrorsWorker ...@@ -14,24 +14,22 @@ class UpdateAllMirrorsWorker
def perform def perform
return if Gitlab::Database.read_only? return if Gitlab::Database.read_only?
Gitlab::ApplicationContext.with_context({ user: nil, project: nil, namespace: nil }) do scheduled = 0
scheduled = 0 with_lease do
with_lease do scheduled = schedule_mirrors!
scheduled = schedule_mirrors!
end
# If we didn't get the lease, or no updates were scheduled, exit early
break unless scheduled > 0
# Wait to give some jobs a chance to complete
Kernel.sleep(RESCHEDULE_WAIT)
# If there's capacity left now (some jobs completed),
# reschedule this job to enqueue more work.
#
# This is in addition to the regular (cron-like) scheduling of this job.
UpdateAllMirrorsWorker.perform_async if Gitlab::Mirror.reschedule_immediately?
end end
# If we didn't get the lease, or no updates were scheduled, exit early
return unless scheduled > 0
# Wait to give some jobs a chance to complete
Kernel.sleep(RESCHEDULE_WAIT)
# If there's capacity left now (some jobs completed),
# reschedule this job to enqueue more work.
#
# This is in addition to the regular (cron-like) scheduling of this job.
UpdateAllMirrorsWorker.perform_async if Gitlab::Mirror.reschedule_immediately?
end end
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
...@@ -97,7 +95,7 @@ class UpdateAllMirrorsWorker ...@@ -97,7 +95,7 @@ class UpdateAllMirrorsWorker
.mirrors_to_sync(freeze_at) .mirrors_to_sync(freeze_at)
.reorder('import_state.next_execution_timestamp') .reorder('import_state.next_execution_timestamp')
.limit(batch_size) .limit(batch_size)
.includes(:namespace) # Used by `project.mirror?` .with_namespace # Used by `project.mirror?`
relation = relation.where('import_state.next_execution_timestamp > ?', offset_at) if offset_at relation = relation.where('import_state.next_execution_timestamp > ?', offset_at) if offset_at
......
...@@ -16,7 +16,7 @@ module Gitlab ...@@ -16,7 +16,7 @@ module Gitlab
def self.with_context(args, &block) def self.with_context(args, &block)
application_context = new(**args) application_context = new(**args)
Labkit::Context.with_context(application_context.to_lazy_hash, &block) application_context.use(&block)
end end
def self.push(args) def self.push(args)
...@@ -42,6 +42,10 @@ module Gitlab ...@@ -42,6 +42,10 @@ module Gitlab
end end
end end
def use
Labkit::Context.with_context(to_lazy_hash) { yield }
end
private private
attr_reader :set_values attr_reader :set_values
......
...@@ -18,6 +18,7 @@ module Gitlab ...@@ -18,6 +18,7 @@ module Gitlab
chain.add Labkit::Middleware::Sidekiq::Server chain.add Labkit::Middleware::Sidekiq::Server
chain.add Gitlab::SidekiqMiddleware::InstrumentationLogger chain.add Gitlab::SidekiqMiddleware::InstrumentationLogger
chain.add Gitlab::SidekiqStatus::ServerMiddleware chain.add Gitlab::SidekiqStatus::ServerMiddleware
chain.add Gitlab::SidekiqMiddleware::WorkerContext::Server
end end
end end
......
# frozen_string_literal: true
module Gitlab
module SidekiqMiddleware
module WorkerContext
class Server
def call(worker, job, _queue, &block)
worker_class = worker.class
# This is not a worker we know about, perhaps from a gem
return yield unless worker_class.respond_to?(:get_worker_context)
# Use the context defined on the class level as a base context
wrap_in_optional_context(worker_class.get_worker_context, &block)
end
private
def wrap_in_optional_context(context, &block)
return yield unless context
context.use(&block)
end
end
end
end
end
...@@ -79,4 +79,18 @@ describe Gitlab::ApplicationContext do ...@@ -79,4 +79,18 @@ describe Gitlab::ApplicationContext do
.to include(project: project.full_path, root_namespace: project.full_path_components.first) .to include(project: project.full_path, root_namespace: project.full_path_components.first)
end end
end end
describe '#use' do
let(:context) { described_class.new(user: build(:user)) }
it 'yields control' do
expect { |b| context.use(&b) }.to yield_control
end
it 'passes the expected context on to labkit' do
expect(Labkit::Context).to receive(:with_context).with(a_hash_including(user: duck_type(:call)))
context.use {}
end
end
end end
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::SidekiqMiddleware::WorkerContext::Server do
let(:worker_class) do
Class.new do
def self.name
"TestWorker"
end
# To keep track of the context that was active for certain arguments
cattr_accessor(:contexts) { {} }
include ApplicationWorker
worker_context user: nil
def perform(identifier, *args)
self.class.contexts.merge!(identifier => Labkit::Context.current.to_h)
end
end
end
let(:other_worker) do
Class.new do
def self.name
"OtherWorker"
end
include Sidekiq::Worker
def perform
end
end
end
before do
stub_const("TestWorker", worker_class)
stub_const("OtherWorker", other_worker)
end
around do |example|
Sidekiq::Testing.inline! { example.run }
end
before(:context) do
Sidekiq::Testing.server_middleware do |chain|
chain.add described_class
end
end
after(:context) do
Sidekiq::Testing.server_middleware do |chain|
chain.remove described_class
end
end
describe "#call" do
it 'applies a class context' do
Gitlab::ApplicationContext.with_context(user: build_stubbed(:user)) do
TestWorker.perform_async("identifier", 1)
end
expect(TestWorker.contexts['identifier'].keys).not_to include('meta.user')
end
it "doesn't fail for unknown workers" do
expect { OtherWorker.perform_async }.not_to raise_error
end
end
end
...@@ -44,7 +44,8 @@ describe Gitlab::SidekiqMiddleware do ...@@ -44,7 +44,8 @@ describe Gitlab::SidekiqMiddleware do
Gitlab::SidekiqMiddleware::ServerMetrics, Gitlab::SidekiqMiddleware::ServerMetrics,
Gitlab::SidekiqMiddleware::ArgumentsLogger, Gitlab::SidekiqMiddleware::ArgumentsLogger,
Gitlab::SidekiqMiddleware::MemoryKiller, Gitlab::SidekiqMiddleware::MemoryKiller,
Gitlab::SidekiqMiddleware::RequestStoreMiddleware Gitlab::SidekiqMiddleware::RequestStoreMiddleware,
Gitlab::SidekiqMiddleware::WorkerContext::Server
] ]
end end
let(:enabled_sidekiq_middlewares) { all_sidekiq_middlewares - disabled_sidekiq_middlewares } let(:enabled_sidekiq_middlewares) { all_sidekiq_middlewares - disabled_sidekiq_middlewares }
......
...@@ -21,4 +21,12 @@ describe CronjobQueue do ...@@ -21,4 +21,12 @@ describe CronjobQueue do
it 'disables retrying of failed jobs' do it 'disables retrying of failed jobs' do
expect(worker.sidekiq_options['retry']).to eq(false) expect(worker.sidekiq_options['retry']).to eq(false)
end end
it 'automatically clears project, user and namespace from the context', :aggregate_failues do
worker_context = worker.get_worker_context.to_lazy_hash.transform_values(&:call)
expect(worker_context[:user]).to be_nil
expect(worker_context[:root_namespace]).to be_nil
expect(worker_context[:project]).to be_nil
end
end end
# frozen_string_literal: true
require 'spec_helper'
describe WorkerContext do
let(:worker) do
Class.new do
include WorkerContext
end
end
describe '.worker_context' do
it 'allows modifying the context for the entire worker' do
worker.worker_context(user: build_stubbed(:user))
expect(worker.get_worker_context).to be_a(Gitlab::ApplicationContext)
end
it 'allows fetches the context from a superclass if none was defined' do
worker.worker_context(user: build_stubbed(:user))
subclass = Class.new(worker)
expect(subclass.get_worker_context).to eq(worker.get_worker_context)
end
end
describe '#with_context' do
it 'allows modifying context when the job is running' do
worker.new.with_context(user: build_stubbed(:user, username: 'jane-doe')) do
expect(Labkit::Context.current.to_h).to include('meta.user' => 'jane-doe')
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment