Commit eb6eefd6 authored by Manoj M J's avatar Manoj M J Committed by Bob Van Landuyt

Resolve "Allow periodic project authorization refresh jobs to use database...

Resolve "Allow periodic project authorization refresh jobs to use database replica for reads" [RUN ALL RSPEC] [RUN AS-IF-FOSS]
parent c85a1468
...@@ -54,6 +54,12 @@ module AuthorizedProjectUpdate ...@@ -54,6 +54,12 @@ module AuthorizedProjectUpdate
[remove, add] [remove, add]
end end
def needs_refresh?
remove, add = execute
remove.present? || add.present?
end
def fresh_access_levels_per_project def fresh_access_levels_per_project
fresh_authorizations.each_with_object({}) do |row, hash| fresh_authorizations.each_with_object({}) do |row, hash|
hash[row.project_id] = row.access_level hash[row.project_id] = row.access_level
......
...@@ -25,7 +25,7 @@ ...@@ -25,7 +25,7 @@
:urgency: :low :urgency: :low
:resource_boundary: :unknown :resource_boundary: :unknown
:weight: 1 :weight: 1
:idempotent: true :idempotent:
:tags: [] :tags: []
- :name: authorized_project_update:authorized_project_update_user_refresh_with_low_urgency - :name: authorized_project_update:authorized_project_update_user_refresh_with_low_urgency
:feature_category: :authentication_and_authorization :feature_category: :authentication_and_authorization
......
# frozen_string_literal: true # frozen_string_literal: true
module AuthorizedProjectUpdate module AuthorizedProjectUpdate
class UserRefreshOverUserRangeWorker class UserRefreshOverUserRangeWorker # rubocop:disable Scalability/IdempotentWorker
# When the feature flag named `periodic_project_authorization_update_via_replica` is enabled,
# this worker checks if a specific user requires an update to their project_authorizations records.
# This check is done via the data read from the database replica (and not from the primary).
# If this check returns true, a completely new Sidekiq job is enqueued for this specific user
# so as to update its project_authorizations records.
# There is a possibility that the data in the replica is lagging behind the primary
# and hence it becomes very important that we check if an update is indeed required for this user
# once again via the primary database, which is the reason why we enqueue a completely new Sidekiq job
# via `UserRefreshWithLowUrgencyWorker` for this user.
include ApplicationWorker include ApplicationWorker
feature_category :authentication_and_authorization feature_category :authentication_and_authorization
urgency :low urgency :low
queue_namespace :authorized_project_update queue_namespace :authorized_project_update
# This job will not be deduplicated since it is marked with
# `data_consistency :delayed` and not `idempotent!`
# See https://gitlab.com/gitlab-org/gitlab/-/issues/325291
deduplicate :until_executing, including_scheduled: true deduplicate :until_executing, including_scheduled: true
data_consistency :delayed, feature_flag: :periodic_project_authorization_update_via_replica
idempotent!
def perform(start_user_id, end_user_id) def perform(start_user_id, end_user_id)
AuthorizedProjectUpdate::RecalculateForUserRangeService.new(start_user_id, end_user_id).execute if Feature.enabled?(:periodic_project_authorization_update_via_replica)
User.where(id: start_user_id..end_user_id).find_each do |user| # rubocop: disable CodeReuse/ActiveRecord
enqueue_project_authorizations_refresh(user) if project_authorizations_needs_refresh?(user)
end
else
AuthorizedProjectUpdate::RecalculateForUserRangeService.new(start_user_id, end_user_id).execute
end
end
private
def project_authorizations_needs_refresh?(user)
AuthorizedProjectUpdate::FindRecordsDueForRefreshService.new(user).needs_refresh?
end
def enqueue_project_authorizations_refresh(user)
with_context(user: user) do
AuthorizedProjectUpdate::UserRefreshWithLowUrgencyWorker.perform_async(user.id)
end
end end
end end
end end
---
name: periodic_project_authorization_update_via_replica
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/58752
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/327092
milestone: '13.11'
type: development
group: group::access
default_enabled: false
...@@ -94,6 +94,41 @@ RSpec.describe AuthorizedProjectUpdate::FindRecordsDueForRefreshService do ...@@ -94,6 +94,41 @@ RSpec.describe AuthorizedProjectUpdate::FindRecordsDueForRefreshService do
end end
end end
describe '#needs_refresh?' do
subject { service.needs_refresh? }
context 'when there are records due for either removal or addition' do
context 'when there are both removals and additions to be made' do
before do
user.project_authorizations.delete_all
create(:project_authorization, user: user)
end
it { is_expected.to eq(true) }
end
context 'when there are no removals, but there are additions to be made' do
before do
user.project_authorizations.delete_all
end
it { is_expected.to eq(true) }
end
context 'when there are no additions, but there are removals to be made' do
before do
create(:project_authorization, user: user)
end
it { is_expected.to eq(true) }
end
end
context 'when there are no additions or removals to be made' do
it { is_expected.to eq(false) }
end
end
describe '#fresh_access_levels_per_project' do describe '#fresh_access_levels_per_project' do
let(:hash) { service.fresh_access_levels_per_project } let(:hash) { service.fresh_access_levels_per_project }
......
...@@ -3,16 +3,67 @@ ...@@ -3,16 +3,67 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe AuthorizedProjectUpdate::UserRefreshOverUserRangeWorker do RSpec.describe AuthorizedProjectUpdate::UserRefreshOverUserRangeWorker do
let(:start_user_id) { 42 } let(:project) { create(:project) }
let(:end_user_id) { 4242 } let(:user) { project.namespace.owner }
let(:start_user_id) { user.id }
let(:end_user_id) { start_user_id }
let(:execute_worker) { subject.perform(start_user_id, end_user_id) }
it_behaves_like 'worker with data consistency',
described_class,
feature_flag: :periodic_project_authorization_update_via_replica,
data_consistency: :delayed
describe '#perform' do describe '#perform' do
it 'calls AuthorizedProjectUpdate::RecalculateForUserRangeService' do context 'when the feature flag `periodic_project_authorization_update_via_replica` is enabled' do
expect_next_instance_of(AuthorizedProjectUpdate::RecalculateForUserRangeService) do |service| before do
expect(service).to receive(:execute) stub_feature_flags(periodic_project_authorization_update_via_replica: true)
end
context 'checks if project authorization update is required' do
it 'checks if a project_authorization refresh is needed for each of the users' do
User.where(id: start_user_id..end_user_id).each do |user|
expect(AuthorizedProjectUpdate::FindRecordsDueForRefreshService).to(
receive(:new).with(user).and_call_original)
end
execute_worker
end
end
context 'when there are project authorization records due for either removal or addition for a specific user' do
before do
user.project_authorizations.delete_all
end
it 'enqueues a new project authorization update job for the user' do
expect(AuthorizedProjectUpdate::UserRefreshWithLowUrgencyWorker).to receive(:perform_async).with(user.id)
execute_worker
end
end end
subject.perform(start_user_id, end_user_id) context 'when there are no additions or removals to be made to project authorizations for a specific user' do
it 'does not enqueue a new project authorization update job for the user' do
expect(AuthorizedProjectUpdate::UserRefreshWithLowUrgencyWorker).not_to receive(:perform_async)
execute_worker
end
end
end
context 'when the feature flag `periodic_project_authorization_update_via_replica` is disabled' do
before do
stub_feature_flags(periodic_project_authorization_update_via_replica: false)
end
it 'calls AuthorizedProjectUpdate::RecalculateForUserRangeService' do
expect_next_instance_of(AuthorizedProjectUpdate::RecalculateForUserRangeService, start_user_id, end_user_id) do |service|
expect(service).to receive(:execute)
end
execute_worker
end
end end
end end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment