Rename Gitlab::LoopingBatcher to Gitlab::Geo::RegistryBatcher

It became specific to Geo to iterate over a replicable
and its registry tables, and it probably won't be useful
for anyone else.
parent e514c627
......@@ -33,7 +33,7 @@ module Geo
# @return [Range] the next range of a batch of records
def next_range!
Gitlab::LoopingBatcher.new(registry_class, key: batcher_key, batch_size: batch_size).next_range!
Gitlab::Geo::RegistryBatcher.new(registry_class, key: batcher_key, batch_size: batch_size).next_range!
end
def batcher_key
......
# frozen_string_literal: true
module Gitlab
module Geo
# Returns an ID range within a table so it can be iterated over. Repeats from
# the beginning after it reaches the end.
#
# Used by Geo in particular to iterate over a replicable and its registry
# table.
#
# Tracks a cursor for each table, by "key". If the table is smaller than
# batch_size, then a range for the whole table is returned on every call.
class RegistryBatcher
# @param [Class] registry_class the class of the table to iterate on
# @param [String] key to identify the cursor. Note, cursor is already unique
# per table.
# @param [Integer] batch_size to limit the number of records in a batch
def initialize(registry_class, key:, batch_size: 1000)
@model_class = registry_class::MODEL_CLASS
@model_foreign_key = registry_class::MODEL_FOREIGN_KEY
@registry_class = registry_class
@key = key
@batch_size = batch_size
end
# @return [Range] a range of IDs. `nil` if 0 records at or after the cursor.
def next_range!
batch_first_id = cursor_id
batch_last_id = get_batch_last_id(batch_first_id)
return unless batch_last_id
batch_first_id..batch_last_id
end
private
attr_reader :model_class, :model_foreign_key, :registry_class, :key, :batch_size
# @private
#
# Get the last ID of the batch. Increment the cursor or reset it if at end.
#
# @param [Integer] batch_first_id the first ID of the batch
# @return [Integer] batch_last_id the last ID of the batch (not the table)
def get_batch_last_id(batch_first_id)
model_class_last_id, more_records = get_model_batch_last_id(batch_first_id)
registry_class_last_id, more_registries = get_registry_batch_last_id(batch_first_id)
batch_last_id =
if !more_records && more_registries
registry_class_last_id
else
model_class_last_id
end
if more_records || more_registries
increment_batch(batch_last_id)
else
reset if batch_first_id > 1
end
batch_last_id
end
# @private
#
# Get the last ID of the of the batch (not the table) for the replicable
# and check if there are more rows in the table.
#
# @param [Integer] batch_first_id the first ID of the batch
# @return [Integer, Boolean] A tuple with the the last ID of the batch (not the table),
# and whether or not have more rows to check in the table
def get_model_batch_last_id(batch_first_id)
sql = <<~SQL
SELECT MAX(batch.#{model_class.primary_key}) AS batch_last_id,
EXISTS (
SELECT #{model_class.primary_key}
FROM #{model_class.table_name}
WHERE #{model_class.primary_key} > MAX(batch.#{model_class.primary_key})
) AS more_rows
FROM (
SELECT #{model_class.primary_key}
FROM #{model_class.table_name}
WHERE #{model_class.primary_key} >= #{batch_first_id}
ORDER BY #{model_class.primary_key}
LIMIT #{batch_size}) AS batch;
SQL
result = model_class.connection.exec_query(sql).first
[result["batch_last_id"], result["more_rows"]]
end
# @private
#
# This method is used to remove orphaned registries where the foreign key
# IDs are greather than last replicable ID. The difference here is that
# we need to check against the foreign key IDS not the registry ID and
# for the existence of more rows to check we need to check against the
# the first ID of the batch.
#
# @param [Integer] batch_first_id the first ID of the batch
# @return [Integer, Boolean] A tuple with the the last ID of the batch (not the table),
# and whether or not have more rows to check in the table
def get_registry_batch_last_id(batch_first_id)
sql = <<~SQL
SELECT MAX(batch.#{model_foreign_key}) AS batch_last_id,
EXISTS (
SELECT #{model_foreign_key}
FROM #{registry_class.table_name}
WHERE #{model_foreign_key} > #{batch_first_id}
) AS more_rows
FROM (
SELECT #{model_foreign_key}
FROM #{registry_class.table_name}
WHERE #{model_foreign_key} > #{batch_first_id}
ORDER BY #{model_foreign_key}
LIMIT #{batch_size}) AS batch;
SQL
result = registry_class.connection.exec_query(sql).first
[result["batch_last_id"], result["more_rows"]]
end
def reset
set_cursor_id(1)
end
def increment_batch(batch_last_id)
set_cursor_id(batch_last_id + 1)
end
# @private
#
# @return [Integer] the cursor ID, or 1 if it is not set
def cursor_id
Rails.cache.fetch("#{cache_key}:cursor_id") || 1
end
def set_cursor_id(id)
Rails.cache.write("#{cache_key}:cursor_id", id)
end
def cache_key
@cache_key ||= "#{self.class.name.parameterize}:#{registry_class.name.parameterize}:#{key}:cursor_id"
end
end
end
end
......@@ -2,7 +2,7 @@
require 'spec_helper'
describe Gitlab::LoopingBatcher, :geo, :use_clean_rails_memory_store_caching do
describe Gitlab::Geo::RegistryBatcher, :geo, :use_clean_rails_memory_store_caching do
describe '#next_range!' do
let(:model_class) { LfsObject }
let(:model_foreign_key) { registry_class::MODEL_FOREIGN_KEY }
......@@ -17,7 +17,7 @@ describe Gitlab::LoopingBatcher, :geo, :use_clean_rails_memory_store_caching do
it { is_expected.to be_nil }
end
context 'when there are no records but there are unused registries' do
context 'when there are no records but there are orphaned registries' do
let!(:registries) { create_list(registry_class_factory, 3) }
context 'when it has never been called before' do
......@@ -115,13 +115,13 @@ describe Gitlab::LoopingBatcher, :geo, :use_clean_rails_memory_store_caching do
end
end
context 'when there are records and unused registries with foreign key greather than last record id' do
context 'when there are records and orphaned registries with foreign key greater than last record id' do
let!(:records) { create_list(model_class.underscore, 3) }
let(:unused_registry_foreign_key_id) { records.last.id }
let!(:registry) { create(registry_class_factory, model_foreign_key => unused_registry_foreign_key_id) }
let(:orphaned_registry_foreign_key_id) { records.last.id }
let!(:registry) { create(registry_class_factory, model_foreign_key => orphaned_registry_foreign_key_id) }
before do
model_class.where(id: unused_registry_foreign_key_id).delete_all
model_class.where(id: orphaned_registry_foreign_key_id).delete_all
end
context 'when it has never been called before' do
......@@ -132,7 +132,7 @@ describe Gitlab::LoopingBatcher, :geo, :use_clean_rails_memory_store_caching do
end
it 'ends at the last registry foreign key ID' do
expect(subject.last).to eq(unused_registry_foreign_key_id)
expect(subject.last).to eq(orphaned_registry_foreign_key_id)
end
end
......@@ -147,7 +147,7 @@ describe Gitlab::LoopingBatcher, :geo, :use_clean_rails_memory_store_caching do
it 'starts from the beginning' do
Rails.cache.clear
expect(subject).to eq(1..unused_registry_foreign_key_id)
expect(subject).to eq(1..orphaned_registry_foreign_key_id)
end
end
end
......
# frozen_string_literal: true
module Gitlab
# Returns an ID range within a table so it can be iterated over. Repeats from
# the beginning after it reaches the end.
#
# Used by Geo in particular to iterate over a replicable and its registry
# table.
#
# Tracks a cursor for each table, by "key". If the table is smaller than
# batch_size, then a range for the whole table is returned on every call.
class LoopingBatcher
# @param [Class] model_class the class of the table to iterate on
# @param [String] key to identify the cursor. Note, cursor is already unique
# per table.
# @param [Integer] batch_size to limit the number of records in a batch
def initialize(registry_class, key:, batch_size: 1000)
@model_class = registry_class::MODEL_CLASS
@model_foreign_key = registry_class::MODEL_FOREIGN_KEY
@registry_class = registry_class
@key = key
@batch_size = batch_size
end
# @return [Range] a range of IDs. `nil` if 0 records at or after the cursor.
def next_range!
# return unless @model_class.any?
batch_first_id = cursor_id
batch_last_id = get_batch_last_id(batch_first_id)
return unless batch_last_id
batch_first_id..batch_last_id
end
private
attr_reader :model_class, :model_foreign_key, :registry_class, :key, :batch_size
# @private
#
# Get the last ID of the batch. Increment the cursor or reset it if at end.
#
# @param [Integer] batch_first_id the first ID of the batch
# @return [Integer] batch_last_id the last ID of the batch (not the table)
def get_batch_last_id(batch_first_id)
model_class_last_id, more_records = get_model_batch_last_id(batch_first_id)
registry_class_last_id, more_registries = get_registry_batch_last_id(batch_first_id)
batch_last_id =
if !more_records && more_registries
registry_class_last_id
else
model_class_last_id
end
if more_records || more_registries
increment_batch(batch_last_id)
else
reset if batch_first_id > 1
end
batch_last_id
end
def get_model_batch_last_id(batch_first_id)
sql = <<~SQL
SELECT MAX(batch.#{model_class.primary_key}) AS batch_last_id,
EXISTS (
SELECT #{model_class.primary_key}
FROM #{model_class.table_name}
WHERE #{model_class.primary_key} > MAX(batch.#{model_class.primary_key})
) AS more_rows
FROM (
SELECT #{model_class.primary_key}
FROM #{model_class.table_name}
WHERE #{model_class.primary_key} >= #{batch_first_id}
ORDER BY #{model_class.primary_key}
LIMIT #{batch_size}) AS batch;
SQL
result = model_class.connection.exec_query(sql).first
[result["batch_last_id"], result["more_rows"]]
end
def get_registry_batch_last_id(batch_first_id)
sql = <<~SQL
SELECT MAX(batch.#{model_foreign_key}) AS batch_last_id,
EXISTS (
SELECT #{model_foreign_key}
FROM #{registry_class.table_name}
WHERE #{model_foreign_key} > #{batch_first_id}
) AS more_rows
FROM (
SELECT #{model_foreign_key}
FROM #{registry_class.table_name}
WHERE #{model_foreign_key} > #{batch_first_id}
ORDER BY #{model_foreign_key}
LIMIT #{batch_size}) AS batch;
SQL
result = registry_class.connection.exec_query(sql).first
[result["batch_last_id"], result["more_rows"]]
end
def reset
set_cursor_id(1)
end
def increment_batch(batch_last_id)
set_cursor_id(batch_last_id + 1)
end
# @private
#
# @return [Integer] the cursor ID, or 1 if it is not set
def cursor_id
Rails.cache.fetch("#{cache_key}:cursor_id") || 1
end
def set_cursor_id(id)
Rails.cache.write("#{cache_key}:cursor_id", id)
end
def cache_key
@cache_key ||= "#{self.class.name.parameterize}:#{registry_class.name.parameterize}:#{key}:cursor_id"
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment