Commit 601004a8 authored by Adam Hegyi

Merge branch 'support-bulk-on-duplicate-policy' into 'master'

Support `bulk_upsert!` and `skip_duplicates`

See merge request gitlab-org/gitlab!27215
parents 5acd1001 75e43646
...@@ -61,12 +61,13 @@ module BulkInsertSafe ...@@ -61,12 +61,13 @@ module BulkInsertSafe
super super
end end
# Inserts the given ActiveRecord [items] to the table mapped to this class via [InsertAll]. # Inserts the given ActiveRecord [items] to the table mapped to this class.
# Items will be inserted in batches of a given size, where insertion semantics are # Items will be inserted in batches of a given size, where insertion semantics are
# "atomic across all batches", i.e. either all items will be inserted or none. # "atomic across all batches".
# #
# @param [Boolean] validate Whether validations should run on [items] # @param [Boolean] validate Whether validations should run on [items]
# @param [Integer] batch_size How many items should at most be inserted at once # @param [Integer] batch_size How many items should at most be inserted at once
# @param [Boolean] skip_duplicates Marks duplicates as allowed, and skips inserting them
# @param [Proc] handle_attributes Block that will receive each item attribute hash # @param [Proc] handle_attributes Block that will receive each item attribute hash
# prior to insertion for further processing # prior to insertion for further processing
# #
...@@ -75,26 +76,65 @@ module BulkInsertSafe ...@@ -75,26 +76,65 @@ module BulkInsertSafe
# - [ActiveRecord::RecordInvalid] on entity validation failures # - [ActiveRecord::RecordInvalid] on entity validation failures
# - [ActiveRecord::RecordNotUnique] on duplicate key errors # - [ActiveRecord::RecordNotUnique] on duplicate key errors
# #
# @return true if all items succeeded to be inserted, throws otherwise. # @return true if operation succeeded, throws otherwise.
# #
def bulk_insert!(items, validate: true, batch_size: DEFAULT_BATCH_SIZE, &handle_attributes) def bulk_insert!(items, validate: true, skip_duplicates: false, batch_size: DEFAULT_BATCH_SIZE, &handle_attributes)
return true if items.empty? _bulk_insert_all!(items,
validate: validate,
_bulk_insert_in_batches(items, batch_size, validate, &handle_attributes) on_duplicate: skip_duplicates ? :skip : :raise,
unique_by: nil,
batch_size: batch_size,
&handle_attributes)
end
true # Upserts the given ActiveRecord [items] to the table mapped to this class.
# Items will be inserted or updated in batches of a given size,
# where insertion semantics are "atomic across all batches".
#
# @param [Boolean] validate Whether validations should run on [items]
# @param [Integer] batch_size How many items should at most be inserted at once
# @param [Symbol/Array] unique_by Defines index or columns to use to consider item duplicate
# @param [Proc] handle_attributes Block that will receive each item attribute hash
# prior to insertion for further processing
#
# Unique indexes can be identified by columns or name:
# - unique_by: :isbn
# - unique_by: %i[ author_id name ]
# - unique_by: :index_books_on_isbn
#
# Note that this method will throw on the following occasions:
# - [PrimaryKeySetError] when primary keys are set on entities prior to insertion
# - [ActiveRecord::RecordInvalid] on entity validation failures
# - [ActiveRecord::RecordNotUnique] on duplicate key errors
#
# @return true if operation succeeded, throws otherwise.
#
def bulk_upsert!(items, unique_by:, validate: true, batch_size: DEFAULT_BATCH_SIZE, &handle_attributes)
_bulk_insert_all!(items,
validate: validate,
on_duplicate: :update,
unique_by: unique_by,
batch_size: batch_size,
&handle_attributes)
end end
private private
def _bulk_insert_in_batches(items, batch_size, validate_items, &handle_attributes) def _bulk_insert_all!(items, on_duplicate:, unique_by:, validate:, batch_size:, &handle_attributes)
return true if items.empty?
transaction do transaction do
items.each_slice(batch_size) do |item_batch| items.each_slice(batch_size) do |item_batch|
attributes = _bulk_insert_item_attributes(item_batch, validate_items, &handle_attributes) attributes = _bulk_insert_item_attributes(
item_batch, validate, &handle_attributes)
insert_all!(attributes) ActiveRecord::InsertAll
.new(self, attributes, on_duplicate: on_duplicate, unique_by: unique_by)
.execute
end end
end end
true
end end
def _bulk_insert_item_attributes(items, validate_items) def _bulk_insert_item_attributes(items, validate_items)
......
...@@ -22,6 +22,18 @@ describe BulkInsertSafe do ...@@ -22,6 +22,18 @@ describe BulkInsertSafe do
algorithm: 'aes-256-gcm', algorithm: 'aes-256-gcm',
key: Settings.attr_encrypted_db_key_base_32, key: Settings.attr_encrypted_db_key_base_32,
insecure_mode: false insecure_mode: false
default_value_for :enum_value, 'case_1'
default_value_for :secret_value, 'my-secret'
default_value_for :sha_value, '2fd4e1c67a2d28fced849ee1bb76e7391b93eb12'
def self.valid_list(count)
Array.new(count) { |n| new(name: "item-#{n}") }
end
def self.invalid_list(count)
Array.new(count) { new }
end
end end
module InheritedUnsafeMethods module InheritedUnsafeMethods
...@@ -48,6 +60,8 @@ describe BulkInsertSafe do ...@@ -48,6 +60,8 @@ describe BulkInsertSafe do
t.text :encrypted_secret_value, null: false t.text :encrypted_secret_value, null: false
t.string :encrypted_secret_value_iv, null: false t.string :encrypted_secret_value_iv, null: false
t.binary :sha_value, null: false, limit: 20 t.binary :sha_value, null: false, limit: 20
t.index :name, unique: true
end end
end end
...@@ -60,87 +74,95 @@ describe BulkInsertSafe do ...@@ -60,87 +74,95 @@ describe BulkInsertSafe do
end end
end end
def build_valid_items_for_bulk_insertion describe BulkInsertItem do
Array.new(10) do |n| it_behaves_like 'a BulkInsertSafe model', described_class do
BulkInsertItem.new( let(:valid_items_for_bulk_insertion) { described_class.valid_list(10) }
name: "item-#{n}", let(:invalid_items_for_bulk_insertion) { described_class.invalid_list(10) }
enum_value: 'case_1',
secret_value: 'my-secret',
sha_value: '2fd4e1c67a2d28fced849ee1bb76e7391b93eb12'
)
end end
end
def build_invalid_items_for_bulk_insertion context 'when inheriting class methods' do
Array.new(10) do it 'raises an error when method is not bulk-insert safe' do
BulkInsertItem.new( expect { described_class.include(InheritedUnsafeMethods) }
name: nil, # requires `name` to be set .to raise_error(described_class::MethodNotAllowedError)
enum_value: 'case_1', end
secret_value: 'my-secret',
sha_value: '2fd4e1c67a2d28fced849ee1bb76e7391b93eb12' it 'does not raise an error when method is bulk-insert safe' do
) expect { described_class.include(InheritedSafeMethods) }.not_to raise_error
end
end end
end
it_behaves_like 'a BulkInsertSafe model', BulkInsertItem do context 'primary keys' do
let(:valid_items_for_bulk_insertion) { build_valid_items_for_bulk_insertion } it 'raises error if primary keys are set prior to insertion' do
let(:invalid_items_for_bulk_insertion) { build_invalid_items_for_bulk_insertion } item = described_class.new(name: 'valid', id: 10)
end
context 'when inheriting class methods' do expect { described_class.bulk_insert!([item]) }
it 'raises an error when method is not bulk-insert safe' do .to raise_error(described_class::PrimaryKeySetError)
expect { BulkInsertItem.include(InheritedUnsafeMethods) }.to( end
raise_error(subject::MethodNotAllowedError))
end end
it 'does not raise an error when method is bulk-insert safe' do describe '.bulk_insert!' do
expect { BulkInsertItem.include(InheritedSafeMethods) }.not_to raise_error it 'inserts items in the given number of batches' do
end items = described_class.valid_list(10)
end
expect(ActiveRecord::InsertAll).to receive(:new).twice.and_call_original
context 'primary keys' do described_class.bulk_insert!(items, batch_size: 5)
it 'raises error if primary keys are set prior to insertion' do
items = build_valid_items_for_bulk_insertion
items.each_with_index do |item, n|
item.id = n
end end
expect { BulkInsertItem.bulk_insert!(items) }.to raise_error(subject::PrimaryKeySetError) it 'items can be properly fetched from database' do
end items = described_class.valid_list(10)
end
describe '.bulk_insert!' do described_class.bulk_insert!(items)
it 'inserts items in the given number of batches' do
items = build_valid_items_for_bulk_insertion
expect(items.size).to eq(10)
expect(BulkInsertItem).to receive(:insert_all!).twice
BulkInsertItem.bulk_insert!(items, batch_size: 5) attribute_names = described_class.attribute_names - %w[id]
end expect(described_class.last(items.size).pluck(*attribute_names)).to eq(
items.pluck(*attribute_names))
end
it 'items can be properly fetched from database' do it 'rolls back the transaction when any item is invalid' do
items = build_valid_items_for_bulk_insertion # second batch is bad
all_items = described_class.valid_list(10) +
described_class.invalid_list(10)
BulkInsertItem.bulk_insert!(items) expect do
described_class.bulk_insert!(all_items, batch_size: 2) rescue nil
end.not_to change { described_class.count }
end
attribute_names = BulkInsertItem.attribute_names - %w[id] it 'does nothing and returns true when items are empty' do
expect(BulkInsertItem.last(items.size).pluck(*attribute_names)).to eq( expect(described_class.bulk_insert!([])).to be(true)
items.pluck(*attribute_names)) expect(described_class.count).to eq(0)
end
end end
it 'rolls back the transaction when any item is invalid' do context 'when duplicate items are to be inserted' do
# second batch is bad let!(:existing_object) { described_class.create!(name: 'duplicate', secret_value: 'old value') }
all_items = build_valid_items_for_bulk_insertion + build_invalid_items_for_bulk_insertion let(:new_object) { described_class.new(name: 'duplicate', secret_value: 'new value') }
batch_size = all_items.size / 2
describe '.bulk_insert!' do
context 'when skip_duplicates is set to false' do
it 'raises an exception' do
expect { described_class.bulk_insert!([new_object], skip_duplicates: false) }
.to raise_error(ActiveRecord::RecordNotUnique)
end
end
context 'when skip_duplicates is set to true' do
it 'does not update existing object' do
described_class.bulk_insert!([new_object], skip_duplicates: true)
expect(existing_object.reload.secret_value).to eq('old value')
end
end
end
expect do describe '.bulk_upsert!' do
BulkInsertItem.bulk_insert!(all_items, batch_size: batch_size) rescue nil it 'updates existing object' do
end.not_to change { BulkInsertItem.count } described_class.bulk_upsert!([new_object], unique_by: %w[name])
end
it 'does nothing and returns true when items are empty' do expect(existing_object.reload.secret_value).to eq('new value')
expect(BulkInsertItem.bulk_insert!([])).to be(true) end
expect(BulkInsertItem.count).to eq(0) end
end end
end end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment