Commit a99ea00e authored by Gabriel Mazetto's avatar Gabriel Mazetto Committed by Nick Thomas

Geo: Expire and resync attachments from renamed projects in secondary nodes...

Geo: Expire and resync attachments from renamed projects in secondary nodes when using legacy storage
parent 0ddc1f7c
module Geo
class ExpireUploadsFinder
def find_project_uploads(project)
if Gitlab::Geo.fdw?
fdw_find_project_uploads(project)
else
legacy_find_project_uploads(project)
end
end
def find_file_registries_uploads(project)
if Gitlab::Geo.fdw?
fdw_find_file_registries_uploads(project)
else
legacy_find_file_registries_uploads(project)
end
end
#
# FDW accessors
#
# @return [ActiveRecord::Relation<Geo::Fdw::Upload>]
def fdw_find_project_uploads(project)
fdw_table = Geo::Fdw::Upload.table_name
upload_type = 'file'
Geo::Fdw::Upload.joins("JOIN file_registry
ON file_registry.file_id = #{fdw_table}.id
AND #{fdw_table}.model_id='#{project.id}'
AND #{fdw_table}.model_type='#{project.class.name}'
AND file_registry.file_type='#{upload_type}'")
end
# @return [ActiveRecord::Relation<Geo::FileRegistry>]
def fdw_find_file_registries_uploads(project)
fdw_table = Geo::Fdw::Upload.table_name
upload_type = 'file'
Geo::FileRegistry.joins("JOIN #{fdw_table}
ON file_registry.file_id = #{fdw_table}.id
AND #{fdw_table}.model_id='#{project.id}'
AND #{fdw_table}.model_type='#{project.class.name}'
AND file_registry.file_type='#{upload_type}'")
end
#
# Legacy accessors (non FDW)
#
# @return [ActiveRecord::Relation<Geo::FileRegistry>] list of file registry items
def legacy_find_file_registries_uploads(project)
upload_ids = Upload.where(model_type: project.class.name, model_id: project.id).pluck(:id)
return Geo::FileRegistry.none if upload_ids.empty?
values_sql = upload_ids.map { |id| "(#{id})" }.join(',')
upload_type = 'file'
Geo::FileRegistry.joins(<<~SQL)
JOIN (VALUES #{values_sql})
AS uploads (id)
ON uploads.id = file_registry.file_id
AND file_registry.file_type='#{upload_type}'
SQL
end
# @return [ActiveRecord::Relation<Upload>] list of upload files
def legacy_find_project_uploads(project)
file_registry_ids = legacy_find_file_registries_uploads(project).pluck(:file_id)
return Upload.none if file_registry_ids.empty?
values_sql = file_registry_ids.map { |f_id| "(#{f_id})" }.join(',')
Upload.joins(<<~SQL)
JOIN (VALUES #{values_sql})
AS file_registry (file_id)
ON (file_registry.file_id = uploads.id)
SQL
end
end
end
module Geo
class RegistryFinder
attr_reader :current_node
def initialize(current_node: nil)
@current_node = current_node
end
def find_failed_objects(batch_size:)
Geo::FileRegistry
.failed
.retry_due
.limit(batch_size)
.pluck(:file_id, :file_type)
end
# Find limited amount of non replicated lfs objects.
#
# You can pass a list with `except_registry_ids:` so you can exclude items you
# already scheduled but haven't finished and persisted to the database yet
#
# TODO: Alternative here is to use some sort of window function with a cursor instead
# of simply limiting the query and passing a list of items we don't want
#
# @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_registry_ids ids that will be ignored from the query
def find_nonreplicated_lfs_objects(batch_size:, except_registry_ids:)
# Selective project replication adds a wrinkle to FDW queries, so
# we fallback to the legacy version for now.
relation =
if Gitlab::Geo.fdw? && !selective_sync
fdw_find_nonreplicated_lfs_objects
else
legacy_find_nonreplicated_lfs_objects(except_registry_ids: except_registry_ids)
end
relation
.limit(batch_size)
.pluck(:id)
.map { |id| [id, :lfs] }
end
# Find limited amount of non replicated uploads.
#
# You can pass a list with `except_registry_ids:` so you can exclude items you
# already scheduled but haven't finished and persisted to the database yet
#
# TODO: Alternative here is to use some sort of window function with a cursor instead
# of simply limiting the query and passing a list of items we don't want
#
# @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_registry_ids ids that will be ignored from the query
def find_nonreplicated_uploads(batch_size:, except_registry_ids:)
# Selective project replication adds a wrinkle to FDW queries, so
# we fallback to the legacy version for now.
relation =
if Gitlab::Geo.fdw? && !selective_sync
fdw_find_nonreplicated_uploads
else
legacy_find_nonreplicated_uploads(except_registry_ids: except_registry_ids)
end
relation
.limit(batch_size)
.pluck(:id, :uploader)
.map { |id, uploader| [id, uploader.sub(/Uploader\z/, '').underscore] }
end
protected
def selective_sync
current_node.restricted_project_ids
end
#
# FDW accessors
#
def fdw_find_nonreplicated_lfs_objects
fdw_table = Geo::Fdw::LfsObject.table_name
# Filter out objects in object storage (this is done in GeoNode#lfs_objects)
Geo::Fdw::LfsObject.joins("LEFT OUTER JOIN file_registry
ON file_registry.file_id = #{fdw_table}.id
AND file_registry.file_type = 'lfs'")
.where("#{fdw_table}.file_store IS NULL OR #{fdw_table}.file_store = #{LfsObjectUploader::LOCAL_STORE}")
.where('file_registry.file_id IS NULL')
end
def fdw_find_nonreplicated_uploads
fdw_table = Geo::Fdw::Upload.table_name
upload_types = Geo::FileService::DEFAULT_OBJECT_TYPES.map { |val| "'#{val}'" }.join(',')
Geo::Fdw::Upload.joins("LEFT OUTER JOIN file_registry
ON file_registry.file_id = #{fdw_table}.id
AND file_registry.file_type IN (#{upload_types})")
.where('file_registry.file_id IS NULL')
end
#
# Legacy accessors (non FDW)
#
def legacy_find_nonreplicated_lfs_objects(except_registry_ids:)
registry_ids = legacy_pluck_registry_ids(file_types: :lfs, except_registry_ids: except_registry_ids)
legacy_filter_registry_ids(
current_node.lfs_objects,
registry_ids,
LfsObject.table_name
)
end
def legacy_find_nonreplicated_uploads(except_registry_ids:)
registry_ids = legacy_pluck_registry_ids(file_types: Geo::FileService::DEFAULT_OBJECT_TYPES, except_registry_ids: except_registry_ids)
legacy_filter_registry_ids(
current_node.uploads,
registry_ids,
Upload.table_name
)
end
# This query requires data from two different databases, and unavoidably
# plucks a list of file IDs from one into the other. This will not scale
# well with the number of synchronized files--the query will increase
# linearly in size--so this should be replaced with postgres_fdw ASAP.
def legacy_filter_registry_ids(objects, registry_ids, table_name)
return objects if registry_ids.empty?
joined_relation = objects.joins(<<~SQL)
LEFT OUTER JOIN
(VALUES #{registry_ids.map { |id| "(#{id}, 't')" }.join(',')})
file_registry(file_id, registry_present)
ON #{table_name}.id = file_registry.file_id
SQL
joined_relation.where(file_registry: { registry_present: [nil, false] })
end
def legacy_pluck_registry_ids(file_types:, except_registry_ids:)
ids = Geo::FileRegistry.where(file_type: file_types).pluck(:file_id)
(ids + except_registry_ids).uniq
end
end
end
module Geo
class FilesExpireService
include ::Gitlab::Geo::LogHelpers
BATCH_SIZE = 500
attr_reader :project, :old_full_path
def initialize(project, old_full_path)
@project = project
@old_full_path = old_full_path
end
# Expire already replicated uploads
#
# This is a fallback solution to support projects that haven't rolled out to hashed-storage yet.
#
# Note: Unless we add some locking mechanism, this will be best effort only
# as if there are files that are being replicated during this execution, they will not
# be expired.
#
# The long-term solution is to use hashed storage.
def execute
return unless Gitlab::Geo.secondary?
uploads = finder.find_project_uploads(project)
log_info("Expiring replicated attachments after project rename", count: uploads.count)
schedule_file_removal(uploads)
mark_for_resync!
end
# Project's base directory for attachments storage
#
# @return base directory where all uploads for the project are stored
def base_dir
@base_dir ||= File.join(CarrierWave.root, FileUploader.base_dir, old_full_path)
end
private
def schedule_file_removal(uploads)
paths_to_remove = uploads.find_each(batch_size: BATCH_SIZE).reduce([]) do |to_remove, upload|
file_path = File.join(base_dir, upload.path)
if File.exist?(file_path)
to_remove << [file_path]
log_info("Scheduled to remove file", file_path: file_path)
end
to_remove
end
Sidekiq::Client.push_bulk('class' => Geo::FileRemovalWorker, 'args' => paths_to_remove)
end
def mark_for_resync!
finder.find_file_registries_uploads(project).delete_all
end
def finder
@finder ||= ::Geo::ExpireUploadsFinder.new
end
# This is called by LogHelpers to build json log with context info
#
# @see ::Gitlab::Geo::LogHelpers
def base_log_data(message)
{
class: self.class.name,
project_id: project.id,
project_path: project.full_path,
project_old_path: old_full_path,
message: message
}
end
end
end
......@@ -14,14 +14,28 @@ module Geo
end
def execute
unless move_repositories!
return false
end
unless project.hashed_storage?(:attachments)
Geo::FilesExpireService.new(project, old_disk_path).execute
end
true
end
private
def move_repositories!
begin
project.ensure_storage_path_exists
move_project_repository && move_wiki_repository
rescue
log_error('Repository cannot be renamed')
rescue => ex
log_error('Repository cannot be renamed', error: ex)
false
end
private
end
def move_project_repository
gitlab_shell.mv_repository(project.repository_storage_path, old_disk_path, new_disk_path)
......
......@@ -12,6 +12,13 @@ module Geo
{ id: object_db_id, type: object_type, job_id: job_id } if job_id
end
def finder
@finder ||= RegistryFinder.new(current_node: current_node)
end
# Pools for new resources to be transferred
#
# @return [Array] resources to be transferred
def load_pending_resources
resources = find_unsynced_objects(batch_size: db_retrieve_batch_size)
remaining_capacity = db_retrieve_batch_size - resources.count
......@@ -19,120 +26,21 @@ module Geo
if remaining_capacity.zero?
resources
else
resources + find_failed_objects(batch_size: remaining_capacity)
resources + finder.find_failed_objects(batch_size: remaining_capacity)
end
end
def find_unsynced_objects(batch_size:)
lfs_object_ids = find_lfs_object_ids(batch_size: batch_size)
upload_objects_ids = find_upload_object_ids(batch_size: batch_size)
lfs_object_ids = finder.find_nonreplicated_lfs_objects(batch_size: batch_size, except_registry_ids: scheduled_file_ids(:lfs))
upload_objects_ids = finder.find_nonreplicated_uploads(batch_size: batch_size, except_registry_ids: scheduled_file_ids(Geo::FileService::DEFAULT_OBJECT_TYPES))
interleave(lfs_object_ids, upload_objects_ids)
end
def find_failed_objects(batch_size:)
Geo::FileRegistry
.failed
.retry_due
.limit(batch_size)
.pluck(:file_id, :file_type)
end
def selective_sync
current_node.restricted_project_ids
end
def find_lfs_object_ids(batch_size:)
# Selective project replication adds a wrinkle to FDW queries, so
# we fallback to the legacy version for now.
relation =
if Gitlab::Geo.fdw? && !selective_sync
fdw_find_lfs_object_ids
else
legacy_find_lfs_object_ids
end
relation
.limit(batch_size)
.pluck(:id)
.map { |id| [id, :lfs] }
end
def find_upload_object_ids(batch_size:)
# Selective project replication adds a wrinkle to FDW queries, so
# we fallback to the legacy version for now.
relation =
if Gitlab::Geo.fdw? && !selective_sync
fdw_find_upload_object_ids
else
legacy_find_upload_object_ids
end
relation
.limit(batch_size)
.pluck(:id, :uploader)
.map { |id, uploader| [id, uploader.sub(/Uploader\z/, '').underscore] }
end
def fdw_find_lfs_object_ids
fdw_table = Geo::Fdw::LfsObject.table_name
# Filter out objects in object storage (this is done in GeoNode#lfs_objects)
Geo::Fdw::LfsObject.joins("LEFT OUTER JOIN file_registry ON file_registry.file_id = #{fdw_table}.id AND file_registry.file_type = 'lfs'")
.where("#{fdw_table}.file_store IS NULL OR #{fdw_table}.file_store = #{LfsObjectUploader::LOCAL_STORE}")
.where('file_registry.file_id IS NULL')
end
def fdw_find_upload_object_ids
fdw_table = Geo::Fdw::Upload.table_name
obj_types = Geo::FileService::DEFAULT_OBJECT_TYPES.map { |val| "'#{val}'" }.join(',')
Geo::Fdw::Upload.joins("LEFT OUTER JOIN file_registry ON file_registry.file_id = #{fdw_table}.id AND file_registry.file_type IN (#{obj_types})")
.where('file_registry.file_id IS NULL')
end
def legacy_find_upload_object_ids
legacy_filter_registry_ids(
current_node.uploads,
Geo::FileService::DEFAULT_OBJECT_TYPES,
Upload.table_name
)
end
def legacy_find_lfs_object_ids
legacy_filter_registry_ids(
current_node.lfs_objects,
[:lfs],
LfsObject.table_name
)
end
# This query requires data from two different databases, and unavoidably
# plucks a list of file IDs from one into the other. This will not scale
# well with the number of synchronized files--the query will increase
# linearly in size--so this should be replaced with postgres_fdw ASAP.
def legacy_filter_registry_ids(objects, file_types, table_name)
registry_ids = legacy_pluck_registry_ids(Geo::FileRegistry, file_types)
return objects if registry_ids.empty?
joined_relation = objects.joins(<<~SQL)
LEFT OUTER JOIN
(VALUES #{registry_ids.map { |id| "(#{id}, 't')" }.join(',')})
file_registry(file_id, registry_present)
ON #{table_name}.id = file_registry.file_id
SQL
joined_relation.where(file_registry: { registry_present: [nil, false] })
end
def legacy_pluck_registry_ids(relation, file_types)
ids = relation.where(file_type: file_types).pluck(:file_id)
(ids + scheduled_file_ids(file_types)).uniq
end
def scheduled_file_ids(file_types)
file_types = Array(file_types) unless file_types.is_a? Array
def scheduled_file_ids(types)
scheduled_jobs.select { |data| types.include?(data[:type]) }.map { |data| data[:id] }
scheduled_jobs.select { |data| file_types.include?(data[:type]) }.map { |data| data[:id] }
end
end
end
module Geo
class FileRemovalWorker
include Sidekiq::Worker
include Gitlab::Geo::LogHelpers
sidekiq_options queue: :geo
def perform(file_path)
remove_file!(file_path)
end
private
def remove_file!(file_path)
if File.file?(file_path)
begin
File.unlink(file_path)
rescue => ex
log_error("Failed to remove file", ex, file_path: file_path)
end
log_info("Removed file", file_path: file_path)
else
log_info("Tried to remove file, but it was not found", file_path: file_path)
end
end
end
end
---
title: 'Geo: Expire and resync attachments from renamed projects in secondary nodes
when using legacy storage'
merge_request: 3259
author:
type: added
......@@ -9,5 +9,10 @@ FactoryGirl.define do
model { build(:personal_snippet) }
uploader "PersonalFileUploader"
end
trait :issuable_upload do
path { "#{SecureRandom.hex}/myfile.jpg" }
uploader "FileUploader"
end
end
end
require 'spec_helper'
# Disable transactions via :truncate method because a foreign table
# can't see changes inside a transaction of a different connection.
describe Geo::ExpireUploadsFinder, :geo, :truncate do
let(:project) { create(:project) }
context 'FDW' do
before do
skip('FDW is not configured') if Gitlab::Database.postgresql? && !Gitlab::Geo.fdw?
end
describe '#find_project_uploads' do
let(:project) { build_stubbed(:project) }
it 'delegates to #fdw_find_project_uploads' do
expect(subject).to receive(:fdw_find_project_uploads).with(project)
subject.find_project_uploads(project)
end
end
describe '#fdw_find_project_uploads' do
context 'filtering per project uploads' do
it 'returns only objects associated with the project' do
other_upload = create(:upload, :issuable_upload)
upload = create(:upload, :issuable_upload, model: project)
create(:geo_file_registry, file_id: upload.id)
create(:geo_file_registry, file_id: other_upload.id)
uploads = subject.fdw_find_project_uploads(project)
expect(uploads.count).to eq(1)
expect(uploads.first.id).to eq(upload.id)
end
end
context 'filtering replicated uploads only' do
it 'returns only replicated or to be replicated objects' do
create(:upload, :issuable_upload, model: project)
upload = create(:upload, :issuable_upload, model: project)
create(:geo_file_registry, file_id: upload.id, success: false)
uploads = subject.fdw_find_project_uploads(project)
expect(uploads.count).to eq(1)
expect(uploads.first.id).to eq(upload.id)
end
end
end
describe '#find_file_registries_uploads' do
let(:project) { build_stubbed(:project) }
it 'delegates to #fdw_find_file_registries_uploads' do
expect(subject).to receive(:fdw_find_file_registries_uploads).with(project)
subject.find_file_registries_uploads(project)
end
end
describe '#fdw_find_file_registries_uploads' do
context 'filtering per project uploads' do
it 'returns only objects associated with the project' do
other_upload = create(:upload, :issuable_upload)
upload = create(:upload, :issuable_upload, model: project)
create(:geo_file_registry, file_id: other_upload.id)
file_registry = create(:geo_file_registry, file_id: upload.id)
files = subject.fdw_find_file_registries_uploads(project)
expect(files.count).to eq(1)
expect(files.first.id).to eq(file_registry.id)
end
end
end
end
context 'Legacy' do
before do
allow(Gitlab::Geo).to receive(:fdw?).and_return(false)
end
describe '#find_project_uploads' do
let(:project) { build_stubbed(:project) }
it 'delegates to #legacy_find_project_uploads' do
expect(subject).to receive(:legacy_find_project_uploads).with(project)
subject.find_project_uploads(project)
end
end
describe '#legacy_find_project_uploads' do
context 'filtering per project uploads' do
it 'returns only objects associated with the project' do
other_upload = create(:upload, :issuable_upload)
upload = create(:upload, :issuable_upload, model: project)
create(:geo_file_registry, file_id: upload.id)
create(:geo_file_registry, file_id: other_upload.id)
uploads = subject.legacy_find_project_uploads(project)
expect(uploads.count).to eq(1)
expect(uploads.first.id).to eq(upload.id)
end
end
context 'filtering replicated uploads only' do
it 'returns only replicated or to be replicated objects' do
create(:upload, :issuable_upload, model: project)
upload = create(:upload, :issuable_upload, model: project)
create(:geo_file_registry, file_id: upload.id, success: false)
uploads = subject.legacy_find_project_uploads(project)
expect(uploads.count).to eq(1)
expect(uploads.first.id).to eq(upload.id)
end
end
end
describe '#find_file_registries_uploads' do
let(:project) { build_stubbed(:project) }
it 'delegates to #legacy_find_file_registries_uploads' do
expect(subject).to receive(:legacy_find_file_registries_uploads).with(project)
subject.find_file_registries_uploads(project)
end
end
describe '#legacy_find_file_registries_uploads' do
context 'filtering per project uploads' do
it 'returns only objects associated with the project' do
other_upload = create(:upload, :issuable_upload)
upload = create(:upload, :issuable_upload, model: project)
create(:geo_file_registry, file_id: other_upload.id)
file_registry = create(:geo_file_registry, file_id: upload.id)
files = subject.legacy_find_file_registries_uploads(project)
expect(files.count).to eq(1)
expect(files.first.id).to eq(file_registry.id)
end
end
end
end
end
require 'spec_helper'
# Disable transactions via :truncate method because a foreign table
# can't see changes inside a transaction of a different connection.
describe Geo::FilesExpireService, :geo, :truncate do
let(:project) { create(:project) }
let!(:old_full_path) { project.full_path }
subject { described_class.new(project, old_full_path) }
describe '#execute' do
let(:file_uploader) { build(:file_uploader, project: project) }
let!(:upload) { Upload.find_by(path: file_uploader.relative_path) }
let!(:file_registry) { create(:geo_file_registry, file_id: upload.id) }
before do
project.update(path: "#{project.path}_renamed")
end
context 'when in Geo secondary node' do
before do
allow(Gitlab::Geo).to receive(:secondary?) { true }
end
it 'remove file from disk' do
file_path = File.join(subject.base_dir, upload.path)
expect(File.exist?(file_path)).to be_truthy
Sidekiq::Testing.inline! { subject.execute }
expect(File.exist?(file_path)).to be_falsey
end
it 'removes file_registry associates with upload' do
expect(file_registry.success).to be_truthy
subject.execute
expect { file_registry.reload }.to raise_error(ActiveRecord::RecordNotFound)
end
end
context 'when not in Geo secondary node' do
it 'no-op execute action' do
expect(subject).not_to receive(:schedule_file_removal)
expect(subject).not_to receive(:mark_for_resync!)
subject.execute
end
end
end
end
require 'spec_helper'
describe Geo::MoveRepositoryService do
describe Geo::MoveRepositoryService, :geo do
describe '#execute' do
let(:project) { create(:project, :repository, :wiki_repo) }
let(:old_path) { project.full_path }
......
......@@ -20,10 +20,10 @@ describe Geo::FileDownloadDispatchWorker, :geo, :truncate do
shared_examples '#perform' do |skip_tests|
before do
skip if skip_tests
skip('FDW is not configured') if skip_tests
end
it 'does not schedule anything when secondary role is disabled' do
it 'does not schedule anything when tracking database is not configured' do
create(:lfs_object, :with_file)
allow(Gitlab::Geo).to receive(:geo_database_configured?) { false }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment