Commit 56134706 authored by pbair's avatar pbair

Use direct SQL to generate internal ids

To reduce the overhead of generating internal ids, move away from the
previous model of locking then updating the record, and instead favor
doing a direct update of the record, returning the updated last_value.

This change is behind a feature flag so we can better measure the
performance impact of the new approach.
parent 80bea93f
......@@ -159,9 +159,8 @@ module AtomicInternalId
# Defines class methods:
#
# - with_{scope}_{column}_supply
# This method can be used to allocate a block of IID values during
# bulk operations (importing/copying, etc). This can be more efficient
# than creating instances one-by-one.
# This method can be used to allocate a stream of IID values during
# bulk operations (importing/copying, etc).
#
# Pass in a block that receives a `Supply` instance. To allocate a new
# IID value, call `Supply#next_value`.
......@@ -181,14 +180,8 @@ module AtomicInternalId
scope_attrs = ::AtomicInternalId.scope_attrs(scope_value)
usage = ::AtomicInternalId.scope_usage(self)
generator = InternalId::InternalIdGenerator.new(subject, scope_attrs, usage, init)
generator.with_lock do
supply = Supply.new(generator.record.last_value)
supply = Supply.new(-> { InternalId.generate_next(subject, scope_attrs, usage, init) })
block.call(supply)
ensure
generator.track_greatest(supply.current_value) if supply
end
end
end
end
......@@ -236,14 +229,14 @@ module AtomicInternalId
end
class Supply
attr_reader :current_value
attr_reader :generator
def initialize(start_value)
@current_value = start_value
def initialize(generator)
@generator = generator
end
def next_value
@current_value += 1
@generator.call
end
end
end
......@@ -16,7 +16,7 @@
# * Add `usage` value to enum
# * (Optionally) add columns to `internal_ids` if needed for scope.
class InternalId < ApplicationRecord
include Gitlab::Utils::StrongMemoize
extend Gitlab::Utils::StrongMemoize
belongs_to :project
belongs_to :namespace
......@@ -25,6 +25,10 @@ class InternalId < ApplicationRecord
validates :usage, presence: true
scope :filter_by, -> (scope, usage) do
where(**scope, usage: usage)
end
# Increments #last_value and saves the record
#
# The operation locks the record and gathers a `ROW SHARE` lock (in PostgreSQL).
......@@ -53,18 +57,15 @@ class InternalId < ApplicationRecord
class << self
def track_greatest(subject, scope, usage, new_value, init)
InternalIdGenerator.new(subject, scope, usage, init)
.track_greatest(new_value)
build_generator(subject, scope, usage, init).track_greatest(new_value)
end
def generate_next(subject, scope, usage, init)
InternalIdGenerator.new(subject, scope, usage, init)
.generate
build_generator(subject, scope, usage, init).generate
end
def reset(subject, scope, usage, value)
InternalIdGenerator.new(subject, scope, usage)
.reset(value)
build_generator(subject, scope, usage).reset(value)
end
# Flushing records is generally safe in a sense that those
......@@ -77,11 +78,36 @@ class InternalId < ApplicationRecord
where(filter).delete_all
end
def internal_id_transactions_increment(operation:, usage:)
self.internal_id_transactions_total.increment(
operation: operation,
usage: usage.to_s,
in_transaction: ActiveRecord::Base.connection.transaction_open?.to_s
)
end
class InternalIdGenerator
extend Gitlab::Utils::StrongMemoize
def internal_id_transactions_total
strong_memoize(:internal_id_transactions_total) do
name = :gitlab_internal_id_transactions_total
comment = 'Counts all the internal ids happening within transaction'
Gitlab::Metrics.counter(name, comment)
end
end
private
def build_generator(subject, scope, usage, init = nil)
if Feature.enabled?(:generate_iids_without_explicit_locking)
ImplicitlyLockingInternalIdGenerator.new(subject, scope, usage, init)
else
InternalIdGenerator.new(subject, scope, usage, init)
end
end
end
class InternalIdGenerator
# Generate next internal id for a given scope and usage.
#
# For currently supported usages, see #usage enum.
......@@ -117,7 +143,7 @@ class InternalId < ApplicationRecord
# init: Block that gets called to initialize InternalId record if not present
# Make sure to not throw exceptions in the absence of records (if this is expected).
def generate
self.class.internal_id_transactions_increment(operation: :generate, usage: usage)
InternalId.internal_id_transactions_increment(operation: :generate, usage: usage)
subject.transaction do
# Create a record in internal_ids if one does not yet exist
......@@ -134,7 +160,7 @@ class InternalId < ApplicationRecord
def reset(value)
return false unless value
self.class.internal_id_transactions_increment(operation: :reset, usage: usage)
InternalId.internal_id_transactions_increment(operation: :reset, usage: usage)
updated =
InternalId
......@@ -149,8 +175,9 @@ class InternalId < ApplicationRecord
# and set its new_value if it is higher than the current last_value
#
# Note this will acquire a ROW SHARE lock on the InternalId record
def track_greatest(new_value)
self.class.internal_id_transactions_increment(operation: :track_greatest, usage: usage)
InternalId.internal_id_transactions_increment(operation: :track_greatest, usage: usage)
subject.transaction do
record.track_greatest_and_save!(new_value)
......@@ -162,7 +189,7 @@ class InternalId < ApplicationRecord
end
def with_lock(&block)
self.class.internal_id_transactions_increment(operation: :with_lock, usage: usage)
InternalId.internal_id_transactions_increment(operation: :with_lock, usage: usage)
record.with_lock(&block)
end
......@@ -199,22 +226,118 @@ class InternalId < ApplicationRecord
rescue ActiveRecord::RecordNotUnique
lookup
end
end
def self.internal_id_transactions_increment(operation:, usage:)
self.internal_id_transactions_total.increment(
operation: operation,
usage: usage.to_s,
in_transaction: ActiveRecord::Base.connection.transaction_open?.to_s
)
class ImplicitlyLockingInternalIdGenerator
# Generate next internal id for a given scope and usage.
#
# For currently supported usages, see #usage enum.
#
# The method implements a locking scheme that has the following properties:
# 1) Generated sequence of internal ids is unique per (scope and usage)
# 2) The method is thread-safe and may be used in concurrent threads/processes.
# 3) The generated sequence is gapless.
# 4) In the absence of a record in the internal_ids table, one will be created
# and last_value will be calculated on the fly.
#
# subject: The instance or class we're generating an internal id for.
# scope: Attributes that define the scope for id generation.
# Valid keys are `project/project_id` and `namespace/namespace_id`.
# usage: Symbol to define the usage of the internal id, see InternalId.usages
# init: Proc that accepts the subject and the scope and returns Integer|NilClass
attr_reader :subject, :scope, :scope_attrs, :usage, :init
def initialize(subject, scope, usage, init = nil)
@subject = subject
@scope = scope
@usage = usage
@init = init
raise ArgumentError, 'Scope is not well-defined, need at least one column for scope (given: 0)' if scope.empty?
unless InternalId.usages.has_key?(usage.to_s)
raise ArgumentError, "Usage '#{usage}' is unknown. Supported values are #{InternalId.usages.keys} from InternalId.usages"
end
end
def self.internal_id_transactions_total
strong_memoize(:internal_id_transactions_total) do
name = :gitlab_internal_id_transactions_total
comment = 'Counts all the internal ids happening within transaction'
# Generates next internal id and returns it
# init: Block that gets called to initialize InternalId record if not present
# Make sure to not throw exceptions in the absence of records (if this is expected).
def generate
InternalId.internal_id_transactions_increment(operation: :generate, usage: usage)
Gitlab::Metrics.counter(name, comment)
next_iid = update_record!(subject, scope, usage, arel_table[:last_value] + 1)
return next_iid if next_iid
create_record!(subject, scope, usage, init) do |iid|
iid.last_value += 1
end
rescue ActiveRecord::RecordNotUnique
retry
end
# Reset tries to rewind to `value-1`. This will only succeed,
# if `value` stored in database is equal to `last_value`.
# value: The expected last_value to decrement
def reset(value)
return false unless value
InternalId.internal_id_transactions_increment(operation: :reset, usage: usage)
iid = update_record!(subject, scope.merge(last_value: value), usage, arel_table[:last_value] - 1)
iid == value - 1
end
# Create a record in internal_ids if one does not yet exist
# and set its new_value if it is higher than the current last_value
def track_greatest(new_value)
InternalId.internal_id_transactions_increment(operation: :track_greatest, usage: usage)
function = Arel::Nodes::NamedFunction.new('GREATEST', [
arel_table[:last_value],
new_value.to_i
])
next_iid = update_record!(subject, scope, usage, function)
return next_iid if next_iid
create_record!(subject, scope, usage, init) do |object|
object.last_value = [object.last_value, new_value].max
end
rescue ActiveRecord::RecordNotUnique
retry
end
private
def update_record!(subject, scope, usage, new_value)
stmt = Arel::UpdateManager.new
stmt.table(arel_table)
stmt.set(arel_table[:last_value] => new_value)
stmt.wheres = InternalId.filter_by(scope, usage).arel.constraints
ActiveRecord::Base.connection.insert(stmt, 'Update InternalId', 'last_value')
end
def create_record!(subject, scope, usage, init)
raise ArgumentError, 'Cannot initialize without init!' unless init
instance = subject.is_a?(::Class) ? nil : subject
subject.transaction(requires_new: true) do
last_value = init.call(instance, scope) || 0
internal_id = InternalId.create!(**scope, usage: usage, last_value: last_value) do |subject|
yield subject if block_given?
end
internal_id.last_value
end
end
def arel_table
InternalId.arel_table
end
end
end
---
name: generate_iids_without_explicit_locking
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/65590
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/335431
milestone: '14.2'
type: development
group: group::database
default_enabled: false
......@@ -240,18 +240,12 @@ RSpec.describe AtomicInternalId do
end
describe '.with_project_iid_supply' do
let(:iid) { 100 }
it 'wraps generate and track_greatest in a concurrency-safe lock' do
expect_next_instance_of(InternalId::InternalIdGenerator) do |g|
expect(g).to receive(:with_lock).and_call_original
expect(g.record).to receive(:last_value).and_return(iid)
expect(g).to receive(:track_greatest).with(iid + 4)
end
it 'supplies a stream of iid values' do
expect do
::Milestone.with_project_iid_supply(milestone.project) do |supply|
4.times { supply.next_value }
end
end.to change { InternalId.find_by(project: milestone.project, usage: :milestones)&.last_value.to_i }.by(4)
end
end
end
......@@ -39,6 +39,7 @@ RSpec.describe InternalId do
end
end
shared_examples_for 'a monotonically increasing id generator' do
describe '.generate_next' do
subject { described_class.generate_next(id_subject, scope, usage, init) }
......@@ -66,19 +67,6 @@ RSpec.describe InternalId do
expect(subject).to eq(project.issues.size + 1)
end
end
context 'with concurrent inserts on table' do
it 'looks up the record if it was created concurrently' do
args = { **scope, usage: described_class.usages[usage.to_s] }
record = double
expect(described_class).to receive(:find_by).with(args).and_return(nil) # first call, record not present
expect(described_class).to receive(:find_by).with(args).and_return(record) # second call, record was created by another process
expect(described_class).to receive(:create!).and_raise(ActiveRecord::RecordNotUnique, 'record not unique')
expect(record).to receive(:increment_and_save!)
subject
end
end
end
it 'generates a strictly monotone, gapless sequence' do
......@@ -102,7 +90,7 @@ RSpec.describe InternalId do
it 'increments counter with in_transaction: "false"' do
allow(ActiveRecord::Base.connection).to receive(:transaction_open?) { false }
expect(InternalId::InternalIdGenerator.internal_id_transactions_total).to receive(:increment)
expect(InternalId.internal_id_transactions_total).to receive(:increment)
.with(operation: :generate, usage: 'issues', in_transaction: 'false').and_call_original
subject
......@@ -111,7 +99,7 @@ RSpec.describe InternalId do
context 'when executed within transaction' do
it 'increments counter with in_transaction: "true"' do
expect(InternalId::InternalIdGenerator.internal_id_transactions_total).to receive(:increment)
expect(InternalId.internal_id_transactions_total).to receive(:increment)
.with(operation: :generate, usage: 'issues', in_transaction: 'true').and_call_original
InternalId.transaction { subject }
......@@ -161,7 +149,7 @@ RSpec.describe InternalId do
it 'increments counter with in_transaction: "false"' do
allow(ActiveRecord::Base.connection).to receive(:transaction_open?) { false }
expect(InternalId::InternalIdGenerator.internal_id_transactions_total).to receive(:increment)
expect(InternalId.internal_id_transactions_total).to receive(:increment)
.with(operation: :reset, usage: 'issues', in_transaction: 'false').and_call_original
subject
......@@ -172,7 +160,7 @@ RSpec.describe InternalId do
let(:value) { 2 }
it 'increments counter with in_transaction: "true"' do
expect(InternalId::InternalIdGenerator.internal_id_transactions_total).to receive(:increment)
expect(InternalId.internal_id_transactions_total).to receive(:increment)
.with(operation: :reset, usage: 'issues', in_transaction: 'true').and_call_original
InternalId.transaction { subject }
......@@ -232,7 +220,7 @@ RSpec.describe InternalId do
it 'increments counter with in_transaction: "false"' do
allow(ActiveRecord::Base.connection).to receive(:transaction_open?) { false }
expect(InternalId::InternalIdGenerator.internal_id_transactions_total).to receive(:increment)
expect(InternalId.internal_id_transactions_total).to receive(:increment)
.with(operation: :track_greatest, usage: 'issues', in_transaction: 'false').and_call_original
subject
......@@ -241,13 +229,26 @@ RSpec.describe InternalId do
context 'when executed within transaction' do
it 'increments counter with in_transaction: "true"' do
expect(InternalId::InternalIdGenerator.internal_id_transactions_total).to receive(:increment)
expect(InternalId.internal_id_transactions_total).to receive(:increment)
.with(operation: :track_greatest, usage: 'issues', in_transaction: 'true').and_call_original
InternalId.transaction { subject }
end
end
end
end
context 'when the feature flag is disabled' do
stub_feature_flags(generate_iids_without_explicit_locking: false)
it_behaves_like 'a monotonically increasing id generator'
end
context 'when the feature flag is enabled' do
stub_feature_flags(generate_iids_without_explicit_locking: true)
it_behaves_like 'a monotonically increasing id generator'
end
describe '#increment_and_save!' do
let(:id) { create(:internal_id) }
......
......@@ -165,9 +165,9 @@ RSpec.shared_examples 'AtomicInternalId' do |validate_presence: true|
3.times { supply.next_value }
end
current_value = described_class.public_send(method_name, scope_value, &:current_value)
expect(current_value).to eq(iid + 3)
described_class.public_send(method_name, scope_value) do |supply|
expect(supply.next_value).to eq(iid + 4)
end
end
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment