Commit c5de19b9 authored by Nick Thomas

Implement selective synchronization by repository shard for Geo

parent 65db1a81
---
title: Implement selective synchronization by repository shard for Geo
merge_request: 4286
author:
type: added
...@@ -1014,6 +1014,8 @@ ActiveRecord::Schema.define(version: 20180131104538) do ...@@ -1014,6 +1014,8 @@ ActiveRecord::Schema.define(version: 20180131104538) do
t.integer "files_max_capacity", default: 10, null: false t.integer "files_max_capacity", default: 10, null: false
t.integer "repos_max_capacity", default: 25, null: false t.integer "repos_max_capacity", default: 25, null: false
t.string "url", null: false t.string "url", null: false
t.string "selective_sync_type"
t.text "selective_sync_shards"
end end
add_index "geo_nodes", ["access_key"], name: "index_geo_nodes_on_access_key", using: :btree add_index "geo_nodes", ["access_key"], name: "index_geo_nodes_on_access_key", using: :btree
......
...@@ -68,7 +68,16 @@ module Geo ...@@ -68,7 +68,16 @@ module Geo
private private
def group_uploads def group_uploads
namespace_ids = Gitlab::GroupHierarchy.new(current_node.namespaces).base_and_descendants.select(:id) namespace_ids =
if current_node.selective_sync_by_namespaces?
Gitlab::GroupHierarchy.new(current_node.namespaces).base_and_descendants.select(:id)
elsif current_node.selective_sync_by_shards?
leaf_groups = Namespace.where(id: current_node.projects.select(:namespace_id))
Gitlab::GroupHierarchy.new(leaf_groups).base_and_ancestors.select(:id)
else
Namespace.none
end
arel_namespace_ids = Arel::Nodes::SqlLiteral.new(namespace_ids.to_sql) arel_namespace_ids = Arel::Nodes::SqlLiteral.new(namespace_ids.to_sql)
upload_table[:model_type].eq('Namespace').and(upload_table[:model_id].in(arel_namespace_ids)) upload_table[:model_type].eq('Namespace').and(upload_table[:model_id].in(arel_namespace_ids))
......
class GeoNode < ActiveRecord::Base class GeoNode < ActiveRecord::Base
include Presentable include Presentable
SELECTIVE_SYNC_TYPES = %w[namespaces shards].freeze
# Array of repository storages to synchronize for selective sync by shards
serialize :selective_sync_shards, Array # rubocop:disable Cop/ActiveRecordSerialize
belongs_to :oauth_application, class_name: 'Doorkeeper::Application', dependent: :destroy # rubocop: disable Cop/ActiveRecordDependent belongs_to :oauth_application, class_name: 'Doorkeeper::Application', dependent: :destroy # rubocop: disable Cop/ActiveRecordDependent
has_many :geo_node_namespace_links has_many :geo_node_namespace_links
...@@ -18,6 +23,12 @@ class GeoNode < ActiveRecord::Base ...@@ -18,6 +23,12 @@ class GeoNode < ActiveRecord::Base
validates :access_key, presence: true validates :access_key, presence: true
validates :encrypted_secret_access_key, presence: true validates :encrypted_secret_access_key, presence: true
validates :selective_sync_type, inclusion: {
in: SELECTIVE_SYNC_TYPES,
allow_blank: true,
allow_nil: true
}
validate :check_not_adding_primary_as_secondary, if: :secondary? validate :check_not_adding_primary_as_secondary, if: :secondary?
after_save :expire_cache! after_save :expire_cache!
...@@ -119,13 +130,26 @@ class GeoNode < ActiveRecord::Base ...@@ -119,13 +130,26 @@ class GeoNode < ActiveRecord::Base
end end
def projects def projects
if selective_sync? return Project.all unless selective_sync?
Project.where(namespace_id: Gitlab::GroupHierarchy.new(namespaces).base_and_descendants.select(:id))
if selective_sync_by_namespaces?
query = Gitlab::GroupHierarchy.new(namespaces).base_and_descendants
Project.where(namespace_id: query.select(:id))
elsif selective_sync_by_shards?
Project.where(repository_storage: selective_sync_shards)
else else
Project.all Project.none
end end
end end
# Tells whether this node is configured for selective sync by namespaces,
# i.e. its selective_sync_type is exactly the string 'namespaces'.
# Returns false for any other value, including nil.
def selective_sync_by_namespaces?
  selective_sync_type.eql?('namespaces')
end
# Tells whether this node is configured for selective sync by repository
# shards, i.e. its selective_sync_type is exactly the string 'shards'.
# Returns false for any other value, including nil.
def selective_sync_by_shards?
  selective_sync_type.eql?('shards')
end
def projects_include?(project_id) def projects_include?(project_id)
return true unless selective_sync? return true unless selective_sync?
...@@ -133,7 +157,7 @@ class GeoNode < ActiveRecord::Base ...@@ -133,7 +157,7 @@ class GeoNode < ActiveRecord::Base
end end
def selective_sync? def selective_sync?
namespaces.exists? selective_sync_type.present?
end end
def replication_slots_count def replication_slots_count
......
class GeoNodeStatus < ActiveRecord::Base class GeoNodeStatus < ActiveRecord::Base
belongs_to :geo_node belongs_to :geo_node
delegate :selective_sync_type, to: :geo_node
# Whether we were successful in reaching this node # Whether we were successful in reaching this node
attr_accessor :success, :version, :revision attr_accessor :success, :version, :revision
attr_writer :health_status attr_writer :health_status
......
module Geo module Geo
class NodeUpdateService class NodeUpdateService
attr_reader :geo_node, :old_namespace_ids, :params attr_reader :geo_node, :old_namespace_ids, :old_shards, :params
def initialize(geo_node, params) def initialize(geo_node, params)
@geo_node = geo_node @geo_node = geo_node
@old_namespace_ids = geo_node.namespace_ids @old_namespace_ids = geo_node.namespace_ids
@old_shards = geo_node.selective_sync_shards
@params = params.dup @params = params.dup
@params[:namespace_ids] = @params[:namespace_ids].to_s.split(',') @params[:namespace_ids] = @params[:namespace_ids].to_s.split(',')
end end
def execute def execute
return false unless geo_node.update(params) return false unless geo_node.update(params)
if geo_node.secondary? && namespaces_changed?(geo_node) Geo::RepositoriesChangedEventStore.new(geo_node).create if selective_sync_changed?
Geo::RepositoriesChangedEventStore.new(geo_node).create
end
true true
end end
private private
def namespaces_changed?(geo_node) def selective_sync_changed?
return false unless geo_node.secondary?
geo_node.selective_sync_type_changed? ||
selective_sync_by_namespaces_changed? ||
selective_sync_by_shards_changed?
end
def selective_sync_by_namespaces_changed?
return false unless geo_node.selective_sync_by_namespaces?
geo_node.namespace_ids.any? && geo_node.namespace_ids != old_namespace_ids geo_node.namespace_ids.any? && geo_node.namespace_ids != old_namespace_ids
end end
# Reports whether the node's shard restrictions differ from the ones it had
# when the service was built (old_shards).
#
# Only relevant when the node syncs by shards; an empty shard list is never
# treated as a change.
def selective_sync_by_shards_changed?
  return false unless geo_node.selective_sync_by_shards?

  current_shards = geo_node.selective_sync_shards
  current_shards.any? && current_shards != old_shards
end
end end
end end
# Adds the columns that record how a Geo node performs selective sync
# (selective_sync_type) and which repository shards it is limited to
# (selective_sync_shards), then marks existing namespace-restricted nodes.
class GeoSelectiveSyncByShard < ActiveRecord::Migration
  include Gitlab::Database::MigrationHelpers

  DOWNTIME = false

  def up
    add_column :geo_nodes, :selective_sync_type, :string
    add_column :geo_nodes, :selective_sync_shards, :text

    backfill_namespaces_sync_type
  end

  def down
    remove_column :geo_nodes, :selective_sync_shards
    remove_column :geo_nodes, :selective_sync_type
  end

  private

  # Nodes with associated namespaces should be set to 'namespaces', so any
  # node that already has rows in geo_node_namespace_links gets that type.
  def backfill_namespaces_sync_type
    connection.execute(<<~SQL)
      UPDATE geo_nodes
      SET selective_sync_type = 'namespaces'
      WHERE id IN(
      SELECT DISTINCT geo_node_id
      FROM geo_node_namespace_links
      )
    SQL
  end
end
...@@ -1226,6 +1226,9 @@ module API ...@@ -1226,6 +1226,9 @@ module API
expose :version expose :version
expose :revision expose :revision
expose :selective_sync_type
# Deprecated: remove in API v5. We use selective_sync_type instead now.
expose :namespaces, using: NamespaceBasic expose :namespaces, using: NamespaceBasic
expose :updated_at expose :updated_at
......
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
"last_event_timestamp", "last_event_timestamp",
"cursor_last_event_id", "cursor_last_event_id",
"cursor_last_event_timestamp", "cursor_last_event_timestamp",
"selective_sync_type",
"namespaces", "namespaces",
"version", "version",
"revision", "revision",
...@@ -72,6 +73,7 @@ ...@@ -72,6 +73,7 @@
"cursor_last_event_id": { "type": ["integer", "null"] }, "cursor_last_event_id": { "type": ["integer", "null"] },
"cursor_last_event_timestamp": { "type": ["integer", "null"] }, "cursor_last_event_timestamp": { "type": ["integer", "null"] },
"last_successful_status_check_timestamp": { "type": ["integer", "null"] }, "last_successful_status_check_timestamp": { "type": ["integer", "null"] },
"selective_sync_type": { "type": ["string", "null"] },
"namespaces": { "type": "array" }, "namespaces": { "type": "array" },
"storage_shards": { "type": "array" }, "storage_shards": { "type": "array" },
"storage_shards_match": { "type": "boolean" }, "storage_shards_match": { "type": "boolean" },
......
...@@ -9,7 +9,7 @@ describe Geo::AttachmentRegistryFinder, :geo do ...@@ -9,7 +9,7 @@ describe Geo::AttachmentRegistryFinder, :geo do
let(:synced_subgroup) { create(:group, parent: synced_group) } let(:synced_subgroup) { create(:group, parent: synced_group) }
let(:unsynced_group) { create(:group) } let(:unsynced_group) { create(:group) }
let(:synced_project) { create(:project, group: synced_group) } let(:synced_project) { create(:project, group: synced_group) }
let(:unsynced_project) { create(:project, group: unsynced_group) } let(:unsynced_project) { create(:project, group: unsynced_group, repository_storage: 'broken') }
let!(:upload_1) { create(:upload, model: synced_group) } let!(:upload_1) { create(:upload, model: synced_group) }
let!(:upload_2) { create(:upload, model: unsynced_group) } let!(:upload_2) { create(:upload, model: unsynced_group) }
...@@ -54,21 +54,12 @@ describe Geo::AttachmentRegistryFinder, :geo do ...@@ -54,21 +54,12 @@ describe Geo::AttachmentRegistryFinder, :geo do
end end
context 'with selective sync' do context 'with selective sync' do
it 'returns synced avatars, attachment, personal snippets and files' do it 'falls back to legacy queries' do
create(:geo_file_registry, :avatar, file_id: upload_1.id) secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
create(:geo_file_registry, :avatar, file_id: upload_2.id)
create(:geo_file_registry, :avatar, file_id: upload_3.id)
create(:geo_file_registry, :avatar, file_id: upload_4.id)
create(:geo_file_registry, :avatar, file_id: upload_5.id, success: false)
create(:geo_file_registry, :avatar, file_id: upload_6.id)
create(:geo_file_registry, :avatar, file_id: upload_7.id)
create(:geo_file_registry, :lfs, file_id: lfs_object.id)
secondary.update_attribute(:namespaces, [synced_group]) expect(subject).to receive(:legacy_find_synced_attachments)
synced_attachments = subject.find_synced_attachments
expect(synced_attachments.pluck(:id)).to match_array([upload_1.id, upload_3.id, upload_6.id, upload_7.id]) subject.find_synced_attachments
end end
end end
end end
...@@ -94,21 +85,12 @@ describe Geo::AttachmentRegistryFinder, :geo do ...@@ -94,21 +85,12 @@ describe Geo::AttachmentRegistryFinder, :geo do
end end
context 'with selective sync' do context 'with selective sync' do
it 'returns failed avatars, attachment, personal snippets and files' do it 'falls back to legacy queries' do
create(:geo_file_registry, :avatar, file_id: upload_1.id, success: false) secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
create(:geo_file_registry, :avatar, file_id: upload_2.id)
create(:geo_file_registry, :avatar, file_id: upload_3.id, success: false)
create(:geo_file_registry, :avatar, file_id: upload_4.id)
create(:geo_file_registry, :avatar, file_id: upload_5.id)
create(:geo_file_registry, :avatar, file_id: upload_6.id, success: false)
create(:geo_file_registry, :avatar, file_id: upload_7.id, success: false)
create(:geo_file_registry, :lfs, file_id: lfs_object.id, success: false)
secondary.update_attribute(:namespaces, [synced_group])
failed_attachments = subject.find_failed_attachments expect(subject).to receive(:legacy_find_failed_attachments)
expect(failed_attachments.pluck(:id)).to match_array([upload_1.id, upload_3.id, upload_6.id, upload_7.id]) subject.find_failed_attachments
end end
end end
end end
...@@ -163,7 +145,7 @@ describe Geo::AttachmentRegistryFinder, :geo do ...@@ -163,7 +145,7 @@ describe Geo::AttachmentRegistryFinder, :geo do
expect(synced_attachments).to match_array([upload_1, upload_2, upload_6, upload_7]) expect(synced_attachments).to match_array([upload_1, upload_2, upload_6, upload_7])
end end
context 'with selective sync' do context 'with selective sync by namespace' do
it 'returns synced avatars, attachment, personal snippets and files' do it 'returns synced avatars, attachment, personal snippets and files' do
create(:geo_file_registry, :avatar, file_id: upload_1.id) create(:geo_file_registry, :avatar, file_id: upload_1.id)
create(:geo_file_registry, :avatar, file_id: upload_2.id) create(:geo_file_registry, :avatar, file_id: upload_2.id)
...@@ -174,13 +156,32 @@ describe Geo::AttachmentRegistryFinder, :geo do ...@@ -174,13 +156,32 @@ describe Geo::AttachmentRegistryFinder, :geo do
create(:geo_file_registry, :avatar, file_id: upload_7.id) create(:geo_file_registry, :avatar, file_id: upload_7.id)
create(:geo_file_registry, :lfs, file_id: lfs_object.id) create(:geo_file_registry, :lfs, file_id: lfs_object.id)
secondary.update_attribute(:namespaces, [synced_group]) secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
synced_attachments = subject.find_synced_attachments synced_attachments = subject.find_synced_attachments
expect(synced_attachments).to match_array([upload_1, upload_3, upload_6, upload_7]) expect(synced_attachments).to match_array([upload_1, upload_3, upload_6, upload_7])
end end
end end
context 'with selective sync by shard' do
it 'returns synced avatars, attachment, personal snippets and files' do
create(:geo_file_registry, :avatar, file_id: upload_1.id)
create(:geo_file_registry, :avatar, file_id: upload_2.id)
create(:geo_file_registry, :avatar, file_id: upload_3.id)
create(:geo_file_registry, :avatar, file_id: upload_4.id)
create(:geo_file_registry, :avatar, file_id: upload_5.id, success: false)
create(:geo_file_registry, :avatar, file_id: upload_6.id)
create(:geo_file_registry, :avatar, file_id: upload_7.id)
create(:geo_file_registry, :lfs, file_id: lfs_object.id)
secondary.update!(selective_sync_type: 'shards', selective_sync_shards: ['default'])
synced_attachments = subject.find_synced_attachments
expect(synced_attachments).to match_array([upload_1, upload_3, upload_6])
end
end
end end
describe '#find_failed_attachments' do describe '#find_failed_attachments' do
...@@ -203,24 +204,43 @@ describe Geo::AttachmentRegistryFinder, :geo do ...@@ -203,24 +204,43 @@ describe Geo::AttachmentRegistryFinder, :geo do
expect(failed_attachments).to match_array([upload_3, upload_6, upload_7]) expect(failed_attachments).to match_array([upload_3, upload_6, upload_7])
end end
context 'with selective sync' do context 'with selective sync by namespace' do
it 'returns failed avatars, attachment, personal snippets and files' do it 'returns failed avatars, attachment, personal snippets and files' do
create(:geo_file_registry, :avatar, file_id: upload_1.id, success: false) create(:geo_file_registry, :avatar, file_id: upload_1.id, success: false)
create(:geo_file_registry, :avatar, file_id: upload_2.id) create(:geo_file_registry, :avatar, file_id: upload_2.id)
create(:geo_file_registry, :avatar, file_id: upload_3.id, success: false) create(:geo_file_registry, :avatar, file_id: upload_3.id, success: false)
create(:geo_file_registry, :avatar, file_id: upload_4.id) create(:geo_file_registry, :avatar, file_id: upload_4.id, success: false)
create(:geo_file_registry, :avatar, file_id: upload_5.id) create(:geo_file_registry, :avatar, file_id: upload_5.id)
create(:geo_file_registry, :avatar, file_id: upload_6.id, success: false) create(:geo_file_registry, :avatar, file_id: upload_6.id, success: false)
create(:geo_file_registry, :avatar, file_id: upload_7.id, success: false) create(:geo_file_registry, :avatar, file_id: upload_7.id, success: false)
create(:geo_file_registry, :lfs, file_id: lfs_object.id, success: false) create(:geo_file_registry, :lfs, file_id: lfs_object.id, success: false)
secondary.update_attribute(:namespaces, [synced_group]) secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
failed_attachments = subject.find_failed_attachments failed_attachments = subject.find_failed_attachments
expect(failed_attachments).to match_array([upload_1, upload_3, upload_6, upload_7]) expect(failed_attachments).to match_array([upload_1, upload_3, upload_6, upload_7])
end end
end end
context 'with selective sync by shard' do
it 'returns failed avatars, attachment, personal snippets and files' do
create(:geo_file_registry, :avatar, file_id: upload_1.id, success: false)
create(:geo_file_registry, :avatar, file_id: upload_2.id)
create(:geo_file_registry, :avatar, file_id: upload_3.id, success: false)
create(:geo_file_registry, :avatar, file_id: upload_4.id, success: false)
create(:geo_file_registry, :avatar, file_id: upload_5.id)
create(:geo_file_registry, :avatar, file_id: upload_6.id, success: false)
create(:geo_file_registry, :avatar, file_id: upload_7.id, success: false)
create(:geo_file_registry, :lfs, file_id: lfs_object.id, success: false)
secondary.update!(selective_sync_type: 'shards', selective_sync_shards: ['default'])
failed_attachments = subject.find_failed_attachments
expect(failed_attachments).to match_array([upload_1, upload_3, upload_6])
end
end
end end
describe '#find_unsynced_attachments' do describe '#find_unsynced_attachments' do
......
...@@ -36,7 +36,7 @@ describe Geo::JobArtifactRegistryFinder, :geo do ...@@ -36,7 +36,7 @@ describe Geo::JobArtifactRegistryFinder, :geo do
context 'with selective sync' do context 'with selective sync' do
before do before do
secondary.update_attribute(:namespaces, [synced_group]) secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
end end
it 'delegates to #legacy_find_synced_job_artifacts' do it 'delegates to #legacy_find_synced_job_artifacts' do
...@@ -72,7 +72,7 @@ describe Geo::JobArtifactRegistryFinder, :geo do ...@@ -72,7 +72,7 @@ describe Geo::JobArtifactRegistryFinder, :geo do
context 'with selective sync' do context 'with selective sync' do
before do before do
secondary.update_attribute(:namespaces, [synced_group]) secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
end end
it 'delegates to #legacy_find_failed_job_artifacts' do it 'delegates to #legacy_find_failed_job_artifacts' do
......
...@@ -36,7 +36,7 @@ describe Geo::LfsObjectRegistryFinder, :geo do ...@@ -36,7 +36,7 @@ describe Geo::LfsObjectRegistryFinder, :geo do
context 'with selective sync' do context 'with selective sync' do
before do before do
secondary.update_attribute(:namespaces, [synced_group]) secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
end end
it 'delegates to #legacy_find_synced_lfs_objects' do it 'delegates to #legacy_find_synced_lfs_objects' do
...@@ -78,7 +78,7 @@ describe Geo::LfsObjectRegistryFinder, :geo do ...@@ -78,7 +78,7 @@ describe Geo::LfsObjectRegistryFinder, :geo do
context 'with selective sync' do context 'with selective sync' do
before do before do
secondary.update_attribute(:namespaces, [synced_group]) secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
end end
it 'delegates to #legacy_find_failed_lfs_objects' do it 'delegates to #legacy_find_failed_lfs_objects' do
......
...@@ -34,7 +34,7 @@ describe Geo::ProjectRegistryFinder, :geo do ...@@ -34,7 +34,7 @@ describe Geo::ProjectRegistryFinder, :geo do
context 'with selective sync' do context 'with selective sync' do
before do before do
secondary.update_attribute(:namespaces, [synced_group]) secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
end end
it 'delegates to #legacy_find_synced_repositories' do it 'delegates to #legacy_find_synced_repositories' do
...@@ -85,7 +85,7 @@ describe Geo::ProjectRegistryFinder, :geo do ...@@ -85,7 +85,7 @@ describe Geo::ProjectRegistryFinder, :geo do
context 'with selective sync' do context 'with selective sync' do
before do before do
secondary.update_attribute(:namespaces, [synced_group]) secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
end end
it 'delegates to #legacy_find_synced_wiki' do it 'delegates to #legacy_find_synced_wiki' do
...@@ -125,7 +125,7 @@ describe Geo::ProjectRegistryFinder, :geo do ...@@ -125,7 +125,7 @@ describe Geo::ProjectRegistryFinder, :geo do
context 'with selective sync' do context 'with selective sync' do
before do before do
secondary.update_attribute(:namespaces, [synced_group]) secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
end end
it 'delegates to #find_failed_repositories' do it 'delegates to #find_failed_repositories' do
...@@ -165,7 +165,7 @@ describe Geo::ProjectRegistryFinder, :geo do ...@@ -165,7 +165,7 @@ describe Geo::ProjectRegistryFinder, :geo do
context 'with selective sync' do context 'with selective sync' do
before do before do
secondary.update_attribute(:namespaces, [synced_group]) secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
end end
it 'delegates to #find_failed_wikis' do it 'delegates to #find_failed_wikis' do
...@@ -212,7 +212,7 @@ describe Geo::ProjectRegistryFinder, :geo do ...@@ -212,7 +212,7 @@ describe Geo::ProjectRegistryFinder, :geo do
context 'with selective sync' do context 'with selective sync' do
before do before do
secondary.update_attribute(:namespaces, [synced_group]) secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
end end
it 'delegates to #legacy_find_filtered_failed_projects' do it 'delegates to #legacy_find_filtered_failed_projects' do
...@@ -265,7 +265,7 @@ describe Geo::ProjectRegistryFinder, :geo do ...@@ -265,7 +265,7 @@ describe Geo::ProjectRegistryFinder, :geo do
end end
it 'delegates to #legacy_find_unsynced_projects when node has selective sync' do it 'delegates to #legacy_find_unsynced_projects when node has selective sync' do
secondary.update_attribute(:namespaces, [synced_group]) secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
expect(subject).to receive(:legacy_find_unsynced_projects).and_call_original expect(subject).to receive(:legacy_find_unsynced_projects).and_call_original
...@@ -290,7 +290,7 @@ describe Geo::ProjectRegistryFinder, :geo do ...@@ -290,7 +290,7 @@ describe Geo::ProjectRegistryFinder, :geo do
end end
it 'delegates to #legacy_find_projects_updated_recently when node has selective sync' do it 'delegates to #legacy_find_projects_updated_recently when node has selective sync' do
secondary.update_attribute(:namespaces, [synced_group]) secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
expect(subject).to receive(:legacy_find_projects_updated_recently).and_call_original expect(subject).to receive(:legacy_find_projects_updated_recently).and_call_original
......
...@@ -204,7 +204,13 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared ...@@ -204,7 +204,13 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
end end
it 'does not replay events for projects that do not belong to selected namespaces to replicate' do it 'does not replay events for projects that do not belong to selected namespaces to replicate' do
secondary.update!(namespaces: [group_2]) secondary.update!(selective_sync_type: 'namespaces', namespaces: [group_2])
expect { daemon.run_once! }.not_to change(Geo::ProjectRegistry, :count)
end
it 'does not replay events for projects that do not belong to selected shards to replicate' do
secondary.update!(selective_sync_type: 'shards', selective_sync_shards: ['broken'])
expect { daemon.run_once! }.not_to change(Geo::ProjectRegistry, :count) expect { daemon.run_once! }.not_to change(Geo::ProjectRegistry, :count)
end end
......
...@@ -21,6 +21,10 @@ describe GeoNode, type: :model do ...@@ -21,6 +21,10 @@ describe GeoNode, type: :model do
it { is_expected.to have_many(:namespaces).through(:geo_node_namespace_links) } it { is_expected.to have_many(:namespaces).through(:geo_node_namespace_links) }
end end
context 'validations' do
it { is_expected.to validate_inclusion_of(:selective_sync_type).in_array([nil, *GeoNode::SELECTIVE_SYNC_TYPES]) }
end
context 'default values' do context 'default values' do
where(:attribute, :value) do where(:attribute, :value) do
:url | Gitlab::Routing.url_helpers.root_url :url | Gitlab::Routing.url_helpers.root_url
...@@ -254,27 +258,43 @@ describe GeoNode, type: :model do ...@@ -254,27 +258,43 @@ describe GeoNode, type: :model do
end end
describe '#projects_include?' do describe '#projects_include?' do
let(:unsynced_project) { create(:project) } let(:unsynced_project) { create(:project, repository_storage: 'broken') }
it 'returns true without namespace restrictions' do it 'returns true without selective sync' do
expect(node.projects_include?(unsynced_project.id)).to eq true expect(node.projects_include?(unsynced_project.id)).to eq true
end end
context 'with namespace restrictions' do context 'selective sync by namespaces' do
let(:synced_group) { create(:group) } let(:synced_group) { create(:group) }
before do before do
node.update_attribute(:namespaces, [synced_group]) node.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
end end
it 'returns true when project belongs to one of the namespaces' do it 'returns true when project belongs to one of the namespaces' do
project_in_synced_group = create(:project, group: synced_group) project_in_synced_group = create(:project, group: synced_group)
expect(node.projects_include?(project_in_synced_group.id)).to eq true expect(node.projects_include?(project_in_synced_group.id)).to be_truthy
end
it 'returns false when project does not belong to one of the namespaces' do
expect(node.projects_include?(unsynced_project.id)).to be_falsy
end
end
context 'selective sync by shards' do
before do
node.update!(selective_sync_type: 'shards', selective_sync_shards: ['default'])
end
# The node in this context is restricted to the 'default' shard, so the example
# is about shard membership, not namespaces (the description was copy-pasted).
# NOTE(review): presumably the project factory places repositories on the
# 'default' storage when none is given — confirm against the factory.
it 'returns true when project belongs to one of the shards' do
  project_in_synced_shard = create(:project)

  expect(node.projects_include?(project_in_synced_shard.id)).to be_truthy
end
it 'returns false when project does not belong to one of the namespaces' do it 'returns false when project does not belong to one of the namespaces' do
expect(node.projects_include?(unsynced_project.id)).to eq false expect(node.projects_include?(unsynced_project.id)).to be_falsy
end end
end end
end end
...@@ -285,28 +305,54 @@ describe GeoNode, type: :model do ...@@ -285,28 +305,54 @@ describe GeoNode, type: :model do
let(:nested_group_1) { create(:group, parent: group_1) } let(:nested_group_1) { create(:group, parent: group_1) }
let!(:project_1) { create(:project, group: group_1) } let!(:project_1) { create(:project, group: group_1) }
let!(:project_2) { create(:project, group: nested_group_1) } let!(:project_2) { create(:project, group: nested_group_1) }
let!(:project_3) { create(:project, group: group_2) } let!(:project_3) { create(:project, group: group_2, repository_storage: 'broken') }
it 'returns all projects without selective sync' do it 'returns all projects without selective sync' do
expect(node.projects).to match_array([project_1, project_2, project_3]) expect(node.projects).to match_array([project_1, project_2, project_3])
end end
it 'returns projects that belong to the namespaces with selective sync' do it 'returns projects that belong to the namespaces with selective sync by namespace' do
node.update_attribute(:namespaces, [group_1, nested_group_1]) node.update!(selective_sync_type: 'namespaces', namespaces: [group_1, nested_group_1])
expect(node.projects).to match_array([project_1, project_2]) expect(node.projects).to match_array([project_1, project_2])
end end
it 'returns projects that belong to the shards with selective sync by shard' do
node.update!(selective_sync_type: 'shards', selective_sync_shards: ['default'])
expect(node.projects).to match_array([project_1, project_2])
end
it 'returns nothing if an unrecognised selective sync type is used' do
node.update_attribute(:selective_sync_type, 'unknown')
expect(node.projects).to be_empty
end
end end
describe '#selective_sync?' do describe '#selective_sync?' do
it 'returns true when Geo node has namespace restrictions' do subject { node.selective_sync? }
node.update_attribute(:namespaces, [create(:group)])
it 'returns true when selective sync is by namespaces' do
node.update!(selective_sync_type: 'namespaces')
is_expected.to be_truthy
end
it 'returns true when selective sync is by shards' do
node.update!(selective_sync_type: 'shards')
expect(node.selective_sync?).to be true is_expected.to be_truthy
end end
it 'returns false when Geo node does not have namespace restrictions' do it 'returns false when selective sync is disabled' do
expect(node.selective_sync?).to be false node.update!(
selective_sync_type: '',
namespaces: [create(:group)],
selective_sync_shards: ['default']
)
is_expected.to be_falsy
end end
end end
end end
...@@ -132,7 +132,7 @@ describe GeoNodeStatus, :geo do ...@@ -132,7 +132,7 @@ describe GeoNodeStatus, :geo do
end end
it 'returns the right percentage with group restrictions' do it 'returns the right percentage with group restrictions' do
secondary.update_attribute(:namespaces, [group]) secondary.update!(selective_sync_type: 'namespaces', namespaces: [group])
create(:geo_file_registry, :avatar, file_id: upload_1.id) create(:geo_file_registry, :avatar, file_id: upload_1.id)
create(:geo_file_registry, :avatar, file_id: upload_2.id) create(:geo_file_registry, :avatar, file_id: upload_2.id)
...@@ -191,7 +191,7 @@ describe GeoNodeStatus, :geo do ...@@ -191,7 +191,7 @@ describe GeoNodeStatus, :geo do
end end
it 'returns the right percentage with group restrictions' do it 'returns the right percentage with group restrictions' do
secondary.update_attribute(:namespaces, [group]) secondary.update!(selective_sync_type: 'namespaces', namespaces: [group])
create(:geo_file_registry, :lfs, file_id: lfs_object_project.lfs_object_id, success: true) create(:geo_file_registry, :lfs, file_id: lfs_object_project.lfs_object_id, success: true)
expect(subject.lfs_objects_synced_in_percentage).to be_within(0.0001).of(50) expect(subject.lfs_objects_synced_in_percentage).to be_within(0.0001).of(50)
...@@ -266,7 +266,7 @@ describe GeoNodeStatus, :geo do ...@@ -266,7 +266,7 @@ describe GeoNodeStatus, :geo do
end end
it 'returns the right number of failed repos with group restrictions' do it 'returns the right number of failed repos with group restrictions' do
secondary.update_attribute(:namespaces, [group]) secondary.update!(selective_sync_type: 'namespaces', namespaces: [group])
expect(subject.repositories_failed_count).to eq(1) expect(subject.repositories_failed_count).to eq(1)
end end
...@@ -285,7 +285,7 @@ describe GeoNodeStatus, :geo do ...@@ -285,7 +285,7 @@ describe GeoNodeStatus, :geo do
end end
it 'returns the right number of failed repos with group restrictions' do it 'returns the right number of failed repos with group restrictions' do
secondary.update_attribute(:namespaces, [group]) secondary.update!(selective_sync_type: 'namespaces', namespaces: [group])
expect(subject.wikis_failed_count).to eq(1) expect(subject.wikis_failed_count).to eq(1)
end end
...@@ -309,7 +309,7 @@ describe GeoNodeStatus, :geo do ...@@ -309,7 +309,7 @@ describe GeoNodeStatus, :geo do
end end
it 'returns the right percentage with group restrictions' do it 'returns the right percentage with group restrictions' do
secondary.update_attribute(:namespaces, [group]) secondary.update!(selective_sync_type: 'namespaces', namespaces: [group])
create(:geo_project_registry, :synced, project: project_1) create(:geo_project_registry, :synced, project: project_1)
expect(subject.repositories_synced_in_percentage).to be_within(0.0001).of(50) expect(subject.repositories_synced_in_percentage).to be_within(0.0001).of(50)
...@@ -336,7 +336,7 @@ describe GeoNodeStatus, :geo do ...@@ -336,7 +336,7 @@ describe GeoNodeStatus, :geo do
end end
it 'returns the right percentage with group restrictions' do it 'returns the right percentage with group restrictions' do
secondary.update_attribute(:namespaces, [group]) secondary.update!(selective_sync_type: 'namespaces', namespaces: [group])
create(:geo_project_registry, :synced, project: project_1) create(:geo_project_registry, :synced, project: project_1)
expect(subject.wikis_synced_in_percentage).to be_within(0.0001).of(50) expect(subject.wikis_synced_in_percentage).to be_within(0.0001).of(50)
......
require 'spec_helper' require 'spec_helper'
describe Geo::NodeUpdateService do describe Geo::NodeUpdateService do
let(:groups) { create_list(:group, 2) } set(:primary) { create(:geo_node, :primary) }
let!(:primary) { create(:geo_node, :primary) }
let(:geo_node) { create(:geo_node) } let(:geo_node) { create(:geo_node) }
let(:geo_node_with_restrictions) { create(:geo_node, namespace_ids: [groups.first.id]) } let(:groups) { create_list(:group, 2) }
let(:namespace_ids) { groups.map(&:id).join(',') }
describe '#execute' do describe '#execute' do
it 'updates the node' do it 'updates the node' do
...@@ -31,22 +32,58 @@ describe Geo::NodeUpdateService do ...@@ -31,22 +32,58 @@ describe Geo::NodeUpdateService do
expect(service.execute).to eq false expect(service.execute).to eq false
end end
it 'logs an event to the Geo event log when namespaces change' do context 'selective sync disabled' do
service = described_class.new(geo_node, namespace_ids: groups.map(&:id).join(',')) it 'does not log an event to the Geo event log when adding restrictions' do
service = described_class.new(geo_node, namespace_ids: namespace_ids, selective_sync_shards: ['default'])
expect { service.execute }.to change(Geo::RepositoriesChangedEvent, :count).by(1) expect { service.execute }.not_to change(Geo::RepositoriesChangedEvent, :count)
end
end end
it 'does not log an event to the Geo event log when removing namespace restrictions' do context 'selective sync by namespaces' do
service = described_class.new(geo_node_with_restrictions, namespace_ids: '') let(:restricted_geo_node) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [create(:group)]) }
it 'logs an event to the Geo event log when adding namespace restrictions' do
service = described_class.new(restricted_geo_node, namespace_ids: namespace_ids)
expect { service.execute }.to change(Geo::RepositoriesChangedEvent, :count).by(1)
end
it 'does not log an event to the Geo event log when removing namespace restrictions' do
service = described_class.new(restricted_geo_node, namespace_ids: '')
expect { service.execute }.not_to change(Geo::RepositoriesChangedEvent, :count)
end
expect { service.execute }.not_to change(Geo::RepositoriesChangedEvent, :count) it 'does not log an event to the Geo event log when node is a primary node' do
primary.update!(selective_sync_type: 'namespaces')
service = described_class.new(primary, namespace_ids: namespace_ids)
expect { service.execute }.not_to change(Geo::RepositoriesChangedEvent, :count)
end
end end
it 'does not log an event to the Geo event log when node is a primary node' do context 'selective sync by shards' do
service = described_class.new(primary, namespace_ids: groups.map(&:id).join(',')) let(:restricted_geo_node) { create(:geo_node, selective_sync_type: 'shards', selective_sync_shards: ['default']) }
it 'logs an event to the Geo event log when adding shard restrictions' do
service = described_class.new(restricted_geo_node, selective_sync_shards: %w[default broken])
expect { service.execute }.to change(Geo::RepositoriesChangedEvent, :count).by(1)
end
it 'does not log an event to the Geo event log when removing shard restrictions' do
service = described_class.new(restricted_geo_node, selective_sync_shards: [])
expect { service.execute }.not_to change(Geo::RepositoriesChangedEvent, :count)
end
it 'does not log an event to the Geo event log when node is a primary node' do
primary.update!(selective_sync_type: 'shards')
service = described_class.new(primary, selective_sync_shards: %w[default broken])
expect { service.execute }.not_to change(Geo::RepositoriesChangedEvent, :count) expect { service.execute }.not_to change(Geo::RepositoriesChangedEvent, :count)
end
end end
end end
end end
...@@ -181,7 +181,7 @@ describe Geo::FileDownloadDispatchWorker, :geo do ...@@ -181,7 +181,7 @@ describe Geo::FileDownloadDispatchWorker, :geo do
before do before do
allow(ProjectCacheWorker).to receive(:perform_async).and_return(true) allow(ProjectCacheWorker).to receive(:perform_async).and_return(true)
secondary.update_attribute(:namespaces, [synced_group]) secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
end end
it 'does not perform Geo::FileDownloadWorker for LFS object that does not belong to selected namespaces to replicate' do it 'does not perform Geo::FileDownloadWorker for LFS object that does not belong to selected namespaces to replicate' do
......
...@@ -10,7 +10,7 @@ describe Geo::RepositoriesCleanUpWorker do ...@@ -10,7 +10,7 @@ describe Geo::RepositoriesCleanUpWorker do
context 'when node has namespace restrictions' do context 'when node has namespace restrictions' do
let(:synced_group) { create(:group) } let(:synced_group) { create(:group) }
let(:geo_node) { create(:geo_node, namespaces: [synced_group]) } let(:geo_node) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) }
context 'legacy storage' do context 'legacy storage' do
it 'performs GeoRepositoryDestroyWorker for each project that does not belong to selected namespaces to replicate' do it 'performs GeoRepositoryDestroyWorker for each project that does not belong to selected namespaces to replicate' do
......
...@@ -104,7 +104,7 @@ describe Geo::RepositoryShardSyncWorker, :geo, :delete, :clean_gitlab_redis_cach ...@@ -104,7 +104,7 @@ describe Geo::RepositoryShardSyncWorker, :geo, :delete, :clean_gitlab_redis_cach
context 'when node has namespace restrictions' do context 'when node has namespace restrictions' do
before do before do
secondary.update_attribute(:namespaces, [synced_group]) secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
end end
it 'does not perform Geo::ProjectSyncWorker for projects that do not belong to selected namespaces to replicate' do it 'does not perform Geo::ProjectSyncWorker for projects that do not belong to selected namespaces to replicate' do
......
...@@ -66,6 +66,7 @@ export const rawMockNodeDetails = { ...@@ -66,6 +66,7 @@ export const rawMockNodeDetails = {
last_successful_status_check_timestamp: 1515142330, last_successful_status_check_timestamp: 1515142330,
version: '10.4.0-pre', version: '10.4.0-pre',
revision: 'b93c51849b', revision: 'b93c51849b',
selective_sync_type: 'namespaces',
namespaces: [ namespaces: [
{ {
id: 54, id: 54,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment