Commit fb078636 authored by Kamil Trzciński's avatar Kamil Trzciński

Rewind IID on Ci::Pipelines

If no pipeline is created we currently have IID gap.
This is due to fact that we generate IID not on save,
but rather ahead of time. This results, us,
losing IIDs.
parent ef82859d
...@@ -53,6 +53,20 @@ module AtomicInternalId ...@@ -53,6 +53,20 @@ module AtomicInternalId
value value
end end
define_method("reset_#{scope}_#{column}") do
if value = read_attribute(column)
scope_value = association(scope).reader
scope_attrs = { scope_value.class.table_name.singularize.to_sym => scope_value }
usage = self.class.table_name.to_sym
if InternalId.reset(self, scope_attrs, usage, value)
write_attribute(column, nil)
end
end
read_attribute(column)
end
end end
end end
end end
...@@ -55,7 +55,8 @@ class InternalId < ApplicationRecord ...@@ -55,7 +55,8 @@ class InternalId < ApplicationRecord
def track_greatest(subject, scope, usage, new_value, init) def track_greatest(subject, scope, usage, new_value, init)
return new_value unless available? return new_value unless available?
InternalIdGenerator.new(subject, scope, usage, init).track_greatest(new_value) InternalIdGenerator.new(subject, scope, usage)
.track_greatest(init, new_value)
end end
def generate_next(subject, scope, usage, init) def generate_next(subject, scope, usage, init)
...@@ -63,7 +64,15 @@ class InternalId < ApplicationRecord ...@@ -63,7 +64,15 @@ class InternalId < ApplicationRecord
# This can be the case in other (unrelated) migration specs # This can be the case in other (unrelated) migration specs
return (init.call(subject) || 0) + 1 unless available? return (init.call(subject) || 0) + 1 unless available?
InternalIdGenerator.new(subject, scope, usage, init).generate InternalIdGenerator.new(subject, scope, usage)
.generate(init)
end
def reset(subject, scope, usage, value)
return false unless available?
InternalIdGenerator.new(subject, scope, usage)
.reset(value)
end end
# Flushing records is generally safe in a sense that those # Flushing records is generally safe in a sense that those
...@@ -103,14 +112,11 @@ class InternalId < ApplicationRecord ...@@ -103,14 +112,11 @@ class InternalId < ApplicationRecord
# subject: The instance we're generating an internal id for. Gets passed to init if called. # subject: The instance we're generating an internal id for. Gets passed to init if called.
# scope: Attributes that define the scope for id generation. # scope: Attributes that define the scope for id generation.
# usage: Symbol to define the usage of the internal id, see InternalId.usages # usage: Symbol to define the usage of the internal id, see InternalId.usages
# init: Block that gets called to initialize InternalId record if not present attr_reader :subject, :scope, :scope_attrs, :usage
# Make sure to not throw exceptions in the absence of records (if this is expected).
attr_reader :subject, :scope, :init, :scope_attrs, :usage
def initialize(subject, scope, usage, init) def initialize(subject, scope, usage)
@subject = subject @subject = subject
@scope = scope @scope = scope
@init = init
@usage = usage @usage = usage
raise ArgumentError, 'Scope is not well-defined, need at least one column for scope (given: 0)' if scope.empty? raise ArgumentError, 'Scope is not well-defined, need at least one column for scope (given: 0)' if scope.empty?
...@@ -121,23 +127,40 @@ class InternalId < ApplicationRecord ...@@ -121,23 +127,40 @@ class InternalId < ApplicationRecord
end end
# Generates next internal id and returns it # Generates next internal id and returns it
def generate # init: Block that gets called to initialize InternalId record if not present
# Make sure to not throw exceptions in the absence of records (if this is expected).
def generate(init)
subject.transaction do subject.transaction do
# Create a record in internal_ids if one does not yet exist # Create a record in internal_ids if one does not yet exist
# and increment its last value # and increment its last value
# #
# Note this will acquire a ROW SHARE lock on the InternalId record # Note this will acquire a ROW SHARE lock on the InternalId record
(lookup || create_record).increment_and_save! (lookup || create_record(init)).increment_and_save!
end end
end end
# Reset tries to rewind to `value-1`. This will only succeed,
# if `value` stored in database is equal to `last_value`.
# value: The expected last_value to decrement
def reset(value)
return false unless value
updated =
InternalId
.where(**scope, usage: usage_value)
.where(last_value: value)
.update_all('last_value = last_value - 1')
updated > 0
end
# Create a record in internal_ids if one does not yet exist # Create a record in internal_ids if one does not yet exist
# and set its new_value if it is higher than the current last_value # and set its new_value if it is higher than the current last_value
# #
# Note this will acquire a ROW SHARE lock on the InternalId record # Note this will acquire a ROW SHARE lock on the InternalId record
def track_greatest(new_value) def track_greatest(init, new_value)
subject.transaction do subject.transaction do
(lookup || create_record).track_greatest_and_save!(new_value) (lookup || create_record(init)).track_greatest_and_save!(new_value)
end end
end end
...@@ -158,7 +181,7 @@ class InternalId < ApplicationRecord ...@@ -158,7 +181,7 @@ class InternalId < ApplicationRecord
# was faster in doing this, we'll realize once we hit the unique key constraint # was faster in doing this, we'll realize once we hit the unique key constraint
# violation. We can safely roll-back the nested transaction and perform # violation. We can safely roll-back the nested transaction and perform
# a lookup instead to retrieve the record. # a lookup instead to retrieve the record.
def create_record def create_record(init)
subject.transaction(requires_new: true) do subject.transaction(requires_new: true) do
InternalId.create!( InternalId.create!(
**scope, **scope,
......
...@@ -55,6 +55,10 @@ module Ci ...@@ -55,6 +55,10 @@ module Ci
end end
end end
# If pipeline is not persisted, try to recover IID
pipeline.reset_project_iid unless pipeline.persisted? ||
Feature.disabled?(:ci_pipeline_rewind_iid, project, default_enabled: true)
pipeline pipeline
end end
......
---
title: Rewind IID on Ci::Pipelines
merge_request: 26490
author:
type: fixed
...@@ -107,6 +107,57 @@ describe InternalId do ...@@ -107,6 +107,57 @@ describe InternalId do
end end
end end
describe '.reset' do
subject { described_class.reset(issue, scope, usage, value) }
context 'in the absence of a record' do
let(:value) { 2 }
it 'does not revert back the value' do
expect { subject }.not_to change { described_class.count }
expect(subject).to be_falsey
end
end
context 'when valid iid is used to reset' do
let!(:value) { generate_next }
context 'and iid is a latest one' do
it 'does rewind and next generated value is the same' do
expect(subject).to be_truthy
expect(generate_next).to eq(value)
end
end
context 'and iid is not a latest one' do
it 'does not rewind' do
generate_next
expect(subject).to be_falsey
expect(generate_next).to be > value
end
end
def generate_next
described_class.generate_next(issue, scope, usage, init)
end
end
context 'with an insufficient schema version' do
let(:value) { 2 }
before do
described_class.reset_column_information
# Project factory will also call the current_version
expect(ActiveRecord::Migrator).to receive(:current_version).twice.and_return(InternalId::REQUIRED_SCHEMA_VERSION - 1)
end
it 'does not reset any of the iids' do
expect(subject).to be_falsey
end
end
end
describe '.track_greatest' do describe '.track_greatest' do
let(:value) { 9001 } let(:value) { 9001 }
subject { described_class.track_greatest(issue, scope, usage, value, init) } subject { described_class.track_greatest(issue, scope, usage, value, init) }
......
...@@ -25,7 +25,8 @@ describe Ci::CreatePipelineService do ...@@ -25,7 +25,8 @@ describe Ci::CreatePipelineService do
merge_request: nil, merge_request: nil,
push_options: nil, push_options: nil,
source_sha: nil, source_sha: nil,
target_sha: nil) target_sha: nil,
save_on_errors: true)
params = { ref: ref, params = { ref: ref,
before: '00000000', before: '00000000',
after: after, after: after,
...@@ -36,7 +37,7 @@ describe Ci::CreatePipelineService do ...@@ -36,7 +37,7 @@ describe Ci::CreatePipelineService do
target_sha: target_sha } target_sha: target_sha }
described_class.new(project, user, params).execute( described_class.new(project, user, params).execute(
source, trigger_request: trigger_request, merge_request: merge_request) source, save_on_errors: save_on_errors, trigger_request: trigger_request, merge_request: merge_request)
end end
# rubocop:enable Metrics/ParameterLists # rubocop:enable Metrics/ParameterLists
...@@ -57,6 +58,7 @@ describe Ci::CreatePipelineService do ...@@ -57,6 +58,7 @@ describe Ci::CreatePipelineService do
expect(pipeline).to eq(project.ci_pipelines.last) expect(pipeline).to eq(project.ci_pipelines.last)
expect(pipeline).to have_attributes(user: user) expect(pipeline).to have_attributes(user: user)
expect(pipeline).to have_attributes(status: 'pending') expect(pipeline).to have_attributes(status: 'pending')
expect(pipeline.iid).not_to be_nil
expect(pipeline.repository_source?).to be true expect(pipeline.repository_source?).to be true
expect(pipeline.builds.first).to be_kind_of(Ci::Build) expect(pipeline.builds.first).to be_kind_of(Ci::Build)
end end
...@@ -446,6 +448,43 @@ describe Ci::CreatePipelineService do ...@@ -446,6 +448,43 @@ describe Ci::CreatePipelineService do
expect(Ci::Build.all).to be_empty expect(Ci::Build.all).to be_empty
expect(Ci::Pipeline.count).to eq(0) expect(Ci::Pipeline.count).to eq(0)
end end
describe '#iid' do
let(:internal_id) do
InternalId.find_by(project_id: project.id, usage: :ci_pipelines)
end
before do
expect_any_instance_of(Ci::Pipeline).to receive(:ensure_project_iid!)
.and_call_original
end
context 'when ci_pipeline_rewind_iid is enabled' do
before do
stub_feature_flags(ci_pipeline_rewind_iid: true)
end
it 'rewinds iid' do
result = execute_service
expect(result).not_to be_persisted
expect(internal_id.last_value).to eq(0)
end
end
context 'when ci_pipeline_rewind_iid is disabled' do
before do
stub_feature_flags(ci_pipeline_rewind_iid: false)
end
it 'does not rewind iid' do
result = execute_service
expect(result).not_to be_persisted
expect(internal_id.last_value).to eq(1)
end
end
end
end end
context 'with manual actions' do context 'with manual actions' do
......
...@@ -52,20 +52,20 @@ shared_examples_for 'AtomicInternalId' do |validate_presence: true| ...@@ -52,20 +52,20 @@ shared_examples_for 'AtomicInternalId' do |validate_presence: true|
expect(InternalId).to receive(:generate_next).with(instance, scope_attrs, usage, any_args).and_return(iid) expect(InternalId).to receive(:generate_next).with(instance, scope_attrs, usage, any_args).and_return(iid)
subject subject
expect(instance.public_send(internal_id_attribute)).to eq(iid) expect(read_internal_id).to eq(iid)
end end
it 'does not overwrite an existing internal id' do it 'does not overwrite an existing internal id' do
instance.public_send("#{internal_id_attribute}=", 4711) write_internal_id(4711)
expect { subject }.not_to change { instance.public_send(internal_id_attribute) } expect { subject }.not_to change { read_internal_id }
end end
context 'when the instance has an internal ID set' do context 'when the instance has an internal ID set' do
let(:internal_id) { 9001 } let(:internal_id) { 9001 }
it 'calls InternalId.update_last_value and sets the `last_value` to that of the instance' do it 'calls InternalId.update_last_value and sets the `last_value` to that of the instance' do
instance.send("#{internal_id_attribute}=", internal_id) write_internal_id(internal_id)
expect(InternalId) expect(InternalId)
.to receive(:track_greatest) .to receive(:track_greatest)
...@@ -75,5 +75,39 @@ shared_examples_for 'AtomicInternalId' do |validate_presence: true| ...@@ -75,5 +75,39 @@ shared_examples_for 'AtomicInternalId' do |validate_presence: true|
end end
end end
end end
describe "#reset_scope_internal_id_attribute" do
it 'rewinds the allocated IID' do
expect { ensure_scope_attribute! }.not_to raise_error
expect(read_internal_id).not_to be_nil
expect(reset_scope_attribute).to be_nil
expect(read_internal_id).to be_nil
end
it 'allocates the same IID' do
internal_id = ensure_scope_attribute!
reset_scope_attribute
expect(read_internal_id).to be_nil
expect(ensure_scope_attribute!).to eq(internal_id)
end
end
def ensure_scope_attribute!
instance.public_send(:"ensure_#{scope}_#{internal_id_attribute}!")
end
def reset_scope_attribute
instance.public_send(:"reset_#{scope}_#{internal_id_attribute}")
end
def read_internal_id
instance.public_send(internal_id_attribute)
end
def write_internal_id(value)
instance.public_send(:"#{internal_id_attribute}=", value)
end
end end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment