Commit 74729c56 authored by Sean McGivern

Merge branch '329357-move-rebalancing-outside-transaction' into 'master'

Rebalance without transaction

See merge request gitlab-org/gitlab!68746
parents a2639ccd 8623992f
......@@ -323,6 +323,13 @@ class Issue < ApplicationRecord
)
end
# Keyset-pagination column definition ordering by the `id` column, ascending.
def self.column_order_id_asc
  id_column = arel_table[:id]

  Gitlab::Pagination::Keyset::ColumnOrderDefinition.new(
    attribute_name: 'id',
    order_expression: id_column.asc
  )
end
def self.to_branch_name(*args)
branch_name = args.map(&:to_s).each_with_index.map do |arg, i|
arg.parameterize(preserve_case: i == 0).presence
......
......@@ -526,6 +526,7 @@ class Project < ApplicationRecord
scope :sorted_by_stars_desc, -> { reorder(self.arel_table['star_count'].desc) }
scope :sorted_by_stars_asc, -> { reorder(self.arel_table['star_count'].asc) }
# Sometimes queries (e.g. using CTEs) require explicit disambiguation with table name
scope :projects_order_id_asc, -> { reorder(self.arel_table['id'].asc) }
scope :projects_order_id_desc, -> { reorder(self.arel_table['id'].desc) }
scope :sorted_by_similarity_desc, -> (search, include_in_select: false) do
......
# frozen_string_literal: true
class IssueRebalancingService
# Hard cap on how many issues one run rebalances; #execute raises TooManyIssues above it.
MAX_ISSUE_COUNT = 10_000
# Number of (id, position) pairs handed to a single batch update in #execute.
BATCH_SIZE = 100
# Once the retry batch size has dropped below this value, a timeout is re-raised instead of retried.
SMALLEST_BATCH_SIZE = 5
# Consecutive timeouts tolerated at one batch size before the batch size is halved.
RETRIES_LIMIT = 3
# Raised by #execute when the issue count exceeds MAX_ISSUE_COUNT.
TooManyIssues = Class.new(StandardError)
# Passed to Gitlab::Database::WithLockRetries as `timing_configuration`.
# Each entry is [lock_timeout, sleep after LockWaitTimeout].
TIMING_CONFIGURATION = [
[0.1.seconds, 0.05.seconds], # short timings, lock_timeout: 100ms, sleep after LockWaitTimeout: 50ms
[0.5.seconds, 0.05.seconds],
[1.second, 0.5.seconds],
[1.second, 0.5.seconds],
[5.seconds, 1.second]
].freeze
# @param projects_collection a project relation; its first record supplies the
#   root namespace and the whole relation scopes the issues to rebalance.
def initialize(projects_collection)
  first_project = projects_collection.take # rubocop:disable CodeReuse/ActiveRecord

  @root_namespace = first_project.root_namespace
  @base = Issue.in_projects(projects_collection)
end
# Rewrites relative_position for every issue in scope, inside one transaction.
#
# Does nothing unless the :rebalance_issues flag is enabled for the root
# namespace. Raises TooManyIssues when more than MAX_ISSUE_COUNT issues are in
# scope. The :issue_rebalancing_optimization flag selects whether positions are
# assigned up front and written in id order, or assigned per batch; the
# :issue_rebalancing_with_retry flag (checked for every batch) selects the
# retrying writer.
def execute
  return unless Feature.enabled?(:rebalance_issues, root_namespace)

  if issue_count > MAX_ISSUE_COUNT
    raise TooManyIssues, "#{issue_count} issues"
  end

  start = RelativePositioning::START_POSITION - (gaps / 2) * gap_size

  if Feature.enabled?(:issue_rebalancing_optimization)
    Issue.transaction do
      pairs_by_id = assign_positions(start, indexed_ids).sort_by(&:first)

      pairs_by_id.each_slice(BATCH_SIZE) do |pairs_with_position|
        if Feature.enabled?(:issue_rebalancing_with_retry)
          update_positions_with_retry(pairs_with_position, 'rebalance issue positions in batches ordered by id')
        else
          update_positions(pairs_with_position, 'rebalance issue positions in batches ordered by id')
        end
      end
    end
  else
    Issue.transaction do
      indexed_ids.each_slice(BATCH_SIZE) do |pairs|
        pairs_with_position = assign_positions(start, pairs)

        if Feature.enabled?(:issue_rebalancing_with_retry)
          update_positions_with_retry(pairs_with_position, 'rebalance issue positions')
        else
          update_positions(pairs_with_position, 'rebalance issue positions')
        end
      end
    end
  end
end
private
attr_reader :root_namespace, :base
# rubocop: disable CodeReuse/ActiveRecord
# Enumerator of [issue_id, index] pairs, with ids ordered by relative_position then id.
def indexed_ids
  ordered_ids = base.reorder(:relative_position, :id).pluck(:id)
  ordered_ids.each_with_index
end
# rubocop: enable CodeReuse/ActiveRecord
# Maps [id, index] pairs to [id, position] pairs, where
# position = start + index * gap_size.
def assign_positions(start, pairs)
  pairs.map do |id, index|
    offset = index * gap_size
    [id, start + offset]
  end
end
# Writes the given [id, position] pairs, retrying on statement timeouts.
#
# Starts with the whole list as one batch. After RETRIES_LIMIT consecutive
# timeouts the batch size is halved; a timeout that happens while the batch
# size is already below SMALLEST_BATCH_SIZE is re-raised. A successful batch is
# dropped from the front of the list and the same batch size is kept.
def update_positions_with_retry(pairs_with_position, query_name)
  failed_attempts = 0
  batch_size = pairs_with_position.size
  remaining = pairs_with_position

  until remaining.empty?
    begin
      update_positions(remaining.first(batch_size), query_name)
      remaining = remaining.drop(batch_size)
      failed_attempts = 0
    rescue ActiveRecord::StatementTimeout, ActiveRecord::QueryCanceled => ex
      raise ex if batch_size < SMALLEST_BATCH_SIZE

      failed_attempts += 1
      if failed_attempts == RETRIES_LIMIT
        # still failing after the allowed retries, perhaps because the batch is too big: halve it
        batch_size = (batch_size / 2).to_i
        failed_attempts = 0
      end

      retry
    end
  end
end
# Renders the pairs as a SQL VALUES list ("(id, position), ...") and runs the
# update query through Gitlab::Database::WithLockRetries.
def update_positions(pairs_with_position, query_name)
  rows = pairs_with_position.map { |id, index| "(#{id}, #{index})" }
  values = rows.join(', ')

  lock_retries = Gitlab::Database::WithLockRetries.new(timing_configuration: TIMING_CONFIGURATION, klass: self.class)
  lock_retries.run { run_update_query(values, query_name) }
end
# Executes one UPDATE on the issues table that sets relative_position for every
# id listed in `values`.
#
# values     - pre-rendered SQL VALUES rows, e.g. "(1, 100), (2, 200)"; it is
#              interpolated into the statement as-is.
# query_name - passed to exec_query as the name of the query.
def run_update_query(values, query_name)
Issue.connection.exec_query(<<~SQL, query_name)
WITH cte(cte_id, new_pos) AS #{Gitlab::Database::AsWithMaterialized.materialized_if_supported} (
SELECT *
FROM (VALUES #{values}) as t (id, pos)
)
UPDATE #{Issue.table_name}
SET relative_position = cte.new_pos
FROM cte
WHERE cte_id = id
SQL
end
# Number of issues in scope; the count query runs once and is memoized.
def issue_count
  return @issue_count if @issue_count

  @issue_count = base.count
end
# Number of gaps between consecutive issues: one fewer than the issue count.
def gaps
  total = issue_count
  total - 1
end
# Distance placed between two neighbouring issues.
#
# A static value is used rather than dividing the available range by the number
# of gaps: per the original author's note, IDEAL_DISTANCE * MAX_ISSUE_COUNT is
# only about 0.1% of the available range, so the range cannot be exhausted.
# Revisit this if MAX_ISSUE_COUNT or IDEAL_DISTANCE grows significantly.
def gap_size
  RelativePositioning::IDEAL_DISTANCE
end
end
# frozen_string_literal: true
module Issues
class RelativePositionRebalancingService
# Number of (id, index) pairs handed to one update_positions_with_retry call.
UPDATE_BATCH_SIZE = 100
# Number of cached issue ids fetched per loop iteration in #execute; also the
# initial batch size used when preloading issue ids into the cache.
PREFETCH_ISSUES_BATCH_SIZE = 10_000
# Exit threshold passed to with_retry when updating positions.
SMALLEST_BATCH_SIZE = 5
# Consecutive timeouts tolerated at one batch size before with_retry halves it.
RETRIES_LIMIT = 3
# Raised by #execute when caching.can_start_rebalance? returns false.
TooManyConcurrentRebalances = Class.new(StandardError)
# @param projects either an Array (resolved through Project.id_in) or a project
#   relation; it is ordered by project id, its first record supplies the root
#   namespace, and both are handed to the Redis-backed rebalancing state.
def initialize(projects)
  relation = projects.is_a?(Array) ? Project.id_in(projects) : projects

  @projects_collection = relation.projects_order_id_asc
  @root_namespace = @projects_collection.take.root_namespace # rubocop:disable CodeReuse/ActiveRecord
  @caching = ::Gitlab::Issues::Rebalancing::State.new(@root_namespace, @projects_collection)
end
# Rebalances issue positions for the configured projects.
#
# Does nothing unless the :rebalance_issues flag is enabled for the root
# namespace. Raises TooManyConcurrentRebalances when the cache reports that a
# new rebalance may not start. Otherwise it blocks issue repositioning (unless
# already disabled for the namespace), registers the run, and repeatedly reads
# up to PREFETCH_ISSUES_BATCH_SIZE cached issue ids from the current index and
# writes them in slices of UPDATE_BATCH_SIZE until the cached index reaches the
# last issue. The cache is cleaned up and repositioning unblocked only after
# the loop completes.
def execute
  return unless Feature.enabled?(:rebalance_issues, root_namespace)

  # Given can_start_rebalance? and track_new_running_rebalance are not atomic
  # it can happen that we end up with more than Rebalancing::State::MAX_NUMBER_OF_CONCURRENT_REBALANCES running.
  # Considering the number of allowed Rebalancing::State::MAX_NUMBER_OF_CONCURRENT_REBALANCES is small we should be ok,
  # but should be something to consider if we'd want to scale this up.
  unless caching.can_start_rebalance?
    # The running count is read only here, so the happy path makes no extra cache call.
    raise TooManyConcurrentRebalances, "#{caching.concurrent_running_rebalances_count} concurrent re-balances currently running"
  end

  block_issue_repositioning! unless root_namespace.issue_repositioning_disabled?
  caching.track_new_running_rebalance
  index = caching.get_current_index

  loop do
    issue_ids = get_issue_ids(index, PREFETCH_ISSUES_BATCH_SIZE)
    pairs_with_index = assign_indexes(issue_ids, index)

    pairs_with_index.each_slice(UPDATE_BATCH_SIZE) do |pairs_batch|
      update_positions_with_retry(pairs_batch, 're-balance issue positions in batches ordered by position')
    end

    # update_positions_with_retry advances the cached index after every batch
    index = caching.get_current_index
    break if index >= caching.issue_count - 1
  end

  caching.cleanup_cache
  unblock_issue_repositioning!
end
private
attr_reader :root_namespace, :projects_collection, :caching
# Enables the :block_issue_repositioning feature flag for the root namespace.
def block_issue_repositioning!
  flag = :block_issue_repositioning
  Feature.enable(flag, root_namespace)
end
# Disables the :block_issue_repositioning feature flag for the root namespace.
def unblock_issue_repositioning!
  flag = :block_issue_repositioning
  Feature.disable(flag, root_namespace)
end
# Returns up to `limit` cached issue ids starting at `index`.
#
# The cached slice is returned directly when it is non-empty and no
# "current project id" marker is cached (meaning every project's issues were
# cached). Otherwise the ids are (re)loaded into the cache first and the slice
# is read again.
def get_issue_ids(index, limit)
  cached_ids = caching.get_cached_issue_ids(index, limit)
  fully_cached = cached_ids.any? && caching.get_current_project_id.blank?
  return cached_ids if fully_cached

  preload_issue_ids
  caching.get_cached_issue_ids(index, limit)
end
# rubocop: disable CodeReuse/ActiveRecord
# Caches the (id, relative_position) of every issue, project by project.
#
# The id of the project being processed is cached before its issues are read.
# When such an id is already cached on entry, iteration starts from that
# project (id >= cached id) instead of from the first one. Issues are read in
# keyset-paginated batches starting at PREFETCH_ISSUES_BATCH_SIZE, with
# with_retry shrinking the batch on timeouts down to an exit size of 100.
# The cached project id is removed once all projects have been processed.
def preload_issue_ids
  cached_project_id = caching.get_current_project_id

  collection = projects_collection
  collection = projects_collection.where(Project.arel_table[:id].gteq(cached_project_id.to_i)) if cached_project_id.present?

  collection.each do |project|
    caching.cache_current_project_id(project.id)
    scope = Issue.in_projects(project).reorder(custom_reorder).select(:id, :relative_position)

    with_retry(PREFETCH_ISSUES_BATCH_SIZE, 100) do |batch_size|
      Gitlab::Pagination::Keyset::Iterator.new(scope: scope).each_batch(of: batch_size) do |batch|
        caching.cache_issue_ids(batch)
      end
    end
  end

  caching.remove_current_project_id_cache
end
# rubocop: enable CodeReuse/ActiveRecord
# Pairs each id with a consecutive index, counting up from start_index.
def assign_indexes(ids, start_index)
  ids.map.with_index(start_index) { |id, position| [id, position] }
end
# Writes the given [id, index] pairs in batches, shrinking the batch size on
# statement timeouts.
#
# Every batch goes through with_retry, which retries the same batch up to
# RETRIES_LIMIT times, then halves the batch size and retries again; it
# re-raises a timeout that occurs while the batch size is already below
# SMALLEST_BATCH_SIZE. The batch size that finally succeeded is reused as the
# starting size for the next batch.
#
# After each successful batch the index following the last written pair is
# stored in the cache, and that batch is dropped from the front of the list.
#
# e.g. for 100 pairs where only batches of 25 or fewer succeed:
#   batch size 100 -> 3 timeouts -> halved to 50
#   batch size 50  -> 3 timeouts -> halved to 25
#   batch size 25  -> succeeds, so 25 pairs are dropped; the rest follow at 25
def update_positions_with_retry(pairs_with_index, query_name)
  current_batch_size = pairs_with_index.size
  remaining = pairs_with_index

  until remaining.empty?
    with_retry(current_batch_size, SMALLEST_BATCH_SIZE) do |batch_size|
      current_batch_size = batch_size
      update_positions(remaining.first(batch_size), query_name)

      # remaining[batch_size - 1] is nil when the final batch is shorter than
      # batch_size; fall back to the very last pair in that case.
      last_written_pair = remaining[batch_size - 1] || remaining.last
      caching.cache_current_index(last_written_pair.last + 1)

      remaining = remaining.drop(batch_size)
    end
  end
end
# Turns [id, index] pairs into SQL VALUES rows "(id, position)", where
# position = start_position + index * gap_size, and runs the update query.
def update_positions(pairs_with_position, query_name)
  rows = pairs_with_position.map do |id, index|
    position = start_position + (index * gap_size)
    "(#{id}, #{position})"
  end

  run_update_query(rows.join(', '), query_name)
end
# Executes one UPDATE on the issues table that sets relative_position for every
# id listed in `values`.
#
# values     - pre-rendered SQL VALUES rows, e.g. "(1, 100), (2, 200)"; it is
#              interpolated into the statement as-is.
# query_name - passed to exec_query as the name of the query.
def run_update_query(values, query_name)
Issue.connection.exec_query(<<~SQL, query_name)
WITH cte(cte_id, new_pos) AS #{Gitlab::Database::AsWithMaterialized.materialized_if_supported} (
SELECT *
FROM (VALUES #{values}) as t (id, pos)
)
UPDATE #{Issue.table_name}
SET relative_position = cte.new_pos
FROM cte
WHERE cte_id = id
SQL
end
# Number of gaps between consecutive cached issues: one fewer than the cached issue count.
def gaps
  cached_total = caching.issue_count
  cached_total - 1
end
# Distance placed between two neighbouring issues: RelativePositioning::MAX_GAP.
def gap_size
  RelativePositioning::MAX_GAP
end
# Position given to the issue at index 0 (memoized): START_POSITION moved down
# by half of the gaps (integer division) times the gap size.
def start_position
  @start_position ||= begin
    raw_start = RelativePositioning::START_POSITION - (gaps / 2) * gap_size
    raw_start.to_i
  end
end
# Keyset order built from the issue relative_position column definition
# followed by the ascending-id column definition.
def custom_reorder
  column_definitions = [Issue.column_order_relative_position, Issue.column_order_id_asc]

  ::Gitlab::Pagination::Keyset::Order.build(column_definitions)
end
# Yields a batch size to the block and retries the block on statement timeouts.
#
# The block is first yielded initial_batch_size. Each
# ActiveRecord::StatementTimeout / ActiveRecord::QueryCanceled triggers a retry
# at the same size; after RETRIES_LIMIT consecutive failures the size is halved
# and the failure counter reset. A timeout raised while the current size is
# already below exit_batch_size is re-raised to the caller.
def with_retry(initial_batch_size, exit_batch_size)
  failed_attempts = 0
  batch_size = initial_batch_size

  begin
    yield batch_size
    failed_attempts = 0
  rescue ActiveRecord::StatementTimeout, ActiveRecord::QueryCanceled => ex
    raise ex if batch_size < exit_batch_size

    failed_attempts += 1
    if failed_attempts == RETRIES_LIMIT
      # still failing after the allowed retries, perhaps because the batch is too big: halve it
      batch_size = (batch_size / 2).to_i
      failed_attempts = 0
    end

    retry
  end
end
end
end
......@@ -32,12 +32,8 @@ class IssueRebalancingWorker
return
end
# Temporary disable rebalancing for performance reasons
# For more information check https://gitlab.com/gitlab-com/gl-infra/production/-/issues/4321
return if projects_to_rebalance.take&.root_namespace&.issue_repositioning_disabled? # rubocop:disable CodeReuse/ActiveRecord
IssueRebalancingService.new(projects_to_rebalance).execute
rescue IssueRebalancingService::TooManyIssues => e
Issues::RelativePositionRebalancingService.new(projects_to_rebalance).execute
rescue Issues::RelativePositionRebalancingService::TooManyConcurrentRebalances => e
Gitlab::ErrorTracking.log_exception(e, root_namespace_id: root_namespace_id, project_id: project_id)
end
......
---
name: issue_rebalancing_optimization
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/53384
rollout_issue_url:
milestone: '13.9'
type: development
group: group::project management
default_enabled: false
---
name: issue_rebalancing_with_retry
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/59744
rollout_issue_url:
milestone: '13.11'
type: development
group: group::project management
default_enabled: false
......@@ -418,7 +418,7 @@ p.create_wiki ### creates the wiki project on the filesystem
```ruby
p = Project.find_by_full_path('PROJECT PATH')
IssueRebalancingService.new(p.issues.take).execute
Issues::RelativePositionRebalancingService.new(p.root_namespace.all_projects).execute
```
## Imports / Exports
......
......@@ -55,7 +55,7 @@ module Gitlab
private
def create_issue
Issues::CreateService.new(
::Issues::CreateService.new(
project: project,
current_user: author,
params: {
......
......@@ -71,7 +71,7 @@ module Gitlab
end
def create_issue!
@issue = Issues::CreateService.new(
@issue = ::Issues::CreateService.new(
project: project,
current_user: User.support_bot,
params: {
......
# frozen_string_literal: true
module Gitlab
module Issues
module Rebalancing
class State
# Expiry applied to the Redis keys written by this class.
REDIS_EXPIRY_TIME = 10.days
# Limit compared against the number of tracked running rebalances.
MAX_NUMBER_OF_CONCURRENT_REBALANCES = 5
# Container-type tags; they form the "<type>/<id>" members of the running-rebalances set.
NAMESPACE = 1
PROJECT = 2
# @param root_namespace the namespace whose issues are being rebalanced
# @param projects       relation of the projects involved
#
# The tracked container is the namespace itself when it is a Group, otherwise
# the first project of the relation.
def initialize(root_namespace, projects)
  @root_namespace = root_namespace
  @projects = projects

  if @root_namespace.is_a?(Group)
    @rebalanced_container_type = NAMESPACE
    @rebalanced_container_id = @root_namespace.id
  else
    @rebalanced_container_type = PROJECT
    @rebalanced_container_id = projects.take.id # rubocop:disable CodeReuse/ActiveRecord
  end
end
# Adds this container ("<type>/<id>") to the set of running rebalances and
# refreshes the set's expiry, both inside one MULTI transaction.
def track_new_running_rebalance
  # we trigger re-balance for namespaces(groups) or specific user project
  member = "#{rebalanced_container_type}/#{rebalanced_container_id}"

  with_redis do |redis|
    redis.multi do |transaction|
      transaction.sadd(concurrent_running_rebalances_key, member)
      transaction.expire(concurrent_running_rebalances_key, REDIS_EXPIRY_TIME)
    end
  end
end
# Number of members in the running-rebalances set, as an Integer.
def concurrent_running_rebalances_count
  with_redis do |redis|
    redis.scard(concurrent_running_rebalances_key).to_i
  end
end
# Whether this container is already listed in the running-rebalances set.
#
# Members are stored as "<type>/<id>"; the id part is parsed out for this
# container's type and compared with the root namespace id (NAMESPACE) or the
# first project's id (PROJECT). When the container is found, the expiry of all
# rebalancing keys is refreshed.
def rebalance_in_progress?
  tracked_members = with_redis { |redis| redis.smembers(concurrent_running_rebalances_key) }

  running =
    if rebalanced_container_type == NAMESPACE
      tracked_ids = tracked_members.map { |member| member.split("#{NAMESPACE}/").second.to_i }
      tracked_ids.include?(root_namespace.id)
    elsif rebalanced_container_type == PROJECT
      tracked_ids = tracked_members.map { |member| member.split("#{PROJECT}/").second.to_i }
      tracked_ids.include?(projects.take.id) # rubocop:disable CodeReuse/ActiveRecord
    else
      false
    end

  refresh_keys_expiration if running
  running
end
# True when this container's rebalance is already tracked as running;
# otherwise the result of too_many_rebalances_running?.
def can_start_rebalance?
  in_progress = rebalance_in_progress?
  return in_progress if in_progress

  too_many_rebalances_running?
end
# Stores the given issues in the sorted set of issue ids, scored by their
# relative_position, and refreshes that key's expiry. With an empty batch only
# the expiry is refreshed.
#
# issue_ids - objects responding to #relative_position and #id
def cache_issue_ids(issue_ids)
  with_redis do |redis|
    scored_members = issue_ids.map { |issue| [issue.relative_position, issue.id] }

    redis.multi do |transaction|
      transaction.zadd(issue_ids_key, scored_members) unless scored_members.empty?
      transaction.expire(issue_ids_key, REDIS_EXPIRY_TIME)
    end
  end
end
# Reads up to `limit` members of the issue-ids sorted set, starting at rank `index`.
def get_cached_issue_ids(index, limit)
  last_rank = index + limit - 1

  with_redis { |redis| redis.zrange(issue_ids_key, index, last_rank) }
end
# Stores the current index under its own key with the standard expiry.
def cache_current_index(index)
  with_redis do |redis|
    redis.set(current_index_key, index, ex: REDIS_EXPIRY_TIME)
  end
end
# Cached current index as an Integer; 0 when nothing is cached (nil.to_i).
def get_current_index
  with_redis do |redis|
    redis.get(current_index_key).to_i
  end
end
# Stores the id of the project currently being cached, with the standard expiry.
def cache_current_project_id(project_id)
  with_redis do |redis|
    redis.set(current_project_key, project_id, ex: REDIS_EXPIRY_TIME)
  end
end
# Raw cached value of the current project id, exactly as Redis returns it.
def get_current_project_id
  with_redis do |redis|
    redis.get(current_project_key)
  end
end
# Cardinality of the issue-ids sorted set; memoized on this instance after the first read.
def issue_count
  return @issue_count if @issue_count

  @issue_count = with_redis { |redis| redis.zcard(issue_ids_key) }
end
# Deletes the cached current project id.
def remove_current_project_id_cache
  with_redis do |redis|
    redis.del(current_project_key)
  end
end
# Resets the expiry of all four rebalancing keys inside one MULTI transaction.
def refresh_keys_expiration
  with_redis do |redis|
    redis.multi do |transaction|
      [
        issue_ids_key,
        current_index_key,
        current_project_key,
        concurrent_running_rebalances_key
      ].each { |key| transaction.expire(key, REDIS_EXPIRY_TIME) }
    end
  end
end
# Deletes the issue-ids, current-index and current-project keys and removes
# this container from the running-rebalances set, in one MULTI transaction.
def cleanup_cache
  with_redis do |redis|
    redis.multi do |transaction|
      [issue_ids_key, current_index_key, current_project_key].each do |key|
        transaction.del(key)
      end

      transaction.srem(concurrent_running_rebalances_key, "#{rebalanced_container_type}/#{rebalanced_container_id}")
    end
  end
end
private
attr_accessor :root_namespace, :projects, :rebalanced_container_type, :rebalanced_container_id
# NOTE: despite its name, this predicate is true when there is still room for
# another rebalance; can_start_rebalance? depends on that polarity.
#
# The comparison is strict: once MAX_NUMBER_OF_CONCURRENT_REBALANCES containers
# are tracked as running, no further one is admitted. (With `<=` a new
# rebalance could start while the maximum was already running, giving MAX + 1.)
def too_many_rebalances_running?
  concurrent_running_rebalances_count < MAX_NUMBER_OF_CONCURRENT_REBALANCES
end
# Common prefix of every Redis key used for issue position rebalancing.
def redis_key_prefix
  'gitlab:issues-position-rebalances'
end
# Key of the sorted set of issue ids: "<prefix>:<root namespace id>".
def issue_ids_key
  [redis_key_prefix, root_namespace.id].join(':')
end
# Key holding the current index: the issue-ids key plus ":current_index".
def current_index_key
  [issue_ids_key, 'current_index'].join(':')
end
# Key holding the current project id: the issue-ids key plus ":current_project_id".
def current_project_key
  [issue_ids_key, 'current_project_id'].join(':')
end
# Key of the set that tracks running rebalances: "<prefix>:running_rebalances".
def concurrent_running_rebalances_key
  [redis_key_prefix, 'running_rebalances'].join(':')
end
# Runs the given block through Gitlab::Redis::SharedState.with and returns its result.
def with_redis(&block)
  Gitlab::Redis::SharedState.with(&block) # rubocop: disable CodeReuse/ActiveRecord
end
end
end
end
end
......@@ -219,7 +219,7 @@ module Gitlab
column_definition.column_expression.dup.as(column_definition.attribute_name).to_sql
end
scope = scope.select(*scope.arel.projections, *additional_projections) if additional_projections
scope = scope.reselect(*scope.arel.projections, *additional_projections) unless additional_projections.blank?
scope
end
......
......@@ -267,7 +267,7 @@ module Gitlab
private
def zoom_link_service
Issues::ZoomLinkService.new(project: quick_action_target.project, current_user: current_user, params: { issue: quick_action_target })
::Issues::ZoomLinkService.new(project: quick_action_target.project, current_user: current_user, params: { issue: quick_action_target })
end
end
end
......
......@@ -29,7 +29,7 @@ module Gitlab
private
def close_issue(issue:)
Issues::CloseService.new(project: project, current_user: current_user).execute(issue)
::Issues::CloseService.new(project: project, current_user: current_user).execute(issue)
end
def presenter(issue)
......
......@@ -29,11 +29,11 @@ module Gitlab
return Gitlab::SlashCommands::Presenters::Access.new.not_found
end
new_issue = Issues::MoveService.new(project: project, current_user: current_user)
new_issue = ::Issues::MoveService.new(project: project, current_user: current_user)
.execute(old_issue, target_project)
presenter(new_issue).present(old_issue)
rescue Issues::MoveService::MoveError => e
rescue ::Issues::MoveService::MoveError => e
presenter(old_issue).display_move_error(e.message)
end
......
......@@ -33,7 +33,7 @@ module Gitlab
private
def create_issue(title:, description:)
Issues::CreateService.new(project: project, current_user: current_user, params: { title: title, description: description }, spam_params: nil).execute
::Issues::CreateService.new(project: project, current_user: current_user, params: { title: title, description: description }, spam_params: nil).execute
end
def presenter(issue)
......
......@@ -918,7 +918,7 @@ module Gitlab
jira: count(::JiraImportState.where(time_period)), # rubocop: disable CodeReuse/ActiveRecord
fogbugz: projects_imported_count('fogbugz', time_period),
phabricator: projects_imported_count('phabricator', time_period),
csv: count(Issues::CsvImport.where(time_period)) # rubocop: disable CodeReuse/ActiveRecord
csv: count(::Issues::CsvImport.where(time_period)) # rubocop: disable CodeReuse/ActiveRecord
}
end
......@@ -934,7 +934,7 @@ module Gitlab
project_imports = distinct_count(::Project.where(time_period).where.not(import_type: nil), :creator_id)
bulk_imports = distinct_count(::BulkImport.where(time_period), :user_id)
jira_issue_imports = distinct_count(::JiraImportState.where(time_period), :user_id)
csv_issue_imports = distinct_count(Issues::CsvImport.where(time_period), :user_id)
csv_issue_imports = distinct_count(::Issues::CsvImport.where(time_period), :user_id)
group_imports = distinct_count(::GroupImportState.where(time_period), :user_id)
add(project_imports, bulk_imports, jira_issue_imports, csv_issue_imports, group_imports)
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::Issues::Rebalancing::State, :clean_gitlab_redis_shared_state do
shared_examples 'issues rebalance caching' do
describe '#track_new_running_rebalance' do
it 'caches a project id to track caching in progress' do
expect { rebalance_caching.track_new_running_rebalance }.to change { rebalance_caching.concurrent_running_rebalances_count }.from(0).to(1)
end
end
describe '#set and get current_index' do
it 'returns zero as current index when index not cached' do
expect(rebalance_caching.get_current_index).to eq(0)
end
it 'returns cached current index' do
expect { rebalance_caching.cache_current_index(123) }.to change { rebalance_caching.get_current_index }.from(0).to(123)
end
end
describe '#set and get current_project' do
it 'returns nil if there is no project_id cached' do
expect(rebalance_caching.get_current_project_id).to be_nil
end
it 'returns cached current project_id' do
expect { rebalance_caching.cache_current_project_id(456) }.to change { rebalance_caching.get_current_project_id }.from(nil).to('456')
end
end
describe "#rebalance_in_progress?" do
it 'return zero if no re-balances are running' do
expect(rebalance_caching.concurrent_running_rebalances_count).to eq(0)
end
it 'return false if no re-balances are running' do
expect(rebalance_caching.rebalance_in_progress?).to be false
end
it 'return true a re-balance for given project/namespace is running' do
rebalance_caching.track_new_running_rebalance
expect(rebalance_caching.rebalance_in_progress?).to be true
end
end
context 'caching issue ids' do
context 'with no issue ids cached' do
it 'returns zero when there are no cached issue ids' do
expect(rebalance_caching.issue_count).to eq(0)
end
it 'returns empty array when there are no cached issue ids' do
expect(rebalance_caching.get_cached_issue_ids(0, 100)).to eq([])
end
end
context 'with cached issue ids' do
before do
generate_and_cache_issues_ids(count: 3)
end
it 'returns count of cached issue ids' do
expect(rebalance_caching.issue_count).to eq(3)
end
it 'returns array of issue ids' do
expect(rebalance_caching.get_cached_issue_ids(0, 100)).to eq(%w(1 2 3))
end
it 'limits returned values' do
expect(rebalance_caching.get_cached_issue_ids(0, 2)).to eq(%w(1 2))
end
context 'when caching duplicate issue_ids' do
before do
generate_and_cache_issues_ids(count: 3, position_offset: 3, position_direction: -1)
end
it 'does not cache duplicate issues' do
expect(rebalance_caching.issue_count).to eq(3)
end
it 'returns cached issues with latest scores' do
expect(rebalance_caching.get_cached_issue_ids(0, 100)).to eq(%w(3 2 1))
end
end
end
end
# Each group checks two things for one Redis key: before anything is written
# the key has no TTL (negative value), and after writing the TTL lies between
# 0 and REDIS_EXPIRY_TIME expressed in seconds.
#
# The upper bound is the duration itself (`.to_i`). `.ago.to_i` would be a
# Unix timestamp, which every TTL is smaller than, so the check would pass
# for any expiry.
context 'when setting expiration' do
  context 'when tracking new rebalance' do
    it 'returns as expired for non existent key' do
      ::Gitlab::Redis::SharedState.with do |redis|
        expect(redis.ttl(rebalance_caching.send(:concurrent_running_rebalances_key))).to be < 0
      end
    end

    it 'has expiration set' do
      rebalance_caching.track_new_running_rebalance

      ::Gitlab::Redis::SharedState.with do |redis|
        expect(redis.ttl(rebalance_caching.send(:concurrent_running_rebalances_key))).to be_between(0, described_class::REDIS_EXPIRY_TIME.to_i)
      end
    end
  end

  context 'when setting current index' do
    it 'returns as expired for non existent key' do
      ::Gitlab::Redis::SharedState.with do |redis|
        expect(redis.ttl(rebalance_caching.send(:current_index_key))).to be < 0
      end
    end

    it 'has expiration set' do
      rebalance_caching.cache_current_index(123)

      ::Gitlab::Redis::SharedState.with do |redis|
        expect(redis.ttl(rebalance_caching.send(:current_index_key))).to be_between(0, described_class::REDIS_EXPIRY_TIME.to_i)
      end
    end
  end

  context 'when setting current project id' do
    it 'returns as expired for non existent key' do
      ::Gitlab::Redis::SharedState.with do |redis|
        expect(redis.ttl(rebalance_caching.send(:current_project_key))).to be < 0
      end
    end

    it 'has expiration set' do
      rebalance_caching.cache_current_project_id(456)

      ::Gitlab::Redis::SharedState.with do |redis|
        expect(redis.ttl(rebalance_caching.send(:current_project_key))).to be_between(0, described_class::REDIS_EXPIRY_TIME.to_i)
      end
    end
  end

  context 'when setting cached issue ids' do
    it 'returns as expired for non existent key' do
      ::Gitlab::Redis::SharedState.with do |redis|
        expect(redis.ttl(rebalance_caching.send(:issue_ids_key))).to be < 0
      end
    end

    it 'has expiration set' do
      generate_and_cache_issues_ids(count: 3)

      ::Gitlab::Redis::SharedState.with do |redis|
        expect(redis.ttl(rebalance_caching.send(:issue_ids_key))).to be_between(0, described_class::REDIS_EXPIRY_TIME.to_i)
      end
    end
  end
end
context 'cleanup cache' do
before do
generate_and_cache_issues_ids(count: 3)
rebalance_caching.cache_current_index(123)
rebalance_caching.cache_current_project_id(456)
rebalance_caching.track_new_running_rebalance
end
it 'removes cache keys' do
expect(check_existing_keys).to eq(4)
rebalance_caching.cleanup_cache
expect(check_existing_keys).to eq(0)
end
end
end
context 'rebalancing issues in namespace' do
let_it_be(:group) { create(:group, :private) }
let_it_be(:project) { create(:project, namespace: group) }
subject(:rebalance_caching) { described_class.new(group, group.projects) }
it { expect(rebalance_caching.send(:rebalanced_container_type)).to eq(described_class::NAMESPACE) }
it_behaves_like 'issues rebalance caching'
end
context 'rebalancing issues in a project' do
let_it_be(:project) { create(:project) }
subject(:rebalance_caching) { described_class.new(project.namespace, Project.where(id: project)) }
it { expect(rebalance_caching.send(:rebalanced_container_type)).to eq(described_class::PROJECT) }
it_behaves_like 'issues rebalance caching'
end
# Builds `count` issue doubles with ids 1..count and caches them.
#
# count              - number of issue ids to generate
# position_offset    - added to id * 10 before the direction is applied
# position_direction - 1 for positive relative positions, -1 for negative ones;
#                      relative_position = position_direction * (id * 10 + position_offset)
def generate_and_cache_issues_ids(count:, position_offset: 0, position_direction: 1)
  issues = (1..count).map do |id|
    double(relative_position: position_direction * (id * 10 + position_offset), id: id)
  end

  rebalance_caching.cache_issue_ids(issues)
end
# Counts how many of the four cache entries currently hold a value:
# a current index greater than zero, a current project id, cached issue ids
# (checked over the 0..100 range), and the in-progress marker.
#
# Returns an Integer between 0 and 4.
def check_existing_keys
  key_states = [
    rebalance_caching.get_current_index > 0,
    rebalance_caching.get_current_project_id.present?,
    rebalance_caching.get_cached_issue_ids(0, 100).present?,
    rebalance_caching.rebalance_in_progress?
  ]

  key_states.count { |exists| exists }
end
end
......@@ -538,6 +538,47 @@ RSpec.describe Gitlab::Pagination::Keyset::Order do
end
it_behaves_like 'cursor attribute examples'
# Verifies which columns appear in the SELECT list after cursor conditions are
# applied, with and without order columns flagged `add_to_projections`.
context 'with projections' do
  context 'when additional_projections is empty' do
    let(:scope) { Project.select(:id, :namespace_id) }

    subject(:sql) { order.apply_cursor_conditions(scope, { id: '100' }).to_sql }

    it 'has correct projections' do
      is_expected.to include('SELECT "projects"."id", "projects"."namespace_id" FROM "projects"')
    end
  end

  context 'when there are additional_projections' do
    # The built order is the block's value; no intermediate local is needed
    # (the previous local also shadowed the `order` helper defined by this `let`).
    let(:order) do
      Gitlab::Pagination::Keyset::Order.build([
        Gitlab::Pagination::Keyset::ColumnOrderDefinition.new(
          attribute_name: 'created_at_field',
          column_expression: Project.arel_table[:created_at],
          order_expression: Project.arel_table[:created_at].desc,
          order_direction: :desc,
          distinct: false,
          add_to_projections: true
        ),
        Gitlab::Pagination::Keyset::ColumnOrderDefinition.new(
          attribute_name: 'id',
          order_expression: Project.arel_table[:id].desc
        )
      ])
    end

    let(:scope) { Project.select(:id, :namespace_id).reorder(order) }

    subject(:sql) { order.apply_cursor_conditions(scope).to_sql }

    it 'has correct projections' do
      is_expected.to include('SELECT "projects"."id", "projects"."namespace_id", "projects"."created_at" AS created_at_field FROM "projects"')
    end
  end
end
end
end
end
......
......@@ -2,7 +2,7 @@
require 'spec_helper'
RSpec.describe IssueRebalancingService do
RSpec.describe Issues::RelativePositionRebalancingService, :clean_gitlab_redis_shared_state do
let_it_be(:project, reload: true) { create(:project) }
let_it_be(:user) { project.creator }
let_it_be(:start) { RelativePositioning::START_POSITION }
......@@ -36,10 +36,11 @@ RSpec.describe IssueRebalancingService do
project.reload.issues.reorder(relative_position: :asc).to_a
end
shared_examples 'IssueRebalancingService shared examples' do
it 'rebalances a set of issues with clumps at the end and start' do
subject(:service) { described_class.new(Project.id_in(project)) }
context 'execute' do
it 're-balances a set of issues with clumps at the end and start' do
all_issues = start_clump + unclumped + end_clump.reverse
service = described_class.new(Project.id_in([project.id]))
expect { service.execute }.not_to change { issues_in_position_order.map(&:id) }
......@@ -52,11 +53,10 @@ RSpec.describe IssueRebalancingService do
expect(gaps).to all(be > RelativePositioning::MIN_GAP)
expect(all_issues.first.relative_position).to be > (RelativePositioning::MIN_POSITION * 0.9999)
expect(all_issues.last.relative_position).to be < (RelativePositioning::MAX_POSITION * 0.9999)
expect(project.root_namespace.issue_repositioning_disabled?).to be false
end
it 'is idempotent' do
service = described_class.new(Project.id_in(project))
expect do
service.execute
service.execute
......@@ -70,9 +70,9 @@ RSpec.describe IssueRebalancingService do
issue.project.group
old_pos = issue.relative_position
service = described_class.new(Project.id_in(project))
expect { service.execute }.not_to exceed_query_limit(0)
# fetching root namespace in the initializer triggers 2 queries:
# for fetching a random project from collection and fetching the root namespace.
expect { service.execute }.not_to exceed_query_limit(2)
expect(old_pos).to eq(issue.reload.relative_position)
end
......@@ -80,8 +80,6 @@ RSpec.describe IssueRebalancingService do
issue = create(:issue, project: project, author: user, relative_position: max_pos)
stub_feature_flags(rebalance_issues: project.root_namespace)
service = described_class.new(Project.id_in(project))
expect { service.execute }.to change { issue.reload.relative_position }
end
......@@ -90,84 +88,79 @@ RSpec.describe IssueRebalancingService do
project.update!(group: create(:group))
stub_feature_flags(rebalance_issues: issue.project.group)
service = described_class.new(Project.id_in(project))
expect { service.execute }.to change { issue.reload.relative_position }
end
it 'aborts if there are too many issues' do
base = double(count: 10_001)
it 'aborts if there are too many rebalances running' do
caching = service.send(:caching)
allow(caching).to receive(:rebalance_in_progress?).and_return(false)
allow(caching).to receive(:concurrent_running_rebalances_count).and_return(10)
allow(service).to receive(:caching).and_return(caching)
allow(Issue).to receive(:in_projects).and_return(base)
expect { described_class.new(Project.id_in(project)).execute }.to raise_error(described_class::TooManyIssues)
expect { service.execute }.to raise_error(Issues::RelativePositionRebalancingService::TooManyConcurrentRebalances)
expect(project.root_namespace.issue_repositioning_disabled?).to be false
end
end
shared_examples 'rebalancing is retried on statement timeout exceptions' do
subject { described_class.new(Project.id_in(project)) }
it 'retries update statement' do
call_count = 0
allow(subject).to receive(:run_update_query) do
call_count += 1
if call_count < 13
raise(ActiveRecord::QueryCanceled)
else
call_count = 0 if call_count == 13 + 16 # 16 = 17 sub-batches - 1 call that succeeded as part of 5th batch
true
end
it 'resumes a started rebalance even if there are already too many rebalances running' do
Gitlab::Redis::SharedState.with do |redis|
redis.sadd("gitlab:issues-position-rebalances:running_rebalances", "#{::Gitlab::Issues::Rebalancing::State::PROJECT}/#{project.id}")
redis.sadd("gitlab:issues-position-rebalances:running_rebalances", "1/100")
end
# call math:
# batches start at 100 and are split in half after every 3 retries if ActiveRecord::StatementTimeout exception is raised.
# We raise ActiveRecord::StatementTimeout exception for 13 calls:
# 1. 100 => 3 calls
# 2. 100/2=50 => 3 calls + 3 above = 6 calls, raise ActiveRecord::StatementTimeout
# 3. 50/2=25 => 3 calls + 6 above = 9 calls, raise ActiveRecord::StatementTimeout
# 4. 25/2=12 => 3 calls + 9 above = 12 calls, raise ActiveRecord::StatementTimeout
# 5. 12/2=6 => 1 call + 12 above = 13 calls, run successfully
#
# so out of 100 elements we created batches of 6 items => 100/6 = 17 sub-batches of 6 or less elements
#
# project.issues.count: 900 issues, so 9 batches of 100 => 9 * (13+16) = 261
expect(subject).to receive(:update_positions).exactly(261).times.and_call_original
subject.execute
end
end
caching = service.send(:caching)
allow(caching).to receive(:concurrent_running_rebalances_count).and_return(10)
allow(service).to receive(:caching).and_return(caching)
context 'when issue_rebalancing_optimization feature flag is on' do
before do
stub_feature_flags(issue_rebalancing_optimization: true)
expect { service.execute }.not_to raise_error(Issues::RelativePositionRebalancingService::TooManyConcurrentRebalances)
end
it_behaves_like 'IssueRebalancingService shared examples'
context 're-balancing is retried on statement timeout exceptions' do
subject { service }
it 'retries update statement' do
call_count = 0
allow(subject).to receive(:run_update_query) do
call_count += 1
if call_count < 13
raise(ActiveRecord::QueryCanceled)
else
call_count = 0 if call_count == 13 + 16 # 16 = 17 sub-batches - 1 call that succeeded as part of 5th batch
true
end
end
context 'when issue_rebalancing_with_retry feature flag is on' do
before do
stub_feature_flags(issue_rebalancing_with_retry: true)
# call math:
# batches start at 100 and are split in half after every 3 retries if an ActiveRecord::QueryCanceled exception is raised.
# The stub raises ActiveRecord::QueryCanceled for the first 12 calls and succeeds on the 13th:
# 1. 100 => 3 calls, all raise ActiveRecord::QueryCanceled
# 2. 100/2=50 => 3 calls + 3 above = 6 calls, all raise ActiveRecord::QueryCanceled
# 3. 50/2=25 => 3 calls + 6 above = 9 calls, all raise ActiveRecord::QueryCanceled
# 4. 25/2=12 => 3 calls + 9 above = 12 calls, all raise ActiveRecord::QueryCanceled
# 5. 12/2=6 => 1 call + 12 above = 13 calls, run successfully
#
# so out of 100 elements we created batches of 6 items => 100/6 = 17 sub-batches of 6 or less elements
#
# project.issues.count: 900 issues, so 9 batches of 100 => 9 * (13+16) = 261
expect(subject).to receive(:update_positions).exactly(261).times.and_call_original
subject.execute
end
it_behaves_like 'IssueRebalancingService shared examples'
it_behaves_like 'rebalancing is retried on statement timeout exceptions'
end
end
context 'when issue_rebalancing_optimization feature flag is off' do
before do
stub_feature_flags(issue_rebalancing_optimization: false)
end
it_behaves_like 'IssueRebalancingService shared examples'
context 'when issue_rebalancing_with_retry feature flag is on' do
context 'when resuming a stopped rebalance' do
before do
stub_feature_flags(issue_rebalancing_with_retry: true)
service.send(:preload_issue_ids)
expect(service.send(:caching).get_cached_issue_ids(0, 300)).not_to be_empty
# simulate we already rebalanced half the issues
index = clump_size * 3 / 2 + 1
service.send(:caching).cache_current_index(index)
end
it_behaves_like 'IssueRebalancingService shared examples'
it_behaves_like 'rebalancing is retried on statement timeout exceptions'
it 'rebalances the other half of issues' do
expect(subject).to receive(:update_positions_with_retry).exactly(5).and_call_original
subject.execute
end
end
end
end
......@@ -8,41 +8,29 @@ RSpec.describe IssueRebalancingWorker do
let_it_be(:project) { create(:project, group: group) }
let_it_be(:issue) { create(:issue, project: project) }
context 'when block_issue_repositioning is enabled' do
before do
stub_feature_flags(block_issue_repositioning: group)
end
it 'does not run an instance of IssueRebalancingService' do
expect(IssueRebalancingService).not_to receive(:new)
described_class.new.perform(nil, issue.project_id)
end
end
shared_examples 'running the worker' do
it 'runs an instance of IssueRebalancingService' do
it 'runs an instance of Issues::RelativePositionRebalancingService' do
service = double(execute: nil)
service_param = arguments.second.present? ? kind_of(Project.id_in([project]).class) : kind_of(group&.all_projects.class)
expect(IssueRebalancingService).to receive(:new).with(service_param).and_return(service)
expect(Issues::RelativePositionRebalancingService).to receive(:new).with(service_param).and_return(service)
described_class.new.perform(*arguments)
end
it 'anticipates there being too many issues' do
it 'anticipates there being too many concurrent rebalances' do
service = double
service_param = arguments.second.present? ? kind_of(Project.id_in([project]).class) : kind_of(group&.all_projects.class)
allow(service).to receive(:execute).and_raise(IssueRebalancingService::TooManyIssues)
expect(IssueRebalancingService).to receive(:new).with(service_param).and_return(service)
expect(Gitlab::ErrorTracking).to receive(:log_exception).with(IssueRebalancingService::TooManyIssues, include(project_id: arguments.second, root_namespace_id: arguments.third))
allow(service).to receive(:execute).and_raise(Issues::RelativePositionRebalancingService::TooManyConcurrentRebalances)
expect(Issues::RelativePositionRebalancingService).to receive(:new).with(service_param).and_return(service)
expect(Gitlab::ErrorTracking).to receive(:log_exception).with(Issues::RelativePositionRebalancingService::TooManyConcurrentRebalances, include(project_id: arguments.second, root_namespace_id: arguments.third))
described_class.new.perform(*arguments)
end
it 'takes no action if the value is nil' do
expect(IssueRebalancingService).not_to receive(:new)
expect(Issues::RelativePositionRebalancingService).not_to receive(:new)
expect(Gitlab::ErrorTracking).not_to receive(:log_exception)
described_class.new.perform # all arguments are nil
......@@ -52,7 +40,7 @@ RSpec.describe IssueRebalancingWorker do
shared_examples 'safely handles non-existent ids' do
it 'anticipates the inability to find the issue' do
expect(Gitlab::ErrorTracking).to receive(:log_exception).with(ArgumentError, include(project_id: arguments.second, root_namespace_id: arguments.third))
expect(IssueRebalancingService).not_to receive(:new)
expect(Issues::RelativePositionRebalancingService).not_to receive(:new)
described_class.new.perform(*arguments)
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment