Commit f21cb86d authored by Michael Kozono's avatar Michael Kozono

Merge branch 'ag-refactor-registry-worker' into 'master'

Refactor RegistryBatcher to make it reusable

See merge request gitlab-org/gitlab!69492
parents e1938dec 03a081a0
# frozen_string_literal: true
module Gitlab
module Geo
# Returns an ID range to allow iteration over a destination table and its
# source replicable table. Repeats from the beginning after it reaches
# the end.
#
# Used by Geo in particular to iterate over a replicable and its destination
# table.
#
# Tracks a cursor for each table, by "key". If the table is smaller than
# batch_size, then a range for the whole table is returned on every call.
class BaseBatcher
# @param [Class] destination_class the class of the table to iterate on
# @param [String] key to identify the cursor. Note, cursor is already unique
# per table.
# @param [Integer] batch_size to limit the number of records in a batch
def initialize(source_class, destination_class, source_foreign_key, key:, batch_size: 1000)
@source_class = source_class
@source_foreign_key = source_foreign_key
@destination_class = destination_class
@key = key
@batch_size = batch_size
end
# @return [Range] a range of IDs. `nil` if 0 records at or after the cursor.
def next_range!
batch_first_id = cursor_id
batch_last_id = get_batch_last_id(batch_first_id)
return unless batch_last_id
batch_first_id..batch_last_id
end
private
attr_reader :source_class, :source_foreign_key, :destination_class, :key, :batch_size
# @private
#
# Get the last ID of the batch. Increment the cursor or reset it if at end.
#
# @param [Integer] batch_first_id the first ID of the batch
# @return [Integer] batch_last_id the last ID of the batch (not the table)
def get_batch_last_id(batch_first_id)
source_class_last_id, more_records = get_source_batch_last_id(batch_first_id)
destination_class_last_id, more_destination_records = get_destination_batch_last_id(batch_first_id)
batch_last_id =
if !more_records && more_destination_records
destination_class_last_id
else
source_class_last_id
end
if more_records || more_destination_records
increment_batch(batch_last_id)
else
reset if batch_first_id > 1
end
batch_last_id
end
# @private
#
# Get the last ID of the of the batch (not the table) for the replicable
# and check if there are more rows in the table.
#
# @param [Integer] batch_first_id the first ID of the batch
# @return [Integer, Boolean] A tuple with the the last ID of the batch (not the table),
# and whether or not have more rows to check in the table
def get_source_batch_last_id(batch_first_id)
sql = <<~SQL
SELECT MAX(batch.#{source_class.primary_key}) AS batch_last_id,
EXISTS (
SELECT #{source_class.primary_key}
FROM #{source_class.table_name}
WHERE #{source_class.primary_key} > MAX(batch.#{source_class.primary_key})
) AS more_rows
FROM (
SELECT #{source_class.primary_key}
FROM #{source_class.table_name}
WHERE #{source_class.primary_key} >= #{batch_first_id}
ORDER BY #{source_class.primary_key}
LIMIT #{batch_size}) AS batch;
SQL
result = source_class.connection.exec_query(sql).first
[result["batch_last_id"], result["more_rows"]]
end
# @private
#
# Get the last ID of the of the batch (not the table) for the destination
# and check if there are more rows in the table.
#
# This query differs from the replicable query by:
#
# - We check against the foreign key IDs not the destination IDs;
# - In the where clause of the more_rows part, we use greater
# than or equal. This allows the batcher to switch to the
# destination table while getting the last ID of the batch
# when the previous batch included the end of the replicable
# table but there are orphaned registries where the foreign key
# ids are higher than the last replicable id;
#
# @param [Integer] batch_first_id the first ID of the batch
# @return [Integer, Boolean] A tuple with the the last ID of the batch (not the table),
# and whether or not have more rows to check in the table
def get_destination_batch_last_id(batch_first_id)
sql = <<~SQL
SELECT MAX(batch.#{source_foreign_key}) AS batch_last_id,
EXISTS (
SELECT #{source_foreign_key}
FROM #{destination_class.table_name}
WHERE #{source_foreign_key} >= MAX(batch.#{source_foreign_key})
) AS more_rows
FROM (
SELECT #{source_foreign_key}
FROM #{destination_class.table_name}
WHERE #{source_foreign_key} >= #{batch_first_id}
ORDER BY #{source_foreign_key}
LIMIT #{batch_size}) AS batch;
SQL
result = destination_class.connection.exec_query(sql).first
[result["batch_last_id"], result["more_rows"]]
end
def reset
set_cursor_id(1)
end
def increment_batch(batch_last_id)
set_cursor_id(batch_last_id + 1)
end
# @private
#
# @return [Integer] the cursor ID, or 1 if it is not set
def cursor_id
Rails.cache.fetch("#{cache_key}:cursor_id") || 1
end
def set_cursor_id(id)
Rails.cache.write("#{cache_key}:cursor_id", id)
end
def cache_key
@cache_key ||= "#{self.class.name.parameterize}:#{destination_class.name.parameterize}:#{key}:cursor_id"
end
end
end
end
......@@ -2,156 +2,9 @@
module Gitlab
module Geo
# Returns an ID range to allow iteration over a registry table and its
# source replicable table. Repeats from the beginning after it reaches
# the end.
#
# Used by Geo in particular to iterate over a replicable and its registry
# table.
#
# Tracks a cursor for each table, by "key". If the table is smaller than
# batch_size, then a range for the whole table is returned on every call.
class RegistryBatcher
# @param [Class] registry_class the class of the table to iterate on
# @param [String] key to identify the cursor. Note, cursor is already unique
# per table.
# @param [Integer] batch_size to limit the number of records in a batch
class RegistryBatcher < BaseBatcher
def initialize(registry_class, key:, batch_size: 1000)
@model_class = registry_class::MODEL_CLASS
@model_foreign_key = registry_class::MODEL_FOREIGN_KEY
@registry_class = registry_class
@key = key
@batch_size = batch_size
end
# @return [Range] a range of IDs. `nil` if 0 records at or after the cursor.
def next_range!
batch_first_id = cursor_id
batch_last_id = get_batch_last_id(batch_first_id)
return unless batch_last_id
batch_first_id..batch_last_id
end
private
attr_reader :model_class, :model_foreign_key, :registry_class, :key, :batch_size
# @private
#
# Get the last ID of the batch. Increment the cursor or reset it if at end.
#
# @param [Integer] batch_first_id the first ID of the batch
# @return [Integer] batch_last_id the last ID of the batch (not the table)
def get_batch_last_id(batch_first_id)
model_class_last_id, more_records = get_model_batch_last_id(batch_first_id)
registry_class_last_id, more_registries = get_registry_batch_last_id(batch_first_id)
batch_last_id =
if !more_records && more_registries
registry_class_last_id
else
model_class_last_id
end
if more_records || more_registries
increment_batch(batch_last_id)
else
reset if batch_first_id > 1
end
batch_last_id
end
# @private
#
# Get the last ID of the of the batch (not the table) for the replicable
# and check if there are more rows in the table.
#
# @param [Integer] batch_first_id the first ID of the batch
# @return [Integer, Boolean] A tuple with the the last ID of the batch (not the table),
# and whether or not have more rows to check in the table
def get_model_batch_last_id(batch_first_id)
sql = <<~SQL
SELECT MAX(batch.#{model_class.primary_key}) AS batch_last_id,
EXISTS (
SELECT #{model_class.primary_key}
FROM #{model_class.table_name}
WHERE #{model_class.primary_key} > MAX(batch.#{model_class.primary_key})
) AS more_rows
FROM (
SELECT #{model_class.primary_key}
FROM #{model_class.table_name}
WHERE #{model_class.primary_key} >= #{batch_first_id}
ORDER BY #{model_class.primary_key}
LIMIT #{batch_size}) AS batch;
SQL
result = model_class.connection.exec_query(sql).first
[result["batch_last_id"], result["more_rows"]]
end
# @private
#
# Get the last ID of the of the batch (not the table) for the registry
# and check if there are more rows in the table.
#
# This query differs from the replicable query by:
#
# - We check against the foreign key IDs not the registry IDs;
# - In the where clause of the more_rows part, we use greater
# than or equal. This allows the batcher to switch to the
# registry table while getting the last ID of the batch
# when the previous batch included the end of the replicable
# table but there are orphaned registries where the foreign key
# ids are higher than the last replicable id;
#
# @param [Integer] batch_first_id the first ID of the batch
# @return [Integer, Boolean] A tuple with the the last ID of the batch (not the table),
# and whether or not have more rows to check in the table
def get_registry_batch_last_id(batch_first_id)
sql = <<~SQL
SELECT MAX(batch.#{model_foreign_key}) AS batch_last_id,
EXISTS (
SELECT #{model_foreign_key}
FROM #{registry_class.table_name}
WHERE #{model_foreign_key} >= MAX(batch.#{model_foreign_key})
) AS more_rows
FROM (
SELECT #{model_foreign_key}
FROM #{registry_class.table_name}
WHERE #{model_foreign_key} >= #{batch_first_id}
ORDER BY #{model_foreign_key}
LIMIT #{batch_size}) AS batch;
SQL
result = registry_class.connection.exec_query(sql).first
[result["batch_last_id"], result["more_rows"]]
end
def reset
set_cursor_id(1)
end
def increment_batch(batch_last_id)
set_cursor_id(batch_last_id + 1)
end
# @private
#
# @return [Integer] the cursor ID, or 1 if it is not set
def cursor_id
Rails.cache.fetch("#{cache_key}:cursor_id") || 1
end
def set_cursor_id(id)
Rails.cache.write("#{cache_key}:cursor_id", id)
end
def cache_key
@cache_key ||= "#{self.class.name.parameterize}:#{registry_class.name.parameterize}:#{key}:cursor_id"
super(registry_class::MODEL_CLASS, registry_class, registry_class::MODEL_FOREIGN_KEY, key: key, batch_size: batch_size)
end
end
end
......
......@@ -5,170 +5,9 @@ require 'spec_helper'
RSpec.describe Gitlab::Geo::RegistryBatcher, :geo, :use_clean_rails_memory_store_caching do
include EE::GeoHelpers
describe '#next_range!' do
let(:model_class) { LfsObject }
let(:model_foreign_key) { registry_class::MODEL_FOREIGN_KEY }
let(:registry_class) { Geo::LfsObjectRegistry }
let(:registry_class_factory) { registry_factory_name(registry_class) }
let(:key) { 'looping_batcher_spec' }
let(:batch_size) { 2 }
let(:source_class) { LfsObject }
let(:destination_class) { Geo::LfsObjectRegistry }
let(:destination_class_factory) { registry_factory_name(destination_class) }
subject { described_class.new(registry_class, key: key, batch_size: batch_size).next_range! }
context 'when there are no records' do
it { is_expected.to be_nil }
end
context 'when there are no records but there are orphaned registries' do
let!(:registries) { create_list(registry_class_factory, 3) }
context 'when it has never been called before' do
it { is_expected.to be_a Range }
it 'starts from the beginning' do
expect(subject.first).to eq(1)
end
it 'ends at a full batch' do
expect(subject.last).to eq(registries.second.public_send(model_foreign_key))
end
context 'when the batch size is greater than the number of registries' do
let(:batch_size) { 5 }
it 'ends at the last ID' do
expect(subject.last).to eq(registries.last.public_send(model_foreign_key))
end
end
end
context 'when it was called before' do
context 'when the previous batch included the end of the table' do
before do
described_class.new(registry_class, key: key, batch_size: registry_class.count).next_range!
end
it { is_expected.to be_nil }
end
context 'when the previous batch did not include the end of the table' do
before do
described_class.new(registry_class, key: key, batch_size: registry_class.count - 1).next_range!
end
it 'starts after the previous batch' do
expect(subject).to eq(registries.last.public_send(model_foreign_key)..registries.last.public_send(model_foreign_key))
end
end
context 'if cache is cleared' do
before do
described_class.new(registry_class, key: key, batch_size: batch_size).next_range!
end
it 'starts from the beginning' do
Rails.cache.clear
expect(subject).to eq(1..registries.second.public_send(model_foreign_key))
end
end
end
end
context 'when there are records' do
let!(:records) { create_list(model_class.underscore, 3) }
context 'when it has never been called before' do
it { is_expected.to be_a Range }
it 'starts from the beginning' do
expect(subject.first).to eq(1)
end
it 'ends at a full batch' do
expect(subject.last).to eq(records.second.id)
end
context 'when the batch size is greater than the number of records' do
let(:batch_size) { 5 }
it 'ends at the last ID' do
expect(subject.last).to eq(records.last.id)
end
end
end
context 'when it was called before' do
context 'when the previous batch included the end of the table' do
before do
described_class.new(registry_class, key: key, batch_size: model_class.count).next_range!
end
it 'starts from the beginning' do
expect(subject).to eq(1..records.second.id)
end
end
context 'when the previous batch did not include the end of the table' do
before do
described_class.new(registry_class, key: key, batch_size: model_class.count - 1).next_range!
end
it 'starts after the previous batch' do
expect(subject).to eq(records.last.id..records.last.id)
end
end
context 'if cache is cleared' do
before do
described_class.new(registry_class, key: key, batch_size: batch_size).next_range!
end
it 'starts from the beginning' do
Rails.cache.clear
expect(subject).to eq(1..records.second.id)
end
end
end
end
context 'when there are records and orphaned registries with foreign key greater than last record id' do
let!(:records) { create_list(model_class.underscore, 3) }
let(:orphaned_registry_foreign_key_id) { records.last.id }
let!(:registry) { create(registry_class_factory, model_foreign_key => orphaned_registry_foreign_key_id) }
before do
model_class.where(id: orphaned_registry_foreign_key_id).delete_all
end
context 'when it has never been called before' do
it { is_expected.to be_a Range }
it 'starts from the beginning' do
expect(subject.first).to eq(1)
end
it 'ends at the last registry foreign key ID' do
expect(subject.last).to eq(orphaned_registry_foreign_key_id)
end
end
context 'when it was called before' do
before do
described_class.new(registry_class, key: key, batch_size: batch_size).next_range!
end
it { is_expected.to be_nil }
context 'if cache is cleared' do
it 'starts from the beginning' do
Rails.cache.clear
expect(subject).to eq(1..orphaned_registry_foreign_key_id)
end
end
end
end
end
include_examples 'is a Geo batcher'
end
# frozen_string_literal: true
RSpec.shared_examples 'is a Geo batcher' do
include EE::GeoHelpers
describe '#next_range!' do
let(:batcher) { described_class.new(destination_class, key: key, batch_size: batch_size) }
let(:source_foreign_key) { batcher.send(:source_foreign_key) }
let(:key) { 'looping_batcher_spec' }
let(:batch_size) { 2 }
subject { batcher.next_range! }
context 'when there are no records' do
it { is_expected.to be_nil }
end
context 'when there are no records but there are orphaned destination_records' do
let!(:destination_records) { create_list(destination_class_factory, 3) }
context 'when it has never been called before' do
it { is_expected.to be_a Range }
it 'starts from the beginning' do
expect(subject.first).to eq(1)
end
it 'ends at a full batch' do
expect(subject.last).to eq(destination_records.second.public_send(source_foreign_key))
end
context 'when the batch size is greater than the number of destination_records' do
let(:batch_size) { 5 }
it 'ends at the last ID' do
expect(subject.last).to eq(destination_records.last.public_send(source_foreign_key))
end
end
end
context 'when it was called before' do
context 'when the previous batch included the end of the table' do
before do
described_class.new(destination_class, key: key, batch_size: destination_class.count).next_range!
end
it { is_expected.to be_nil }
end
context 'when the previous batch did not include the end of the table' do
before do
described_class.new(destination_class, key: key, batch_size: destination_class.count - 1).next_range!
end
it 'starts after the previous batch' do
expect(subject).to eq(destination_records.last.public_send(source_foreign_key)..destination_records.last.public_send(source_foreign_key))
end
end
context 'if cache is cleared' do
before do
described_class.new(destination_class, key: key, batch_size: batch_size).next_range!
end
it 'starts from the beginning' do
Rails.cache.clear
expect(subject).to eq(1..destination_records.second.public_send(source_foreign_key))
end
end
end
end
context 'when there are records' do
let!(:records) { create_list(source_class.underscore, 3) }
context 'when it has never been called before' do
it { is_expected.to be_a Range }
it 'starts from the beginning' do
expect(subject.first).to eq(1)
end
it 'ends at a full batch' do
expect(subject.last).to eq(records.second.id)
end
context 'when the batch size is greater than the number of records' do
let(:batch_size) { 5 }
it 'ends at the last ID' do
expect(subject.last).to eq(records.last.id)
end
end
end
context 'when it was called before' do
context 'when the previous batch included the end of the table' do
before do
described_class.new(destination_class, key: key, batch_size: source_class.count).next_range!
end
it 'starts from the beginning' do
expect(subject).to eq(1..records.second.id)
end
end
context 'when the previous batch did not include the end of the table' do
before do
described_class.new(destination_class, key: key, batch_size: source_class.count - 1).next_range!
end
it 'starts after the previous batch' do
expect(subject).to eq(records.last.id..records.last.id)
end
end
context 'if cache is cleared' do
before do
described_class.new(destination_class, key: key, batch_size: batch_size).next_range!
end
it 'starts from the beginning' do
Rails.cache.clear
expect(subject).to eq(1..records.second.id)
end
end
end
end
context 'when there are records and orphaned destination_records with foreign key greater than last record id' do
let!(:records) { create_list(source_class.underscore, 3) }
let(:orphaned_destination_foreign_key_id) { records.last.id }
let!(:destination) { create(destination_class_factory, source_foreign_key => orphaned_destination_foreign_key_id) }
before do
source_class.where(id: orphaned_destination_foreign_key_id).delete_all
end
context 'when it has never been called before' do
it { is_expected.to be_a Range }
it 'starts from the beginning' do
expect(subject.first).to eq(1)
end
it 'ends at the last destination foreign key ID' do
expect(subject.last).to eq(orphaned_destination_foreign_key_id)
end
end
context 'when it was called before' do
before do
described_class.new(destination_class, key: key, batch_size: batch_size).next_range!
end
it { is_expected.to be_nil }
context 'if cache is cleared' do
it 'starts from the beginning' do
Rails.cache.clear
expect(subject).to eq(1..orphaned_destination_foreign_key_id)
end
end
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment