Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
G
gitlab-ce
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
1
Merge Requests
1
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
nexedi
gitlab-ce
Commits
03a081a0
Commit
03a081a0
authored
Sep 09, 2021
by
Aakriti Gupta
Committed by
Michael Kozono
Sep 09, 2021
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Refactor RegistryBatcher to make it reusable
parent
fe17d707
Changes
4
Show whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
334 additions
and
314 deletions
+334
-314
ee/lib/gitlab/geo/base_batcher.rb
ee/lib/gitlab/geo/base_batcher.rb
+158
-0
ee/lib/gitlab/geo/registry_batcher.rb
ee/lib/gitlab/geo/registry_batcher.rb
+2
-149
ee/spec/lib/gitlab/geo/registry_batcher_spec.rb
ee/spec/lib/gitlab/geo/registry_batcher_spec.rb
+4
-165
ee/spec/support/shared_examples/models/geo_batcher_shared_examples.rb
...ort/shared_examples/models/geo_batcher_shared_examples.rb
+170
-0
No files found.
ee/lib/gitlab/geo/base_batcher.rb
0 → 100644
View file @
03a081a0
# frozen_string_literal: true
module
Gitlab
module
Geo
# Returns an ID range to allow iteration over a destination table and its
# source replicable table. Repeats from the beginning after it reaches
# the end.
#
# Used by Geo in particular to iterate over a replicable and its destination
# table.
#
# Tracks a cursor for each table, by "key". If the table is smaller than
# batch_size, then a range for the whole table is returned on every call.
class
BaseBatcher
# @param [Class] destination_class the class of the table to iterate on
# @param [String] key to identify the cursor. Note, cursor is already unique
# per table.
# @param [Integer] batch_size to limit the number of records in a batch
def
initialize
(
source_class
,
destination_class
,
source_foreign_key
,
key
:,
batch_size:
1000
)
@source_class
=
source_class
@source_foreign_key
=
source_foreign_key
@destination_class
=
destination_class
@key
=
key
@batch_size
=
batch_size
end
# @return [Range] a range of IDs. `nil` if 0 records at or after the cursor.
def
next_range!
batch_first_id
=
cursor_id
batch_last_id
=
get_batch_last_id
(
batch_first_id
)
return
unless
batch_last_id
batch_first_id
..
batch_last_id
end
private
attr_reader
:source_class
,
:source_foreign_key
,
:destination_class
,
:key
,
:batch_size
# @private
#
# Get the last ID of the batch. Increment the cursor or reset it if at end.
#
# @param [Integer] batch_first_id the first ID of the batch
# @return [Integer] batch_last_id the last ID of the batch (not the table)
def
get_batch_last_id
(
batch_first_id
)
source_class_last_id
,
more_records
=
get_source_batch_last_id
(
batch_first_id
)
destination_class_last_id
,
more_destination_records
=
get_destination_batch_last_id
(
batch_first_id
)
batch_last_id
=
if
!
more_records
&&
more_destination_records
destination_class_last_id
else
source_class_last_id
end
if
more_records
||
more_destination_records
increment_batch
(
batch_last_id
)
else
reset
if
batch_first_id
>
1
end
batch_last_id
end
# @private
#
# Get the last ID of the of the batch (not the table) for the replicable
# and check if there are more rows in the table.
#
# @param [Integer] batch_first_id the first ID of the batch
# @return [Integer, Boolean] A tuple with the the last ID of the batch (not the table),
# and whether or not have more rows to check in the table
def
get_source_batch_last_id
(
batch_first_id
)
sql
=
<<~
SQL
SELECT MAX(batch.
#{
source_class
.
primary_key
}
) AS batch_last_id,
EXISTS (
SELECT
#{
source_class
.
primary_key
}
FROM
#{
source_class
.
table_name
}
WHERE
#{
source_class
.
primary_key
}
> MAX(batch.
#{
source_class
.
primary_key
}
)
) AS more_rows
FROM (
SELECT
#{
source_class
.
primary_key
}
FROM
#{
source_class
.
table_name
}
WHERE
#{
source_class
.
primary_key
}
>=
#{
batch_first_id
}
ORDER BY
#{
source_class
.
primary_key
}
LIMIT
#{
batch_size
}
) AS batch;
SQL
result
=
source_class
.
connection
.
exec_query
(
sql
).
first
[
result
[
"batch_last_id"
],
result
[
"more_rows"
]]
end
# @private
#
# Get the last ID of the of the batch (not the table) for the destination
# and check if there are more rows in the table.
#
# This query differs from the replicable query by:
#
# - We check against the foreign key IDs not the destination IDs;
# - In the where clause of the more_rows part, we use greater
# than or equal. This allows the batcher to switch to the
# destination table while getting the last ID of the batch
# when the previous batch included the end of the replicable
# table but there are orphaned registries where the foreign key
# ids are higher than the last replicable id;
#
# @param [Integer] batch_first_id the first ID of the batch
# @return [Integer, Boolean] A tuple with the the last ID of the batch (not the table),
# and whether or not have more rows to check in the table
def
get_destination_batch_last_id
(
batch_first_id
)
sql
=
<<~
SQL
SELECT MAX(batch.
#{
source_foreign_key
}
) AS batch_last_id,
EXISTS (
SELECT
#{
source_foreign_key
}
FROM
#{
destination_class
.
table_name
}
WHERE
#{
source_foreign_key
}
>= MAX(batch.
#{
source_foreign_key
}
)
) AS more_rows
FROM (
SELECT
#{
source_foreign_key
}
FROM
#{
destination_class
.
table_name
}
WHERE
#{
source_foreign_key
}
>=
#{
batch_first_id
}
ORDER BY
#{
source_foreign_key
}
LIMIT
#{
batch_size
}
) AS batch;
SQL
result
=
destination_class
.
connection
.
exec_query
(
sql
).
first
[
result
[
"batch_last_id"
],
result
[
"more_rows"
]]
end
def
reset
set_cursor_id
(
1
)
end
def
increment_batch
(
batch_last_id
)
set_cursor_id
(
batch_last_id
+
1
)
end
# @private
#
# @return [Integer] the cursor ID, or 1 if it is not set
def
cursor_id
Rails
.
cache
.
fetch
(
"
#{
cache_key
}
:cursor_id"
)
||
1
end
def
set_cursor_id
(
id
)
Rails
.
cache
.
write
(
"
#{
cache_key
}
:cursor_id"
,
id
)
end
def
cache_key
@cache_key
||=
"
#{
self
.
class
.
name
.
parameterize
}
:
#{
destination_class
.
name
.
parameterize
}
:
#{
key
}
:cursor_id"
end
end
end
end
ee/lib/gitlab/geo/registry_batcher.rb
View file @
03a081a0
...
...
@@ -2,156 +2,9 @@
module
Gitlab
module
Geo
# Returns an ID range to allow iteration over a registry table and its
# source replicable table. Repeats from the beginning after it reaches
# the end.
#
# Used by Geo in particular to iterate over a replicable and its registry
# table.
#
# Tracks a cursor for each table, by "key". If the table is smaller than
# batch_size, then a range for the whole table is returned on every call.
class
RegistryBatcher
# @param [Class] registry_class the class of the table to iterate on
# @param [String] key to identify the cursor. Note, cursor is already unique
# per table.
# @param [Integer] batch_size to limit the number of records in a batch
class
RegistryBatcher
<
BaseBatcher
def
initialize
(
registry_class
,
key
:,
batch_size:
1000
)
@model_class
=
registry_class
::
MODEL_CLASS
@model_foreign_key
=
registry_class
::
MODEL_FOREIGN_KEY
@registry_class
=
registry_class
@key
=
key
@batch_size
=
batch_size
end
# @return [Range] a range of IDs. `nil` if 0 records at or after the cursor.
def
next_range!
batch_first_id
=
cursor_id
batch_last_id
=
get_batch_last_id
(
batch_first_id
)
return
unless
batch_last_id
batch_first_id
..
batch_last_id
end
private
attr_reader
:model_class
,
:model_foreign_key
,
:registry_class
,
:key
,
:batch_size
# @private
#
# Get the last ID of the batch. Increment the cursor or reset it if at end.
#
# @param [Integer] batch_first_id the first ID of the batch
# @return [Integer] batch_last_id the last ID of the batch (not the table)
def
get_batch_last_id
(
batch_first_id
)
model_class_last_id
,
more_records
=
get_model_batch_last_id
(
batch_first_id
)
registry_class_last_id
,
more_registries
=
get_registry_batch_last_id
(
batch_first_id
)
batch_last_id
=
if
!
more_records
&&
more_registries
registry_class_last_id
else
model_class_last_id
end
if
more_records
||
more_registries
increment_batch
(
batch_last_id
)
else
reset
if
batch_first_id
>
1
end
batch_last_id
end
# @private
#
# Get the last ID of the of the batch (not the table) for the replicable
# and check if there are more rows in the table.
#
# @param [Integer] batch_first_id the first ID of the batch
# @return [Integer, Boolean] A tuple with the the last ID of the batch (not the table),
# and whether or not have more rows to check in the table
def
get_model_batch_last_id
(
batch_first_id
)
sql
=
<<~
SQL
SELECT MAX(batch.
#{
model_class
.
primary_key
}
) AS batch_last_id,
EXISTS (
SELECT
#{
model_class
.
primary_key
}
FROM
#{
model_class
.
table_name
}
WHERE
#{
model_class
.
primary_key
}
> MAX(batch.
#{
model_class
.
primary_key
}
)
) AS more_rows
FROM (
SELECT
#{
model_class
.
primary_key
}
FROM
#{
model_class
.
table_name
}
WHERE
#{
model_class
.
primary_key
}
>=
#{
batch_first_id
}
ORDER BY
#{
model_class
.
primary_key
}
LIMIT
#{
batch_size
}
) AS batch;
SQL
result
=
model_class
.
connection
.
exec_query
(
sql
).
first
[
result
[
"batch_last_id"
],
result
[
"more_rows"
]]
end
# @private
#
# Get the last ID of the of the batch (not the table) for the registry
# and check if there are more rows in the table.
#
# This query differs from the replicable query by:
#
# - We check against the foreign key IDs not the registry IDs;
# - In the where clause of the more_rows part, we use greater
# than or equal. This allows the batcher to switch to the
# registry table while getting the last ID of the batch
# when the previous batch included the end of the replicable
# table but there are orphaned registries where the foreign key
# ids are higher than the last replicable id;
#
# @param [Integer] batch_first_id the first ID of the batch
# @return [Integer, Boolean] A tuple with the the last ID of the batch (not the table),
# and whether or not have more rows to check in the table
def
get_registry_batch_last_id
(
batch_first_id
)
sql
=
<<~
SQL
SELECT MAX(batch.
#{
model_foreign_key
}
) AS batch_last_id,
EXISTS (
SELECT
#{
model_foreign_key
}
FROM
#{
registry_class
.
table_name
}
WHERE
#{
model_foreign_key
}
>= MAX(batch.
#{
model_foreign_key
}
)
) AS more_rows
FROM (
SELECT
#{
model_foreign_key
}
FROM
#{
registry_class
.
table_name
}
WHERE
#{
model_foreign_key
}
>=
#{
batch_first_id
}
ORDER BY
#{
model_foreign_key
}
LIMIT
#{
batch_size
}
) AS batch;
SQL
result
=
registry_class
.
connection
.
exec_query
(
sql
).
first
[
result
[
"batch_last_id"
],
result
[
"more_rows"
]]
end
def
reset
set_cursor_id
(
1
)
end
def
increment_batch
(
batch_last_id
)
set_cursor_id
(
batch_last_id
+
1
)
end
# @private
#
# @return [Integer] the cursor ID, or 1 if it is not set
def
cursor_id
Rails
.
cache
.
fetch
(
"
#{
cache_key
}
:cursor_id"
)
||
1
end
def
set_cursor_id
(
id
)
Rails
.
cache
.
write
(
"
#{
cache_key
}
:cursor_id"
,
id
)
end
def
cache_key
@cache_key
||=
"
#{
self
.
class
.
name
.
parameterize
}
:
#{
registry_class
.
name
.
parameterize
}
:
#{
key
}
:cursor_id"
super
(
registry_class
::
MODEL_CLASS
,
registry_class
,
registry_class
::
MODEL_FOREIGN_KEY
,
key:
key
,
batch_size:
batch_size
)
end
end
end
...
...
ee/spec/lib/gitlab/geo/registry_batcher_spec.rb
View file @
03a081a0
...
...
@@ -5,170 +5,9 @@ require 'spec_helper'
RSpec
.
describe
Gitlab
::
Geo
::
RegistryBatcher
,
:geo
,
:use_clean_rails_memory_store_caching
do
include
EE
::
GeoHelpers
describe
'#next_range!'
do
let
(
:model_class
)
{
LfsObject
}
let
(
:model_foreign_key
)
{
registry_class
::
MODEL_FOREIGN_KEY
}
let
(
:registry_class
)
{
Geo
::
LfsObjectRegistry
}
let
(
:registry_class_factory
)
{
registry_factory_name
(
registry_class
)
}
let
(
:key
)
{
'looping_batcher_spec'
}
let
(
:batch_size
)
{
2
}
let
(
:source_class
)
{
LfsObject
}
let
(
:destination_class
)
{
Geo
::
LfsObjectRegistry
}
let
(
:destination_class_factory
)
{
registry_factory_name
(
destination_class
)
}
subject
{
described_class
.
new
(
registry_class
,
key:
key
,
batch_size:
batch_size
).
next_range!
}
context
'when there are no records'
do
it
{
is_expected
.
to
be_nil
}
end
context
'when there are no records but there are orphaned registries'
do
let!
(
:registries
)
{
create_list
(
registry_class_factory
,
3
)
}
context
'when it has never been called before'
do
it
{
is_expected
.
to
be_a
Range
}
it
'starts from the beginning'
do
expect
(
subject
.
first
).
to
eq
(
1
)
end
it
'ends at a full batch'
do
expect
(
subject
.
last
).
to
eq
(
registries
.
second
.
public_send
(
model_foreign_key
))
end
context
'when the batch size is greater than the number of registries'
do
let
(
:batch_size
)
{
5
}
it
'ends at the last ID'
do
expect
(
subject
.
last
).
to
eq
(
registries
.
last
.
public_send
(
model_foreign_key
))
end
end
end
context
'when it was called before'
do
context
'when the previous batch included the end of the table'
do
before
do
described_class
.
new
(
registry_class
,
key:
key
,
batch_size:
registry_class
.
count
).
next_range!
end
it
{
is_expected
.
to
be_nil
}
end
context
'when the previous batch did not include the end of the table'
do
before
do
described_class
.
new
(
registry_class
,
key:
key
,
batch_size:
registry_class
.
count
-
1
).
next_range!
end
it
'starts after the previous batch'
do
expect
(
subject
).
to
eq
(
registries
.
last
.
public_send
(
model_foreign_key
)
..
registries
.
last
.
public_send
(
model_foreign_key
))
end
end
context
'if cache is cleared'
do
before
do
described_class
.
new
(
registry_class
,
key:
key
,
batch_size:
batch_size
).
next_range!
end
it
'starts from the beginning'
do
Rails
.
cache
.
clear
expect
(
subject
).
to
eq
(
1
..
registries
.
second
.
public_send
(
model_foreign_key
))
end
end
end
end
context
'when there are records'
do
let!
(
:records
)
{
create_list
(
model_class
.
underscore
,
3
)
}
context
'when it has never been called before'
do
it
{
is_expected
.
to
be_a
Range
}
it
'starts from the beginning'
do
expect
(
subject
.
first
).
to
eq
(
1
)
end
it
'ends at a full batch'
do
expect
(
subject
.
last
).
to
eq
(
records
.
second
.
id
)
end
context
'when the batch size is greater than the number of records'
do
let
(
:batch_size
)
{
5
}
it
'ends at the last ID'
do
expect
(
subject
.
last
).
to
eq
(
records
.
last
.
id
)
end
end
end
context
'when it was called before'
do
context
'when the previous batch included the end of the table'
do
before
do
described_class
.
new
(
registry_class
,
key:
key
,
batch_size:
model_class
.
count
).
next_range!
end
it
'starts from the beginning'
do
expect
(
subject
).
to
eq
(
1
..
records
.
second
.
id
)
end
end
context
'when the previous batch did not include the end of the table'
do
before
do
described_class
.
new
(
registry_class
,
key:
key
,
batch_size:
model_class
.
count
-
1
).
next_range!
end
it
'starts after the previous batch'
do
expect
(
subject
).
to
eq
(
records
.
last
.
id
..
records
.
last
.
id
)
end
end
context
'if cache is cleared'
do
before
do
described_class
.
new
(
registry_class
,
key:
key
,
batch_size:
batch_size
).
next_range!
end
it
'starts from the beginning'
do
Rails
.
cache
.
clear
expect
(
subject
).
to
eq
(
1
..
records
.
second
.
id
)
end
end
end
end
context
'when there are records and orphaned registries with foreign key greater than last record id'
do
let!
(
:records
)
{
create_list
(
model_class
.
underscore
,
3
)
}
let
(
:orphaned_registry_foreign_key_id
)
{
records
.
last
.
id
}
let!
(
:registry
)
{
create
(
registry_class_factory
,
model_foreign_key
=>
orphaned_registry_foreign_key_id
)
}
before
do
model_class
.
where
(
id:
orphaned_registry_foreign_key_id
).
delete_all
end
context
'when it has never been called before'
do
it
{
is_expected
.
to
be_a
Range
}
it
'starts from the beginning'
do
expect
(
subject
.
first
).
to
eq
(
1
)
end
it
'ends at the last registry foreign key ID'
do
expect
(
subject
.
last
).
to
eq
(
orphaned_registry_foreign_key_id
)
end
end
context
'when it was called before'
do
before
do
described_class
.
new
(
registry_class
,
key:
key
,
batch_size:
batch_size
).
next_range!
end
it
{
is_expected
.
to
be_nil
}
context
'if cache is cleared'
do
it
'starts from the beginning'
do
Rails
.
cache
.
clear
expect
(
subject
).
to
eq
(
1
..
orphaned_registry_foreign_key_id
)
end
end
end
end
end
include_examples
'is a Geo batcher'
end
ee/spec/support/shared_examples/models/geo_batcher_shared_examples.rb
0 → 100644
View file @
03a081a0
# frozen_string_literal: true
RSpec
.
shared_examples
'is a Geo batcher'
do
include
EE
::
GeoHelpers
describe
'#next_range!'
do
let
(
:batcher
)
{
described_class
.
new
(
destination_class
,
key:
key
,
batch_size:
batch_size
)
}
let
(
:source_foreign_key
)
{
batcher
.
send
(
:source_foreign_key
)
}
let
(
:key
)
{
'looping_batcher_spec'
}
let
(
:batch_size
)
{
2
}
subject
{
batcher
.
next_range!
}
context
'when there are no records'
do
it
{
is_expected
.
to
be_nil
}
end
context
'when there are no records but there are orphaned destination_records'
do
let!
(
:destination_records
)
{
create_list
(
destination_class_factory
,
3
)
}
context
'when it has never been called before'
do
it
{
is_expected
.
to
be_a
Range
}
it
'starts from the beginning'
do
expect
(
subject
.
first
).
to
eq
(
1
)
end
it
'ends at a full batch'
do
expect
(
subject
.
last
).
to
eq
(
destination_records
.
second
.
public_send
(
source_foreign_key
))
end
context
'when the batch size is greater than the number of destination_records'
do
let
(
:batch_size
)
{
5
}
it
'ends at the last ID'
do
expect
(
subject
.
last
).
to
eq
(
destination_records
.
last
.
public_send
(
source_foreign_key
))
end
end
end
context
'when it was called before'
do
context
'when the previous batch included the end of the table'
do
before
do
described_class
.
new
(
destination_class
,
key:
key
,
batch_size:
destination_class
.
count
).
next_range!
end
it
{
is_expected
.
to
be_nil
}
end
context
'when the previous batch did not include the end of the table'
do
before
do
described_class
.
new
(
destination_class
,
key:
key
,
batch_size:
destination_class
.
count
-
1
).
next_range!
end
it
'starts after the previous batch'
do
expect
(
subject
).
to
eq
(
destination_records
.
last
.
public_send
(
source_foreign_key
)
..
destination_records
.
last
.
public_send
(
source_foreign_key
))
end
end
context
'if cache is cleared'
do
before
do
described_class
.
new
(
destination_class
,
key:
key
,
batch_size:
batch_size
).
next_range!
end
it
'starts from the beginning'
do
Rails
.
cache
.
clear
expect
(
subject
).
to
eq
(
1
..
destination_records
.
second
.
public_send
(
source_foreign_key
))
end
end
end
end
context
'when there are records'
do
let!
(
:records
)
{
create_list
(
source_class
.
underscore
,
3
)
}
context
'when it has never been called before'
do
it
{
is_expected
.
to
be_a
Range
}
it
'starts from the beginning'
do
expect
(
subject
.
first
).
to
eq
(
1
)
end
it
'ends at a full batch'
do
expect
(
subject
.
last
).
to
eq
(
records
.
second
.
id
)
end
context
'when the batch size is greater than the number of records'
do
let
(
:batch_size
)
{
5
}
it
'ends at the last ID'
do
expect
(
subject
.
last
).
to
eq
(
records
.
last
.
id
)
end
end
end
context
'when it was called before'
do
context
'when the previous batch included the end of the table'
do
before
do
described_class
.
new
(
destination_class
,
key:
key
,
batch_size:
source_class
.
count
).
next_range!
end
it
'starts from the beginning'
do
expect
(
subject
).
to
eq
(
1
..
records
.
second
.
id
)
end
end
context
'when the previous batch did not include the end of the table'
do
before
do
described_class
.
new
(
destination_class
,
key:
key
,
batch_size:
source_class
.
count
-
1
).
next_range!
end
it
'starts after the previous batch'
do
expect
(
subject
).
to
eq
(
records
.
last
.
id
..
records
.
last
.
id
)
end
end
context
'if cache is cleared'
do
before
do
described_class
.
new
(
destination_class
,
key:
key
,
batch_size:
batch_size
).
next_range!
end
it
'starts from the beginning'
do
Rails
.
cache
.
clear
expect
(
subject
).
to
eq
(
1
..
records
.
second
.
id
)
end
end
end
end
context
'when there are records and orphaned destination_records with foreign key greater than last record id'
do
let!
(
:records
)
{
create_list
(
source_class
.
underscore
,
3
)
}
let
(
:orphaned_destination_foreign_key_id
)
{
records
.
last
.
id
}
let!
(
:destination
)
{
create
(
destination_class_factory
,
source_foreign_key
=>
orphaned_destination_foreign_key_id
)
}
before
do
source_class
.
where
(
id:
orphaned_destination_foreign_key_id
).
delete_all
end
context
'when it has never been called before'
do
it
{
is_expected
.
to
be_a
Range
}
it
'starts from the beginning'
do
expect
(
subject
.
first
).
to
eq
(
1
)
end
it
'ends at the last destination foreign key ID'
do
expect
(
subject
.
last
).
to
eq
(
orphaned_destination_foreign_key_id
)
end
end
context
'when it was called before'
do
before
do
described_class
.
new
(
destination_class
,
key:
key
,
batch_size:
batch_size
).
next_range!
end
it
{
is_expected
.
to
be_nil
}
context
'if cache is cleared'
do
it
'starts from the beginning'
do
Rails
.
cache
.
clear
expect
(
subject
).
to
eq
(
1
..
orphaned_destination_foreign_key_id
)
end
end
end
end
end
end
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment