Commit 98f55734 authored by Nick Thomas's avatar Nick Thomas

Prevent failed file syncs from stalling Geo backfill

parent 5ba5711f
class Geo::FileRegistry < Geo::BaseRegistry
scope :failed, -> { where(success: false) }
scope :synced, -> { where(success: true) }
end
......@@ -100,7 +100,7 @@ class GeoNodeStatus
def lfs_objects_synced_count
@lfs_objects_synced_count ||= begin
relation = Geo::FileRegistry.where(file_type: :lfs)
relation = Geo::FileRegistry.synced.where(file_type: :lfs)
if Gitlab::Geo.current_node.restricted_project_ids
relation = relation.where(file_id: lfs_objects.pluck(:id))
......@@ -129,7 +129,7 @@ class GeoNodeStatus
def attachments_synced_count
@attachments_synced_count ||= begin
upload_ids = attachments.pluck(:id)
synced_ids = Geo::FileRegistry.where(file_type: [:attachment, :avatar, :file]).pluck(:file_id)
synced_ids = Geo::FileRegistry.synced.where(file_type: [:attachment, :avatar, :file]).pluck(:file_id)
(synced_ids & upload_ids).length
end
......
......@@ -11,7 +11,7 @@ module Geo
success: success,
bytes_downloaded: bytes_downloaded,
download_time_s: (Time.now - start_time).to_f.round(3))
update_registry(bytes_downloaded) if success
update_registry(bytes_downloaded, success: success)
end
end
......@@ -37,13 +37,14 @@ module Geo
end
end
def update_registry(bytes_downloaded)
def update_registry(bytes_downloaded, success:)
transfer = Geo::FileRegistry.find_or_initialize_by(
file_type: object_type,
file_id: object_db_id
)
transfer.bytes = bytes_downloaded
transfer.success = success
transfer.save
end
......
......@@ -9,33 +9,47 @@ module Geo
end
def load_pending_resources
unsynced = find_unsynced_objects
failed = find_failed_objects
interleave(unsynced, failed)
end
def find_unsynced_objects
lfs_object_ids = find_lfs_object_ids
objects_ids = find_object_ids
interleave(lfs_object_ids, objects_ids)
end
def find_object_ids
downloaded_ids = find_downloaded_ids(Geo::FileService::DEFAULT_OBJECT_TYPES)
def find_failed_objects
Geo::FileRegistry
.failed
.limit(db_retrieve_batch_size)
.pluck(:file_id, :file_type)
end
unsynched_downloads = filter_downloaded_ids(
current_node.uploads, downloaded_ids, Upload.table_name)
def find_object_ids
unsynced_downloads = filter_registry_ids(
current_node.uploads,
Geo::FileService::DEFAULT_OBJECT_TYPES,
Upload.table_name
)
unsynched_downloads
.order(created_at: :desc)
unsynced_downloads
.limit(db_retrieve_batch_size)
.pluck(:id, :uploader)
.map { |id, uploader| [id, uploader.sub(/Uploader\z/, '').underscore] }
end
def find_lfs_object_ids
downloaded_ids = find_downloaded_ids([:lfs])
unsynced_downloads = filter_registry_ids(
current_node.lfs_objects,
[:lfs],
LfsObject.table_name
)
unsynched_downloads = filter_downloaded_ids(
current_node.lfs_objects, downloaded_ids, LfsObject.table_name)
unsynched_downloads
.order(created_at: :desc)
unsynced_downloads
.limit(db_retrieve_batch_size)
.pluck(:id)
.map { |id| [id, :lfs] }
......@@ -45,12 +59,14 @@ module Geo
# plucks a list of file IDs from one into the other. This will not scale
# well with the number of synchronized files--the query will increase
# linearly in size--so this should be replaced with postgres_fdw ASAP.
def filter_downloaded_ids(objects, downloaded_ids, table_name)
return objects if downloaded_ids.empty?
def filter_registry_ids(objects, file_types, table_name)
registry_ids = pluck_registry_ids(Geo::FileRegistry, file_types)
return objects if registry_ids.empty?
joined_relation = objects.joins(<<~SQL)
LEFT OUTER JOIN
(VALUES #{downloaded_ids.map { |id| "(#{id}, 't')" }.join(',')})
(VALUES #{registry_ids.map { |id| "(#{id}, 't')" }.join(',')})
file_registry(file_id, registry_present)
ON #{table_name}.id = file_registry.file_id
SQL
......@@ -58,9 +74,9 @@ module Geo
joined_relation.where(file_registry: { registry_present: [nil, false] })
end
def find_downloaded_ids(file_types)
downloaded_ids = Geo::FileRegistry.where(file_type: file_types).pluck(:file_id)
(downloaded_ids + scheduled_file_ids(file_types)).uniq
def pluck_registry_ids(relation, file_types)
ids = relation.where(file_type: file_types).pluck(:file_id)
(ids + scheduled_file_ids(file_types)).uniq
end
def scheduled_file_ids(types)
......
---
title: Prevent failed file syncs from stalling Geo backfill
merge_request: 3101
author:
type: fixed
class AddFileRegistrySuccess < ActiveRecord::Migration
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
disable_ddl_transaction!
def up
# Ensure existing rows are recorded as successes
add_column_with_default :file_registry, :success, :boolean, default: true, allow_null: false
change_column :file_registry, :success, :boolean, default: false
end
def down
# Prevent failures from being converted into successes
false_value = Arel::Nodes::False.new.to_sql(Geo::BaseRegistry)
connection.execute("DELETE FROM file_registry WHERE success = #{false_value}")
remove_column :file_registry, :success
end
end
class AddFileRegistrySuccessIndex < ActiveRecord::Migration
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
disable_ddl_transaction!
def up
add_concurrent_index :file_registry, :success
end
def down
remove_concurrent_index :file_registry, :success
end
end
......@@ -11,7 +11,7 @@
#
# It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema.define(version: 20171005045404) do
ActiveRecord::Schema.define(version: 20171009162209) do
# These are extensions that must be enabled in order to support this database
enable_extension "plpgsql"
......@@ -25,10 +25,12 @@ ActiveRecord::Schema.define(version: 20171005045404) do
t.integer "bytes", limit: 8
t.string "sha256"
t.datetime "created_at", null: false
t.boolean "success", default: false, null: false
end
add_index "file_registry", ["file_type", "file_id"], name: "index_file_registry_on_file_type_and_file_id", unique: true, using: :btree
add_index "file_registry", ["file_type"], name: "index_file_registry_on_file_type", using: :btree
add_index "file_registry", ["success"], name: "index_file_registry_on_success", using: :btree
create_table "project_registry", force: :cascade do |t|
t.integer "project_id", null: false
......
......@@ -2,6 +2,7 @@ FactoryGirl.define do
factory :geo_file_registry, class: Geo::FileRegistry do
sequence(:file_id)
file_type :file
success true
trait :avatar do
file_type :avatar
......
require 'spec_helper'
describe Geo::FileRegistry do
set(:failed) { create(:geo_file_registry, success: false) }
set(:synced) { create(:geo_file_registry, success: true) }
describe '.failed' do
it 'returns registries in the failed state' do
expect(described_class.failed).to contain_exactly(failed)
end
end
describe '.synced' do
it 'returns registries in the synced state' do
expect(described_class.synced).to contain_exactly(synced)
end
end
end
......@@ -8,6 +8,8 @@ describe Geo::FileDownloadService do
before do
stub_current_geo_node(secondary)
allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain).and_return(true)
end
describe '#execute' do
......@@ -15,15 +17,18 @@ describe Geo::FileDownloadService do
let(:user) { create(:user, avatar: fixture_file_upload(Rails.root + 'spec/fixtures/dk.png', 'image/png')) }
let(:upload) { Upload.find_by(model: user, uploader: 'AvatarUploader') }
subject { described_class.new(:avatar, upload.id) }
subject(:execute!) { described_class.new(:avatar, upload.id).execute }
it 'downloads a user avatar' do
allow_any_instance_of(Gitlab::ExclusiveLease)
.to receive(:try_obtain).and_return(true)
allow_any_instance_of(Gitlab::Geo::FileTransfer)
.to receive(:download_from_primary).and_return(100)
stub_transfer(Gitlab::Geo::FileTransfer, 100)
expect { execute! }.to change { Geo::FileRegistry.synced.count }.by(1)
end
it 'registers when the download fails' do
stub_transfer(Gitlab::Geo::FileTransfer, -1)
expect { subject.execute }.to change { Geo::FileRegistry.count }.by(1)
expect { execute! }.to change { Geo::FileRegistry.failed.count }.by(1)
end
end
......@@ -31,15 +36,18 @@ describe Geo::FileDownloadService do
let(:group) { create(:group, avatar: fixture_file_upload(Rails.root + 'spec/fixtures/dk.png', 'image/png')) }
let(:upload) { Upload.find_by(model: group, uploader: 'AvatarUploader') }
subject { described_class.new(:avatar, upload.id) }
subject(:execute!) { described_class.new(:avatar, upload.id).execute }
it 'downloads a group avatar' do
allow_any_instance_of(Gitlab::ExclusiveLease)
.to receive(:try_obtain).and_return(true)
allow_any_instance_of(Gitlab::Geo::FileTransfer)
.to receive(:download_from_primary).and_return(100)
stub_transfer(Gitlab::Geo::FileTransfer, 100)
expect { execute! }.to change { Geo::FileRegistry.synced.count }.by(1)
end
it 'registers when the download fails' do
stub_transfer(Gitlab::Geo::FileTransfer, -1)
expect { subject.execute }.to change { Geo::FileRegistry.count }.by(1)
expect { execute! }.to change { Geo::FileRegistry.failed.count }.by(1)
end
end
......@@ -47,15 +55,18 @@ describe Geo::FileDownloadService do
let(:project) { create(:project, avatar: fixture_file_upload(Rails.root + 'spec/fixtures/dk.png', 'image/png')) }
let(:upload) { Upload.find_by(model: project, uploader: 'AvatarUploader') }
subject { described_class.new(:avatar, upload.id) }
subject(:execute!) { described_class.new(:avatar, upload.id).execute }
it 'downloads a project avatar' do
allow_any_instance_of(Gitlab::ExclusiveLease)
.to receive(:try_obtain).and_return(true)
allow_any_instance_of(Gitlab::Geo::FileTransfer)
.to receive(:download_from_primary).and_return(100)
stub_transfer(Gitlab::Geo::FileTransfer, 100)
expect { subject.execute }.to change { Geo::FileRegistry.count }.by(1)
expect { execute! }.to change { Geo::FileRegistry.synced.count }.by(1)
end
it 'registers when the download fails' do
stub_transfer(Gitlab::Geo::FileTransfer, -1)
expect { execute! }.to change { Geo::FileRegistry.failed.count }.by(1)
end
end
......@@ -63,30 +74,36 @@ describe Geo::FileDownloadService do
let(:note) { create(:note, :with_attachment) }
let(:upload) { Upload.find_by(model: note, uploader: 'AttachmentUploader') }
subject { described_class.new(:attachment, upload.id) }
subject(:execute!) { described_class.new(:attachment, upload.id).execute }
it 'downloads the attachment' do
allow_any_instance_of(Gitlab::ExclusiveLease)
.to receive(:try_obtain).and_return(true)
allow_any_instance_of(Gitlab::Geo::FileTransfer)
.to receive(:download_from_primary).and_return(100)
stub_transfer(Gitlab::Geo::FileTransfer, 100)
expect { subject.execute }.to change { Geo::FileRegistry.count }.by(1)
expect { execute! }.to change { Geo::FileRegistry.synced.count }.by(1)
end
it 'registers when the download fails' do
stub_transfer(Gitlab::Geo::FileTransfer, -1)
expect { execute! }.to change { Geo::FileRegistry.failed.count }.by(1)
end
end
context 'with a snippet' do
let(:upload) { create(:upload, :personal_snippet) }
subject { described_class.new(:personal_file, upload.id) }
subject(:execute!) { described_class.new(:personal_file, upload.id).execute }
it 'downloads the file' do
allow_any_instance_of(Gitlab::ExclusiveLease)
.to receive(:try_obtain).and_return(true)
allow_any_instance_of(Gitlab::Geo::FileTransfer)
.to receive(:download_from_primary).and_return(100)
stub_transfer(Gitlab::Geo::FileTransfer, 100)
expect { subject.execute }.to change { Geo::FileRegistry.count }.by(1)
expect { execute! }.to change { Geo::FileRegistry.synced.count }.by(1)
end
it 'registers when the download fails' do
stub_transfer(Gitlab::Geo::FileTransfer, -1)
expect { execute! }.to change { Geo::FileRegistry.failed.count }.by(1)
end
end
......@@ -101,12 +118,15 @@ describe Geo::FileDownloadService do
end
it 'downloads the file' do
allow_any_instance_of(Gitlab::ExclusiveLease)
.to receive(:try_obtain).and_return(true)
allow_any_instance_of(Gitlab::Geo::FileTransfer)
.to receive(:download_from_primary).and_return(100)
stub_transfer(Gitlab::Geo::FileTransfer, 100)
expect { subject.execute }.to change { Geo::FileRegistry.count }.by(1)
expect { subject.execute }.to change { Geo::FileRegistry.synced.count }.by(1)
end
it 'registers when the download fails' do
stub_transfer(Gitlab::Geo::FileTransfer, -1)
expect { subject.execute }.to change { Geo::FileRegistry.failed.count }.by(1)
end
end
......@@ -115,18 +135,21 @@ describe Geo::FileDownloadService do
subject { described_class.new(:lfs, lfs_object.id) }
before do
allow_any_instance_of(Gitlab::ExclusiveLease)
.to receive(:try_obtain).and_return(true)
allow_any_instance_of(Gitlab::Geo::LfsTransfer)
.to receive(:download_from_primary).and_return(100)
it 'downloads an LFS object' do
stub_transfer(Gitlab::Geo::LfsTransfer, 100)
expect { subject.execute }.to change { Geo::FileRegistry.synced.count }.by(1)
end
it 'downloads an LFS object' do
expect { subject.execute }.to change { Geo::FileRegistry.count }.by(1)
it 'registers when the download fails' do
stub_transfer(Gitlab::Geo::LfsTransfer, -1)
expect { subject.execute }.to change { Geo::FileRegistry.failed.count }.by(1)
end
it 'logs a message' do
stub_transfer(Gitlab::Geo::LfsTransfer, 100)
expect(Gitlab::Geo::Logger).to receive(:info).with(hash_including(:message, :download_time_s, success: true, bytes_downloaded: 100)).and_call_original
subject.execute
......@@ -138,5 +161,10 @@ describe Geo::FileDownloadService do
expect { described_class.new(:bad, 1).execute }.to raise_error(NameError)
end
end
def stub_transfer(kls, result)
instance = double("(instance of #{kls})", download_from_primary: result)
allow(kls).to receive(:new).and_return(instance)
end
end
end
......@@ -84,6 +84,27 @@ describe Geo::FileDownloadDispatchWorker, :postgresql do
end
end
context 'with a failed file' do
let!(:failed_registry) { create(:geo_file_registry, :lfs, file_id: 999, success: false) }
it 'does not stall backfill' do
unsynced = create(:lfs_object, :with_file)
stub_const('Geo::BaseSchedulerWorker::DB_RETRIEVE_BATCH_SIZE', 1)
expect(GeoFileDownloadWorker).not_to receive(:perform_async).with(:lfs, failed_registry.file_id)
expect(GeoFileDownloadWorker).to receive(:perform_async).with(:lfs, unsynced.id)
subject.perform
end
it 'retries failed files' do
expect(GeoFileDownloadWorker).to receive(:perform_async).with('lfs', failed_registry.file_id)
subject.perform
end
end
context 'when node has namespace restrictions' do
let(:synced_group) { create(:group) }
let!(:project_in_synced_group) { create(:project, group: synced_group) }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment