Commit 2d6bb65b authored by Douglas Barbosa Alexandre's avatar Douglas Barbosa Alexandre

Merge branch...

Merge branch '213829-geo-integrate-package-file-replication-with-framework-blob-endpoint' into 'master'

Geo: Integrate Package File replication with framework Blob endpoint

Closes #213829

See merge request gitlab-org/gitlab!29499
parents 06aed758 85c58de0
...@@ -11,9 +11,6 @@ module Geo ...@@ -11,9 +11,6 @@ module Geo
event :created event :created
end end
class_methods do
end
def handle_after_create_commit def handle_after_create_commit
publish(:created, **created_params) publish(:created, **created_params)
...@@ -23,7 +20,7 @@ module Geo ...@@ -23,7 +20,7 @@ module Geo
end end
# Called by Gitlab::Geo::Replicator#consume # Called by Gitlab::Geo::Replicator#consume
def consume_created_event def consume_event_created(**params)
download download
end end
...@@ -53,6 +50,10 @@ module Geo ...@@ -53,6 +50,10 @@ module Geo
private private
# Update checksum on Geo primary database
#
# @param [String] checksum value generated by the checksum routine
# @param [String] failure (optional) stacktrace from failed execution
def update_verification_state!(checksum: nil, failure: nil) def update_verification_state!(checksum: nil, failure: nil)
retry_at, retry_count = calculate_next_retry_attempt if failure.present? retry_at, retry_count = calculate_next_retry_attempt if failure.present?
......
...@@ -175,6 +175,19 @@ class GeoNode < ApplicationRecord ...@@ -175,6 +175,19 @@ class GeoNode < ApplicationRecord
@internal_uri ||= URI.parse(internal_url) if internal_url.present? @internal_uri ||= URI.parse(internal_url) if internal_url.present?
end end
# Geo API endpoint for retrieving a replicable item
#
# @param [String] replicable_name
# @param [Integer] replicable_id
def geo_retrieve_url(replicable_name, replicable_id)
geo_api_url("retrieve/#{replicable_name}/#{replicable_id}")
end
# Geo API endpoint for retrieving a file based on Uploads
#
# @deprecated
# @param [String] file_type
# @param [Integer] file_id
def geo_transfers_url(file_type, file_id) def geo_transfers_url(file_type, file_id)
geo_api_url("transfers/#{file_type}/#{file_id}") geo_api_url("transfers/#{file_type}/#{file_id}")
end end
......
...@@ -10,19 +10,19 @@ module Geo ...@@ -10,19 +10,19 @@ module Geo
def initialize(replicable_name, event_name, payload) def initialize(replicable_name, event_name, payload)
@replicable_name = replicable_name @replicable_name = replicable_name
@event_name = event_name @event_name = event_name.to_sym
@payload = payload @payload = payload.symbolize_keys
end end
def execute def execute
replicator.consume(event_name, payload) replicator.consume(event_name, **payload)
end end
private private
def replicator def replicator
strong_memoize(:replicator) do strong_memoize(:replicator) do
model_record_id = payload['model_record_id'] model_record_id = payload[:model_record_id]
replicator_class = ::Gitlab::Geo::Replicator.for_replicable_name(replicable_name) replicator_class = ::Gitlab::Geo::Replicator.for_replicable_name(replicable_name)
replicator_class.new(model_record_id: model_record_id) replicator_class.new(model_record_id: model_record_id)
......
...@@ -4,6 +4,7 @@ module Geo ...@@ -4,6 +4,7 @@ module Geo
class EventWorker # rubocop:disable Scalability/IdempotentWorker class EventWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker include ApplicationWorker
include GeoQueue include GeoQueue
include ::Gitlab::Geo::LogHelpers
sidekiq_options retry: 3, dead: false sidekiq_options retry: 3, dead: false
......
...@@ -33,9 +33,9 @@ module API ...@@ -33,9 +33,9 @@ module API
authorize_geo_transfer!(replicable_name: params[:replicable_name], id: params[:id]) authorize_geo_transfer!(replicable_name: params[:replicable_name], id: params[:id])
decoded_params = jwt_decoder.decode decoded_params = jwt_decoder.decode
service = Geo::BlobUploadService.new(replicable_name: params[:replicable_name], service = ::Geo::BlobUploadService.new(replicable_name: params[:replicable_name],
blob_id: params[:id], blob_id: params[:id],
decoded_params: decoded_params) decoded_params: decoded_params)
service.execute service.execute
end end
......
...@@ -15,6 +15,9 @@ module Gitlab ...@@ -15,6 +15,9 @@ module Gitlab
end end
class_methods do class_methods do
# Associate current model with specified replicator
#
# @param [Gitlab::Geo::Replicator] klass
def with_replicator(klass) def with_replicator(klass)
raise ArgumentError, 'Must be a class inheriting from Gitlab::Geo::Replicator' unless klass < ::Gitlab::Geo::Replicator raise ArgumentError, 'Must be a class inheriting from Gitlab::Geo::Replicator' unless klass < ::Gitlab::Geo::Replicator
...@@ -28,11 +31,13 @@ module Gitlab ...@@ -28,11 +31,13 @@ module Gitlab
# Geo Replicator # Geo Replicator
# #
# @abstract
# @return [Gitlab::Geo::Replicator] # @return [Gitlab::Geo::Replicator]
def replicator def replicator
raise NotImplementedError, 'There is no Replicator defined for this model' raise NotImplementedError, 'There is no Replicator defined for this model'
end end
# Clear model verification checksum and force recalculation
def calculate_checksum! def calculate_checksum!
self.verification_checksum = nil self.verification_checksum = nil
...@@ -41,10 +46,20 @@ module Gitlab ...@@ -41,10 +46,20 @@ module Gitlab
self.verification_checksum = self.class.hexdigest(file.path) self.verification_checksum = self.class.hexdigest(file.path)
end end
# Checks whether model needs checksum to be performed
#
# Conditions:
# - No checksum is present
# - It's capable of generating a checksum of itself
#
# @return [Boolean]
def needs_checksum? def needs_checksum?
verification_checksum.nil? && checksummable? verification_checksum.nil? && checksummable?
end end
# Return whether its capable of generating a checksum of itself
#
# @return [Boolean] whether it can generate a checksum
def checksummable? def checksummable?
local? && file_exist? local? && file_exist?
end end
......
...@@ -49,8 +49,7 @@ module Gitlab ...@@ -49,8 +49,7 @@ module Gitlab
# @return [String] URL to download the resource from # @return [String] URL to download the resource from
def resource_url def resource_url
# TODO change to Generalized API endpoint after that is implemented Gitlab::Geo.primary_node.geo_retrieve_url(replicable_name, model_record.id.to_s)
Gitlab::Geo.primary_node.geo_transfers_url(replicable_name, model_record.id.to_s)
end end
private private
...@@ -65,7 +64,7 @@ module Gitlab ...@@ -65,7 +64,7 @@ module Gitlab
def request_headers def request_headers
request_data = { request_data = {
replicable_name: replicable_name, replicable_name: replicable_name,
model_record_id: model_record.id id: model_record.id
} }
TransferRequest.new(request_data).headers TransferRequest.new(request_data).headers
......
...@@ -3,6 +3,8 @@ ...@@ -3,6 +3,8 @@
module Gitlab module Gitlab
module Geo module Geo
module Replication module Replication
# Handles retrieval of a blob to be returned via the Geo API request
#
class BlobRetriever < BaseRetriever class BlobRetriever < BaseRetriever
attr_reader :replicator, :checksum attr_reader :replicator, :checksum
......
...@@ -2,9 +2,19 @@ ...@@ -2,9 +2,19 @@
module Gitlab module Gitlab
module Geo module Geo
# Geo Replicators are objects that know how to replicate a replicable resource
#
# A replicator is responsible for:
# - firing events (producer)
# - consuming events (consumer)
#
# Each replicator is tied to a specific replicable resource
class Replicator class Replicator
include ::Gitlab::Geo::LogHelpers include ::Gitlab::Geo::LogHelpers
attr_reader :model_record_id
delegate :model, to: :class
# Declare supported event # Declare supported event
# #
# @example Declaring support for :update and :delete events # @example Declaring support for :update and :delete events
...@@ -29,22 +39,34 @@ module Gitlab ...@@ -29,22 +39,34 @@ module Gitlab
# Check if the replicator supports a specific event # Check if the replicator supports a specific event
# #
# @param [Boolean] event_name # @param [Symbol] event_name
# @return [Boolean] whether event support was registered in the replicator
def self.event_supported?(event_name) def self.event_supported?(event_name)
@events.include?(event_name.to_sym) @events.include?(event_name.to_sym)
end end
# Return the name of the replicator # Return the name of the replicable, e.g. "package_file"
#
# This can be used to retrieve the replicator class again
# by using the `.for_replicable_name` method
# #
# @return [String] name # @see .for_replicable_name
# @return [String] slug that identifies this replicator
def self.replicable_name def self.replicable_name
self.name.demodulize.sub('Replicator', '').underscore self.name.demodulize.sub('Replicator', '').underscore
end end
# Return the registry related to the replicable resource
#
# @return [Class<Geo::BaseRegistry>] registry class
def self.registry_class def self.registry_class
const_get("::Geo::#{replicable_name.camelize}Registry", false) const_get("::Geo::#{replicable_name.camelize}Registry", false)
end end
# Given a `replicable_name`, return the corresponding replicator
#
# @param [String] replicable_name the replicable slug
# @return [Class<Geo::Replicator>] replicator implementation
def self.for_replicable_name(replicable_name) def self.for_replicable_name(replicable_name)
replicator_class_name = "::Geo::#{replicable_name.camelize}Replicator" replicator_class_name = "::Geo::#{replicable_name.camelize}Replicator"
...@@ -67,15 +89,17 @@ module Gitlab ...@@ -67,15 +89,17 @@ module Gitlab
model.count model.count
end end
attr_reader :model_record_id # @param [ActiveRecord::Base] model_record
# @param [Integer] model_record_id
delegate :model, to: :class
def initialize(model_record: nil, model_record_id: nil) def initialize(model_record: nil, model_record_id: nil)
@model_record = model_record @model_record = model_record
@model_record_id = model_record_id @model_record_id = model_record_id
end end
# Instance of the replicable model
#
# @return [ActiveRecord::Base, nil]
# @raise ActiveRecord::RecordNotFound when a model with specified model_record_id can't be found
def model_record def model_record
if defined?(@model_record) && @model_record if defined?(@model_record) && @model_record
return @model_record return @model_record
...@@ -86,6 +110,10 @@ module Gitlab ...@@ -86,6 +110,10 @@ module Gitlab
end end
end end
# Publish an event with its related data
#
# @param [Symbol] event_name
# @param [Hash] event_data
def publish(event_name, **event_data) def publish(event_name, **event_data)
return unless Feature.enabled?(:geo_self_service_framework) return unless Feature.enabled?(:geo_self_service_framework)
...@@ -104,33 +132,45 @@ module Gitlab ...@@ -104,33 +132,45 @@ module Gitlab
# This method is called by the GeoLogCursor when reading the event from the queue # This method is called by the GeoLogCursor when reading the event from the queue
# #
# @param [Symbol] event_name # @param [Symbol] event_name
# @param [Hash] params contextual data published with the event # @param [Hash] event_data contextual data published with the event
def consume(event_name, **params) def consume(event_name, **event_data)
raise ArgumentError, "Unsupported event: '#{event_name}'" unless self.class.event_supported?(event_name) raise ArgumentError, "Unsupported event: '#{event_name}'" unless self.class.event_supported?(event_name)
consume_method = "consume_#{event_name}".to_sym consume_method = "consume_event_#{event_name}".to_sym
raise NotImplementedError, "Consume method not implemented: '#{consume_method}'" unless instance_method_defined?(consume_method) raise NotImplementedError, "Consume method not implemented: '#{consume_method}'" unless self.methods.include?(consume_method)
# Inject model_record based on included class # Inject model_record based on included class
if model_record if model_record
params[:model_record] = model_record event_data[:model_record] = model_record
end end
send(consume_method, **params) # rubocop:disable GitlabSecurity/PublicSend send(consume_method, **event_data) # rubocop:disable GitlabSecurity/PublicSend
end end
# Return the name of the replicator
#
# @return [String] slug that identifies this replicator
def replicable_name def replicable_name
self.class.replicable_name self.class.replicable_name
end end
# Return the registry related to the replicable resource
#
# @return [Class<Geo::BaseRegistry>] registry class
def registry_class def registry_class
self.class.registry_class self.class.registry_class
end end
# Return registry instance scoped to current model
#
# @return [Geo::BaseRegistry] registry instance
def registry def registry
registry_class.for_model_record_id(model_record.id) registry_class.for_model_record_id(model_record.id)
end end
# Checksum value from the main database
#
# @abstract
def primary_checksum def primary_checksum
nil nil
end end
...@@ -159,16 +199,6 @@ module Gitlab ...@@ -159,16 +199,6 @@ module Gitlab
rescue ActiveRecord::RecordInvalid, NoMethodError => e rescue ActiveRecord::RecordInvalid, NoMethodError => e
log_error("#{class_name} could not be created", e, params) log_error("#{class_name} could not be created", e, params)
end end
private
# Checks if method is implemented by current class (ignoring inherited methods)
#
# @param [Symbol] method_name
# @return [Boolean] whether method is implemented
def instance_method_defined?(method_name)
self.class.instance_methods(false).include?(method_name)
end
end end
end end
end end
...@@ -25,8 +25,8 @@ describe Gitlab::Geo::LogCursor::Events::Event, :clean_gitlab_redis_shared_state ...@@ -25,8 +25,8 @@ describe Gitlab::Geo::LogCursor::Events::Event, :clean_gitlab_redis_shared_state
it "eventually calls Replicator#consume", :sidekiq_inline do it "eventually calls Replicator#consume", :sidekiq_inline do
expect_next_instance_of(::Geo::PackageFileReplicator) do |replicator| expect_next_instance_of(::Geo::PackageFileReplicator) do |replicator|
expect(replicator).to receive(:consume).with( expect(replicator).to receive(:consume).with(
"created", :created,
{ "model_record_id" => replicable.id } { model_record_id: replicable.id }
) )
end end
......
...@@ -30,7 +30,7 @@ describe Gitlab::Geo::Replicator do ...@@ -30,7 +30,7 @@ describe Gitlab::Geo::Replicator do
protected protected
def publish_test(other:) def consume_event_test(user:, other:)
true true
end end
end end
...@@ -118,5 +118,19 @@ describe Gitlab::Geo::Replicator do ...@@ -118,5 +118,19 @@ describe Gitlab::Geo::Replicator do
end end
end end
end end
describe '#consume' do
subject { DummyReplicator.new }
it 'accepts valid attributes' do
expect { subject.consume(:test, user: 'something', other: 'something else') }.not_to raise_error
end
it 'calls corresponding method with specified named attributes' do
expect(subject).to receive(:consume_event_test).with(user: 'something', other: 'something else')
subject.consume(:test, user: 'something', other: 'something else')
end
end
end end
end end
# frozen_string_literal: true
require 'spec_helper'
describe Geo::EventService do
include ::EE::GeoHelpers
let_it_be(:primary) { create(:geo_node, :primary) }
let_it_be(:secondary) { create(:geo_node) }
let(:model_record) { create(:package_file, :npm) }
subject { described_class.new('package_file', 'created', { 'model_record_id' => model_record.id }) }
describe '#execute' do
before do
resource_url = primary.geo_retrieve_url('package_file', model_record.id.to_s)
content = model_record.file.open
File.unlink(model_record.file.path)
stub_request(:get, resource_url).to_return(status: 200, body: content)
stub_current_geo_node(secondary)
end
it 'executes the consume part of the replication' do
subject.execute
expect(model_record.file_exist?).to be_truthy
end
end
end
...@@ -76,7 +76,7 @@ RSpec.shared_examples 'a blob replicator' do ...@@ -76,7 +76,7 @@ RSpec.shared_examples 'a blob replicator' do
expect(service).to receive(:execute) expect(service).to receive(:execute)
expect(::Geo::BlobDownloadService).to receive(:new).with(replicator: replicator).and_return(service) expect(::Geo::BlobDownloadService).to receive(:new).with(replicator: replicator).and_return(service)
replicator.consume_created_event replicator.consume_event_created
end end
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment