Commit 90df6ad9 authored by Nick Thomas's avatar Nick Thomas Committed by Bob Van Landuyt

Extract RemoteMirrorLfsObjectUploaderWorker

This will allow us to parallelize the work involved in uploading an
arbitrary number of LFS objects to the remote mirror.
parent f9b0acdb
...@@ -814,6 +814,7 @@ Rails/SaveBang: ...@@ -814,6 +814,7 @@ Rails/SaveBang:
- 'ee/spec/workers/repository_import_worker_spec.rb' - 'ee/spec/workers/repository_import_worker_spec.rb'
- 'ee/spec/workers/update_all_mirrors_worker_spec.rb' - 'ee/spec/workers/update_all_mirrors_worker_spec.rb'
- 'qa/qa/specs/features/browser_ui/3_create/repository/push_mirroring_over_http_spec.rb' - 'qa/qa/specs/features/browser_ui/3_create/repository/push_mirroring_over_http_spec.rb'
- 'qa/qa/specs/features/browser_ui/3_create/repository/push_mirroring_lfs_over_http_spec.rb'
- 'qa/qa/specs/features/ee/browser_ui/3_create/repository/pull_mirroring_over_http_spec.rb' - 'qa/qa/specs/features/ee/browser_ui/3_create/repository/pull_mirroring_over_http_spec.rb'
- 'qa/qa/specs/features/ee/browser_ui/3_create/repository/pull_mirroring_over_ssh_with_key_spec.rb' - 'qa/qa/specs/features/ee/browser_ui/3_create/repository/pull_mirroring_over_ssh_with_key_spec.rb'
- 'spec/controllers/abuse_reports_controller_spec.rb' - 'spec/controllers/abuse_reports_controller_spec.rb'
......
...@@ -1469,6 +1469,12 @@ class Project < ApplicationRecord ...@@ -1469,6 +1469,12 @@ class Project < ApplicationRecord
forked_from_project || fork_network&.root_project forked_from_project || fork_network&.root_project
end end
def lfs_objects_for_repository_types(*types)
LfsObject
.joins(:lfs_objects_projects)
.where(lfs_objects_projects: { project: self, repository_type: types })
end
def lfs_objects_oids(oids: []) def lfs_objects_oids(oids: [])
oids(lfs_objects, oids: oids) oids(lfs_objects, oids: oids)
end end
......
...@@ -210,6 +210,10 @@ class RemoteMirror < ApplicationRecord ...@@ -210,6 +210,10 @@ class RemoteMirror < ApplicationRecord
super(usernames_whitelist: %w[git]) super(usernames_whitelist: %w[git])
end end
def bare_url
Gitlab::UrlSanitizer.new(read_attribute(:url)).full_url
end
def ensure_remote! def ensure_remote!
return unless project return unless project
return unless remote_name && remote_url return unless remote_name && remote_url
......
# frozen_string_literal: true
module Lfs
# Lfs::PushService pushes the LFS objects associated with a project to a
# remote URL
class PushService < BaseService
include Gitlab::Utils::StrongMemoize
# Match the canonical LFS client's batch size:
# https://github.com/git-lfs/git-lfs/blob/master/tq/transfer_queue.go#L19
BATCH_SIZE = 100
def execute
lfs_objects_relation.each_batch(of: BATCH_SIZE) do |objects|
push_objects(objects)
end
success
rescue => err
error(err.message)
end
private
# Currently we only set repository_type for design repository objects, so
# push mirroring must send objects with a `nil` repository type - but if the
# wiki repository uses LFS, its objects will also be sent. This will be
# addressed by https://gitlab.com/gitlab-org/gitlab/-/issues/250346
def lfs_objects_relation
project.lfs_objects_for_repository_types(nil, :project)
end
def push_objects(objects)
rsp = lfs_client.batch('upload', objects)
objects = objects.index_by(&:oid)
rsp.fetch('objects', []).each do |spec|
actions = spec['actions']
object = objects[spec['oid']]
upload_object!(object, spec) if actions&.key?('upload')
verify_object!(object, spec) if actions&.key?('verify')
end
end
def upload_object!(object, spec)
authenticated = spec['authenticated']
upload = spec.dig('actions', 'upload')
# The server wants us to upload the object but something is wrong
unless object && object.size == spec['size'].to_i
log_error("Couldn't match object #{spec['oid']}/#{spec['size']}")
return
end
lfs_client.upload(object, upload, authenticated: authenticated)
end
def verify_object!(object, spec)
# TODO: the remote has requested that we make another call to verify that
# the object has been sent correctly.
# https://gitlab.com/gitlab-org/gitlab/-/issues/250654
log_error("LFS upload verification requested, but not supported for #{object.oid}")
end
def url
params.fetch(:url)
end
def credentials
params.fetch(:credentials)
end
def lfs_client
strong_memoize(:lfs_client) do
Gitlab::Lfs::Client.new(url, credentials: credentials)
end
end
end
end
...@@ -31,6 +31,9 @@ module Projects ...@@ -31,6 +31,9 @@ module Projects
remote_mirror.update_start! remote_mirror.update_start!
remote_mirror.ensure_remote! remote_mirror.ensure_remote!
# LFS objects must be sent first, or the push has dangling pointers
send_lfs_objects!(remote_mirror)
response = remote_mirror.update_repository response = remote_mirror.update_repository
if response.divergent_refs.any? if response.divergent_refs.any?
...@@ -43,6 +46,23 @@ module Projects ...@@ -43,6 +46,23 @@ module Projects
end end
end end
def send_lfs_objects!(remote_mirror)
return unless Feature.enabled?(:push_mirror_syncs_lfs, project)
return unless project.lfs_enabled?
# TODO: Support LFS sync over SSH
# https://gitlab.com/gitlab-org/gitlab/-/issues/249587
return unless remote_mirror.url =~ /\Ahttps?:\/\//i
return unless remote_mirror.password_auth?
Lfs::PushService.new(
project,
current_user,
url: remote_mirror.bare_url,
credentials: remote_mirror.credentials
).execute
end
def retry_or_fail(mirror, message, tries) def retry_or_fail(mirror, message, tries)
if tries < MAX_TRIES if tries < MAX_TRIES
mirror.mark_for_retry!(message) mirror.mark_for_retry!(message)
......
---
title: Sync LFS objects when push mirroring
merge_request: 40137
author:
type: added
---
name: push_mirror_syncs_lfs
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/40137
rollout_issue_url:
group: group::source code
type: development
default_enabled: false
# frozen_string_literal: true
module Gitlab
module Lfs
# Gitlab::Lfs::Client implements a simple LFS client, designed to talk to
# LFS servers as described in these documents:
# * https://github.com/git-lfs/git-lfs/blob/master/docs/api/batch.md
# * https://github.com/git-lfs/git-lfs/blob/master/docs/api/basic-transfers.md
class Client
attr_reader :base_url
def initialize(base_url, credentials:)
@base_url = base_url
@credentials = credentials
end
def batch(operation, objects)
body = {
operation: operation,
transfers: ['basic'],
# We don't know `ref`, so can't send it
objects: objects.map { |object| { oid: object.oid, size: object.size } }
}
rsp = Gitlab::HTTP.post(
batch_url,
basic_auth: basic_auth,
body: body.to_json,
headers: { 'Content-Type' => 'application/vnd.git-lfs+json' }
)
raise BatchSubmitError unless rsp.success?
# HTTParty provides rsp.parsed_response, but it only kicks in for the
# application/json content type in the response, which we can't rely on
body = Gitlab::Json.parse(rsp.body)
transfer = body.fetch('transfer', 'basic')
raise UnsupportedTransferError.new(transfer.inspect) unless transfer == 'basic'
body
end
def upload(object, upload_action, authenticated:)
file = object.file.open
params = {
body_stream: file,
headers: {
'Content-Length' => object.size.to_s,
'Content-Type' => 'application/octet-stream'
}.merge(upload_action['header'] || {})
}
params[:basic_auth] = basic_auth unless authenticated
rsp = Gitlab::HTTP.put(upload_action['href'], params)
raise ObjectUploadError unless rsp.success?
ensure
file&.close
end
private
attr_reader :credentials
def batch_url
base_url + '/info/lfs/objects/batch'
end
def basic_auth
return unless credentials[:auth_method] == "password"
{ username: credentials[:user], password: credentials[:password] }
end
class BatchSubmitError < StandardError
def message
"Failed to submit batch"
end
end
class UnsupportedTransferError < StandardError
def initialize(transfer = nil)
super
@transfer = transfer
end
def message
"Unsupported transfer: #{@transfer}"
end
end
class ObjectUploadError < StandardError
def message
"Failed to upload object"
end
end
end
end
end
# frozen_string_literal: true
module QA
RSpec.describe 'Create' do
describe 'Push mirror a repository over HTTP' do
it 'configures and syncs LFS objects for a (push) mirrored repository', testcase: 'https://gitlab.com/gitlab-org/quality/testcases/-/issues/414' do
Runtime::Feature.enable_and_verify('push_mirror_syncs_lfs')
Runtime::Browser.visit(:gitlab, Page::Main::Login)
Page::Main::Login.perform(&:sign_in_using_credentials)
target_project = Resource::Project.fabricate_via_api! do |project|
project.name = 'push-mirror-target-project'
end
target_project_uri = target_project.repository_http_location.uri
target_project_uri.user = Runtime::User.username
source_project_push = Resource::Repository::ProjectPush.fabricate! do |push|
push.file_name = 'README.md'
push.file_content = '# This is a test project'
push.commit_message = 'Add README.md'
push.use_lfs = true
end
source_project_push.project.visit!
Page::Project::Menu.perform(&:go_to_repository_settings)
Page::Project::Settings::Repository.perform do |settings|
settings.expand_mirroring_repositories do |mirror_settings|
# Configure the source project to push to the target project
mirror_settings.repository_url = target_project_uri
mirror_settings.mirror_direction = 'Push'
mirror_settings.authentication_method = 'Password'
mirror_settings.password = Runtime::User.password
mirror_settings.mirror_repository
mirror_settings.update target_project_uri
end
end
# Check that the target project has the commit from the source
target_project.visit!
expect(page).to have_content('README.md')
expect(page).to have_content('The rendered file could not be displayed because it is stored in LFS')
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::Lfs::Client do
let(:base_url) { "https://example.com" }
let(:username) { 'user' }
let(:password) { 'password' }
let(:credentials) { { user: username, password: password, auth_method: 'password' } }
let(:basic_auth_headers) do
{ 'Authorization' => "Basic #{Base64.strict_encode64("#{username}:#{password}")}" }
end
let(:upload_action) do
{
"href" => "#{base_url}/some/file",
"header" => {
"Key" => "value"
}
}
end
subject(:lfs_client) { described_class.new(base_url, credentials: credentials) }
describe '#batch' do
let_it_be(:objects) { create_list(:lfs_object, 3) }
context 'server returns 200 OK' do
it 'makes a successful batch request' do
stub = stub_batch(
objects: objects,
headers: basic_auth_headers
).to_return(
status: 200,
body: { 'objects' => 'anything', 'transfer' => 'basic' }.to_json,
headers: { 'Content-Type' => 'application/vnd.git-lfs+json' }
)
result = lfs_client.batch('upload', objects)
expect(stub).to have_been_requested
expect(result).to eq('objects' => 'anything', 'transfer' => 'basic')
end
end
context 'server returns 400 error' do
it 'raises an error' do
stub_batch(objects: objects, headers: basic_auth_headers).to_return(status: 400)
expect { lfs_client.batch('upload', objects) }.to raise_error(/Failed/)
end
end
context 'server returns 500 error' do
it 'raises an error' do
stub_batch(objects: objects, headers: basic_auth_headers).to_return(status: 400)
expect { lfs_client.batch('upload', objects) }.to raise_error(/Failed/)
end
end
context 'server returns an exotic transfer method' do
it 'raises an error' do
stub_batch(
objects: objects,
headers: basic_auth_headers
).to_return(
status: 200,
body: { 'transfer' => 'carrier-pigeon' }.to_json,
headers: { 'Content-Type' => 'application/vnd.git-lfs+json' }
)
expect { lfs_client.batch('upload', objects) }.to raise_error(/Unsupported transfer/)
end
end
def stub_batch(objects:, headers:, operation: 'upload', transfer: 'basic')
objects = objects.map { |o| { oid: o.oid, size: o.size } }
body = { operation: operation, 'transfers': [transfer], objects: objects }.to_json
stub_request(:post, base_url + '/info/lfs/objects/batch').with(body: body, headers: headers)
end
end
describe "#upload" do
let_it_be(:object) { create(:lfs_object) }
context 'server returns 200 OK to an authenticated request' do
it "makes an HTTP PUT with expected parameters" do
stub_upload(object: object, headers: upload_action['header']).to_return(status: 200)
lfs_client.upload(object, upload_action, authenticated: true)
end
end
context 'server returns 200 OK to an unauthenticated request' do
it "makes an HTTP PUT with expected parameters" do
stub = stub_upload(
object: object,
headers: basic_auth_headers.merge(upload_action['header'])
).to_return(status: 200)
lfs_client.upload(object, upload_action, authenticated: false)
expect(stub).to have_been_requested
end
end
context 'LFS object has no file' do
let(:object) { LfsObject.new }
it 'makes an HJTT PUT with expected parameters' do
stub = stub_upload(
object: object,
headers: upload_action['header']
).to_return(status: 200)
lfs_client.upload(object, upload_action, authenticated: true)
expect(stub).to have_been_requested
end
end
context 'server returns 400 error' do
it 'raises an error' do
stub_upload(object: object, headers: upload_action['header']).to_return(status: 400)
expect { lfs_client.upload(object, upload_action, authenticated: true) }.to raise_error(/Failed/)
end
end
context 'server returns 500 error' do
it 'raises an error' do
stub_upload(object: object, headers: upload_action['header']).to_return(status: 500)
expect { lfs_client.upload(object, upload_action, authenticated: true) }.to raise_error(/Failed/)
end
end
def stub_upload(object:, headers:)
stub_request(:put, upload_action['href']).with(
body: object.file.read,
headers: headers.merge('Content-Length' => object.size.to_s)
)
end
end
end
...@@ -2901,6 +2901,20 @@ RSpec.describe Project do ...@@ -2901,6 +2901,20 @@ RSpec.describe Project do
end end
end end
describe '#lfs_objects_for_repository_types' do
let(:project) { create(:project) }
it 'returns LFS objects of the specified type only' do
none, design, wiki = *[nil, :design, :wiki].map do |type|
create(:lfs_objects_project, project: project, repository_type: type).lfs_object
end
expect(project.lfs_objects_for_repository_types(nil)).to contain_exactly(none)
expect(project.lfs_objects_for_repository_types(nil, :wiki)).to contain_exactly(none, wiki)
expect(project.lfs_objects_for_repository_types(:design)).to contain_exactly(design)
end
end
context 'forks' do context 'forks' do
include ProjectForksHelper include ProjectForksHelper
......
...@@ -142,6 +142,20 @@ RSpec.describe RemoteMirror, :mailer do ...@@ -142,6 +142,20 @@ RSpec.describe RemoteMirror, :mailer do
end end
end end
describe '#bare_url' do
it 'returns the URL without any credentials' do
remote_mirror = build(:remote_mirror, url: 'http://user:pass@example.com/foo')
expect(remote_mirror.bare_url).to eq('http://example.com/foo')
end
it 'returns an empty string when the URL is nil' do
remote_mirror = build(:remote_mirror, url: nil)
expect(remote_mirror.bare_url).to eq('')
end
end
describe '#update_repository' do describe '#update_repository' do
it 'performs update including options' do it 'performs update including options' do
git_remote_mirror = stub_const('Gitlab::Git::RemoteMirror', spy) git_remote_mirror = stub_const('Gitlab::Git::RemoteMirror', spy)
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Lfs::PushService do
let(:logger) { service.send(:logger) }
let(:lfs_client) { service.send(:lfs_client) }
let_it_be(:project) { create(:forked_project_with_submodules) }
let_it_be(:remote_mirror) { create(:remote_mirror, project: project, enabled: true) }
let_it_be(:lfs_object) { create_linked_lfs_object(project, :project) }
let(:params) { { url: remote_mirror.bare_url, credentials: remote_mirror.credentials } }
subject(:service) { described_class.new(project, nil, params) }
describe "#execute" do
it 'uploads the object when upload is requested' do
stub_lfs_batch(lfs_object)
expect(lfs_client)
.to receive(:upload)
.with(lfs_object, upload_action_spec(lfs_object), authenticated: true)
expect(service.execute).to eq(status: :success)
end
it 'does nothing if there are no LFS objects' do
lfs_object.destroy!
expect(lfs_client).not_to receive(:upload)
expect(service.execute).to eq(status: :success)
end
it 'does not upload the object when upload is not requested' do
stub_lfs_batch(lfs_object, upload: false)
expect(lfs_client).not_to receive(:upload)
expect(service.execute).to eq(status: :success)
end
it 'returns a failure when submitting a batch fails' do
expect(lfs_client).to receive(:batch) { raise 'failed' }
expect(service.execute).to eq(status: :error, message: 'failed')
end
it 'returns a failure when submitting an upload fails' do
stub_lfs_batch(lfs_object)
expect(lfs_client).to receive(:upload) { raise 'failed' }
expect(service.execute).to eq(status: :error, message: 'failed')
end
context 'non-project-repository LFS objects' do
let_it_be(:nil_lfs_object) { create_linked_lfs_object(project, nil) }
let_it_be(:wiki_lfs_object) { create_linked_lfs_object(project, :wiki) }
let_it_be(:design_lfs_object) { create_linked_lfs_object(project, :design) }
it 'only tries to upload the project-repository LFS object' do
stub_lfs_batch(nil_lfs_object, lfs_object, upload: false)
expect(service.execute).to eq(status: :success)
end
end
end
def create_linked_lfs_object(project, type)
create(:lfs_objects_project, project: project, repository_type: type).lfs_object
end
def stub_lfs_batch(*objects, upload: true)
expect(lfs_client)
.to receive(:batch).with('upload', containing_exactly(*objects))
.and_return('transfer' => 'basic', 'objects' => objects.map { |o| object_spec(o, upload: upload) })
end
def batch_spec(*objects, upload: true)
{ 'transfer' => 'basic', 'objects' => objects.map {|o| object_spec(o, upload: upload) } }
end
def object_spec(object, upload: true)
{ 'oid' => object.oid, 'size' => object.size, 'authenticated' => true }.tap do |spec|
spec['actions'] = { 'upload' => upload_action_spec(object) } if upload
end
end
def upload_action_spec(object)
{ 'href' => "https://example.com/#{object.oid}/#{object.size}", 'header' => { 'Key' => 'value' } }
end
end
...@@ -3,9 +3,10 @@ ...@@ -3,9 +3,10 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe Projects::UpdateRemoteMirrorService do RSpec.describe Projects::UpdateRemoteMirrorService do
let(:project) { create(:project, :repository) } let_it_be(:project) { create(:project, :repository, lfs_enabled: true) }
let(:remote_project) { create(:forked_project_with_submodules) } let_it_be(:remote_project) { create(:forked_project_with_submodules) }
let(:remote_mirror) { create(:remote_mirror, project: project, enabled: true) } let_it_be(:remote_mirror) { create(:remote_mirror, project: project, enabled: true) }
let(:remote_name) { remote_mirror.remote_name } let(:remote_name) { remote_mirror.remote_name }
subject(:service) { described_class.new(project, project.creator) } subject(:service) { described_class.new(project, project.creator) }
...@@ -127,5 +128,63 @@ RSpec.describe Projects::UpdateRemoteMirrorService do ...@@ -127,5 +128,63 @@ RSpec.describe Projects::UpdateRemoteMirrorService do
expect(remote_mirror.last_error).to include("refs/heads/develop") expect(remote_mirror.last_error).to include("refs/heads/develop")
end end
end end
context "sending lfs objects" do
let_it_be(:lfs_pointer) { create(:lfs_objects_project, project: project) }
before do
stub_lfs_setting(enabled: true)
end
context 'feature flag enabled' do
before do
stub_feature_flags(push_mirror_syncs_lfs: true)
end
it 'pushes LFS objects to a HTTP repository' do
expect_next_instance_of(Lfs::PushService) do |service|
expect(service).to receive(:execute)
end
execute!
end
it 'does nothing to an SSH repository' do
remote_mirror.update!(url: 'ssh://example.com')
expect_any_instance_of(Lfs::PushService).not_to receive(:execute)
execute!
end
it 'does nothing if LFS is disabled' do
expect(project).to receive(:lfs_enabled?) { false }
expect_any_instance_of(Lfs::PushService).not_to receive(:execute)
execute!
end
it 'does nothing if non-password auth is specified' do
remote_mirror.update!(auth_method: 'ssh_public_key')
expect_any_instance_of(Lfs::PushService).not_to receive(:execute)
execute!
end
end
context 'feature flag disabled' do
before do
stub_feature_flags(push_mirror_syncs_lfs: false)
end
it 'does nothing' do
expect_any_instance_of(Lfs::PushService).not_to receive(:execute)
execute!
end
end
end
end end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment